Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I just took a stab at creating a library for something I'm dubbing semi-synchronous programming (async + dependencies). It's rather dense, but I'd really appreciate a quick code review to ensure I'm not introducing any race conditions. Feedback on best practices and general improvements is also welcome.

from multiprocessing import Queue, Process, Manager
from collections import defaultdict
from time import sleep

def queue_function(fn, args, kwargs):
  """Call ``fn(*args, **kwargs)`` and put ``[result, id(fn)]`` on ``semisync.q``.

  The id of ``fn`` is queued alongside the result so the reader of the
  queue can tell which function the result belongs to.
  """
  outcome = fn(*args, **kwargs)
  message = [outcome, id(fn)]
  semisync.q.put(message)

def start_process(fn, args, kwargs):
  """Start a process that runs ``queue_function(fn, args, kwargs)``.

  The started process is appended to ``semisync.processes``.
  """
  call_spec = (fn, args, kwargs)
  worker = Process(target=queue_function, args=call_spec)
  worker.start()
  semisync.processes.append(worker)

def cleanup():
  """Join every process in ``semisync.processes`` so none remains a zombie.

  Each process is popped off the list before it is joined, so the list is
  empty once this returns.
  """
  workers = semisync.processes
  while len(workers) > 0:
    workers.pop().join()

def generate_dependency_trees(tree):
  """Record the dependency edges of ``tree`` on the ``semisync`` class.

  For every function in ``tree`` and every entry of its optional
  ``'dependencies'`` collection, the edge is stored in both directions:
  in ``semisync.depends_on`` and in ``semisync.needed_for``.
  """
  for fn, call_info in tree.items():
    for required in call_info.get('dependencies', []):
      semisync.depends_on[fn].add(required)
      semisync.needed_for[required].add(fn)

def dependencies(fn):
  """Return the dependencies of ``fn`` that are not in ``semisync.completed``."""
  registry, finished = semisync.tree, semisync.completed
  outstanding = []
  for required in registry[fn]['dependencies']:
    if required not in finished:
      outstanding.append(required)
  return outstanding

def independent_fns(tree):
  """Return the keys of ``tree`` for which ``dependencies`` reports nothing outstanding."""
  return [fn for fn in tree if not dependencies(fn)]

# wrap method in fn to call semisynchronously
def semisync_method(c, method_name):
  """Wrap a method of ``c`` in a plain function.

  Args:
    c: object that owns the method.
    method_name: name of the attribute to call on ``c``.

  Returns:
    A function that forwards its positional and keyword arguments to
    ``getattr(c, method_name)`` and returns that call's result.
  """
  def method(*args, **kwargs):
    # The attribute is looked up on every call. The result is returned
    # rather than dropped so that it can be queued and handed to a callback.
    return getattr(c, method_name)(*args, **kwargs)
  return method

def merge_dicts(d1, d2):
  """Append the ``'args'`` and ``'kwargs'`` entries of ``d2`` to those of ``d1``.

  ``d1`` is modified and returned; a key missing from ``d2`` contributes
  nothing.
  """
  for field in ('args', 'kwargs'):
    extra = d2.get(field, [])
    d1[field] += extra
  return d1

class semisync:
  """Decorator and scheduler for "semi-synchronous" function calls.

  Decorating a function with ``@semisync(...)`` replaces it with a stub that
  only records each call.  ``semisync.begin()`` then runs the recorded calls
  in separate processes, starting a function only once every function it
  depends on has reported its results.

  All scheduling state is stored on the class itself and is therefore shared
  by every decorated function.
  """

  # undecorated function -> {'callback', 'args', 'kwargs', 'dependencies'}
  tree = {}
  # worker processes put [result, id(fn)] here (see queue_function)
  q = Queue()
  # every worker process started so far; emptied by cleanup()
  processes = []
  # decorated stub -> undecorated function
  map = {}
  # NOTE(review): manager and lock are not referenced by the code shown in
  # this module; they are kept because they are public class attributes.
  manager = Manager()
  # function -> set of functions it is still waiting for
  depends_on = defaultdict(set)
  # function -> set of functions waiting for it
  needed_for = defaultdict(set)
  # functions whose results have all been received
  completed = set()
  # id(function) -> function, to decode the ids read from the queue
  fn_map = {}
  lock = manager.Lock()

  def __init__(self, callback=False, dependencies=None):
    """Configure the decorator.

    Args:
      callback: called as ``callback(*result)`` when a result of the
        decorated function is received; a falsy value disables it.
      dependencies: iterable of already-decorated functions that must
        finish before the decorated function is started.
    """
    self.callback = callback
    # Build a fresh set per decorator instead of sharing one mutable
    # default object between all instances.
    self.dependencies = set() if dependencies is None else dependencies

  def __call__(self, fn):
    """Return a stub that queues a call of ``fn`` instead of running it."""
    def semisync_fn(*args, **kwargs):
      # Dependencies are given as decorated stubs; translate them to the
      # undecorated functions that are used as keys everywhere else.
      required = {semisync.map[d] for d in self.dependencies}
      fn_call = {'callback': self.callback, 'args': [args], 'kwargs': [kwargs],
                 'dependencies': required}
      # Keep the argument lists of calls queued earlier for the same fn.
      semisync.tree[fn] = merge_dicts(fn_call, semisync.tree.get(fn, {}))

    # functions cannot be added to the queue;
    # work around this by passing an id instead
    semisync.fn_map[id(fn)] = fn

    # mapping from decorated function to undecorated function
    semisync.map[semisync_fn] = fn
    return semisync_fn

  @classmethod
  def clear(cls):
    """Reset the completed set and both dependency maps to empty.

    The attributes are rebound on ``semisync`` itself because the
    module-level helpers read them from there.
    """
    semisync.completed = set()
    semisync.depends_on = defaultdict(set)
    semisync.needed_for = defaultdict(set)

  @classmethod
  def begin(cls):
    """Run every queued call, honouring the declared dependencies.

    Functions without outstanding dependencies are started immediately,
    one process per queued call.  Each result read from the queue is passed
    to the function's callback (if any).  Once all started calls of a
    function have reported, the function is marked completed and any
    dependent whose dependencies are now all satisfied is started.
    Finally every worker process is joined.

    NOTE(review): if a worker raises before putting its result, the
    blocking ``q.get()`` below presumably never returns -- confirm and
    consider a timeout.
    """
    # aliasing
    completed = semisync.completed
    tree, q, processes = semisync.tree, semisync.q, semisync.processes
    depends_on, needed_for = semisync.depends_on, semisync.needed_for
    fn_map = semisync.fn_map

    generate_dependency_trees(tree)

    # started-but-not-yet-reported calls per function; a function queued
    # several times only counts as finished when this drops to zero
    in_flight = defaultdict(int)

    def launch(fn):
      # start one process per queued call of fn, consuming the queued args
      call = tree[fn]
      while call['args']:
        start_process(fn, call['args'].pop(), call['kwargs'].pop())
        in_flight[fn] += 1

    # start a new process for each call that has no dependencies
    for fn in independent_fns(tree):
      launch(fn)

    # read exactly one result per started process; the process list may
    # grow while we are reading, as dependents get launched
    received = 0
    while received < len(processes):
      result, fn_id = q.get()
      received += 1
      fn = fn_map[fn_id]

      in_flight[fn] -= 1
      finished = in_flight[fn] <= 0
      if finished:
        completed.add(fn)

      # execute callback
      if tree[fn]['callback']:
        tree[fn]['callback'](*result)

      if not finished:
        # other calls of fn are still running; keep dependents waiting
        continue

      # remove the completed function from the dependencies of the
      # functions that were waiting for it
      for other_fn in needed_for[fn]:
        # discard: fn may already have been removed by an earlier pass
        depends_on[other_fn].discard(fn)

        # nothing left to wait for: start its queued calls
        if not depends_on[other_fn]:
          launch(other_fn)

    # every result has been read from the queue at this point
    cleanup()

Usage is as follows:

from semisync import semisync
from multiprocessing import Manager
from random import random, randint
from time import sleep

# Namespace created through a Manager: the demo functions below write their
# results to it, and profit() reads revenue and expenses back from it.
shared = Manager().Namespace()

# a demo callback function
def output(field, value):
    """Print ``field`` and ``value`` in the form ``<field>: $<value>``."""
    # The parenthesised single-argument form prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print(field + ": $" + str(value))

# simple callback syntax
@semisync(callback=output)
def revenue():
    """Store a random revenue figure on ``shared`` and return it with its label."""
    # random pause standing in for a real API call
    delay = random()
    sleep(delay)
    shared.revenue = randint(1, 1000)
    label = "Revenue"
    return label, shared.revenue

@semisync(callback=output)
def expenses():
    """Store a random expenses figure on ``shared`` and return it with its label."""
    # random pause standing in for a real API call
    delay = random()
    sleep(delay)
    shared.expenses = randint(1, 500)
    label = "Expenses"
    return label, shared.expenses

# will run only when revenue() and expenses() have completed
@semisync(callback=output, dependencies=[revenue, expenses])
def profit():
    """Store revenue minus expenses on ``shared`` and return it with its label."""
    income = shared.revenue
    costs = shared.expenses
    shared.profit = income - costs
    label = "Profit"
    return label, shared.profit

# Queue the function calls: calling a decorated function only records the
# call and its arguments; nothing is executed yet.
revenue()
expenses()
profit()

# Execute the queued calls semi-synchronously: each call runs in its own
# process, and profit() is started only after revenue() and expenses() have
# reported their results.
semisync.begin()
share|improve this question
    
You should check out the new 3.5 keywords. –  Caridorc Jul 20 at 7:42
1  
I actually have! Also realized that concurrent.futures exists, haha. –  Madison May Jul 20 at 20:55

1 Answer 1

PEP 8 says to use 4 spaces per indentation level. Using just 2 can make it harder to see where one indentation level ends and the next begins.

You should use docstrings, especially since the comment you wrote in cleanup would already be one if it were formatted as a docstring. Docstrings are basically comments that are programmatically accessible, so that someone else using your script can read what the function does.

def cleanup():
    """Ensure no processes remain in a zombie state"""
share|improve this answer

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.