Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I just took a stab at creating a library for something I'm dubbing semi-synchronous programming (async + dependencies). It's rather dense, but I'd really appreciate a quick code review to ensure I'm not introducing any race conditions. Feedback on best practices and general improvements is also welcome.

Code below:

from multiprocessing import Queue, Process, Manager
from collections import defaultdict
from time import sleep

def queue_function(fn, args, kwargs):
  """Call ``fn(*args, **kwargs)`` and publish the outcome on ``semisync.q``.

  The value is queued as a two-item list ``[return_value, id(fn)]``; the id
  lets the consumer of the queue look up which function produced it.
  This is the target handed to ``Process`` by ``start_process``.
  """
  outcome = fn(*args, **kwargs)
  semisync.q.put([outcome, id(fn)])

def start_process(fn, args, kwargs):
  """Start a process running ``queue_function(fn, args, kwargs)``.

  The started process is recorded in ``semisync.processes`` so that it can
  be joined later.
  """
  worker = Process(target=queue_function, args=(fn, args, kwargs))
  worker.start()
  semisync.processes.append(worker)

def cleanup():
  """Join every process in ``semisync.processes``, emptying the list.

  Joining each worker ensures none of them is left behind as a zombie.
  """
  workers = semisync.processes
  while workers:
    workers.pop().join()

def generate_dependency_trees(tree):
  """Fill ``semisync.depends_on`` and ``semisync.needed_for`` from ``tree``.

  For every function in ``tree`` and each entry of its optional
  ``'dependencies'`` collection, the pair is recorded in both directions:
  function -> dependency and dependency -> function.
  """
  for fn, call_info in tree.items():
    for dependency in call_info.get('dependencies', []):
      semisync.depends_on[fn].add(dependency)
      semisync.needed_for[dependency].add(fn)

def dependencies(fn):
  """Return the dependencies of ``fn`` that are not in ``semisync.completed``."""
  finished = semisync.completed
  required = semisync.tree[fn]['dependencies']
  return [dep for dep in required if dep not in finished]

def independent_fns(tree):
  """Return the keys of ``tree`` that have no outstanding dependencies."""
  return [fn for fn in tree if not dependencies(fn)]

# wrap method in fn to call semisynchronously
def semisync_method(c, method_name):
  """Wrap the method ``method_name`` of ``c`` in a plain function.

  The attribute is looked up on ``c`` each time the wrapper is called, and
  all positional and keyword arguments are forwarded unchanged.

  Returns:
    A function that calls the method and returns whatever it returns.
  """
  def method(*args, **kwargs):
    # Propagate the result: queue_function queues the wrapped function's
    # return value and the scheduler unpacks it into the callback, so
    # dropping it here would hand the callback ``None``.
    return getattr(c, method_name)(*args, **kwargs)
  return method

def merge_dicts(d1, d2):
  """Append the ``'args'`` and ``'kwargs'`` entries of ``d2`` onto ``d1``.

  ``d1`` must already hold both keys; a key missing from ``d2`` contributes
  nothing. ``d1`` is modified in place and also returned.
  """
  for field in ('args', 'kwargs'):
    extra = d2.get(field, [])
    d1[field] += extra
  return d1

class semisync:
  """Decorator and scheduler for "semi-synchronous" function calls.

  Decorating a function with ``@semisync(...)`` replaces it with a stub that
  only records each call. ``semisync.begin()`` then runs every recorded call
  in its own process, starting a function's calls only once everything it
  depends on has finished, and passes each result to the function's callback.

  All scheduling state is kept on the class, so it is shared by every
  decorated function.
  """

  # undecorated fn -> {'callback', 'args', 'kwargs', 'dependencies'};
  # 'args' and 'kwargs' are parallel lists with one entry per recorded call
  tree = {}
  # worker processes put [return_value, id(fn)] pairs here
  q = Queue()
  # processes started by start_process() that have not been joined yet
  processes = []
  # decorated stub -> undecorated fn
  map = {}
  manager = Manager()
  # fn -> set of functions it is still waiting for
  depends_on = defaultdict(set)
  # fn -> set of functions that are waiting for it
  needed_for = defaultdict(set)
  # functions whose started calls have all reported a result
  completed = set()
  # id(fn) -> undecorated fn, used to decode the ids read from ``q``
  fn_map = {}
  # not used anywhere in this class
  lock = manager.Lock()

  def __init__(self, callback=False, dependencies=None):
    """Store the decorator options.

    Args:
      callback: callable invoked as ``callback(*result)`` with the decorated
        function's return value, or a false value for no callback.
      dependencies: iterable of already-decorated functions that must
        finish before the decorated function is started. Defaults to none.
    """
    self.callback = callback
    # a fresh set per instance instead of one shared mutable default
    self.dependencies = set() if dependencies is None else dependencies

  def __call__(self, fn):
    """Returns decorated function"""
    def semisync_fn(*args, **kwargs):
      # record this call; calls recorded earlier for fn are kept after it
      fn_call = {'callback': self.callback, 'args': [args], 'kwargs': [kwargs],
                 'dependencies': set([semisync.map[d] for d in self.dependencies])}
      semisync.tree[fn] = merge_dicts(fn_call, semisync.tree.get(fn, {}))

    # functions cannot be added to the queue;
    # work around this by passing an id instead
    semisync.fn_map[id(fn)] = fn

    # mapping from decorated function to undecorated function
    semisync.map[semisync_fn] = fn
    return semisync_fn

  @classmethod
  def clear(cls):
    """Reset the completed set and both dependency maps.

    Recorded calls (``tree``) and the function lookup tables are left as is.
    """
    semisync.completed = set()
    semisync.depends_on = defaultdict(set)
    semisync.needed_for = defaultdict(set)

  @classmethod
  def begin(cls):
    """Run every recorded call, honouring the declared dependencies.

    Functions with no outstanding dependencies are started first, one
    process per recorded call. Results are then read from the queue one at
    a time; each is passed to the function's callback (if any), and once
    every started call of a function has reported, the functions waiting on
    it are started as soon as they have nothing left to wait for. All worker
    processes are joined before returning.

    Note: this blocks on the queue until one result per started process has
    arrived, so a worker that never queues a result (for example because the
    function raised) leaves it waiting.
    """
    # aliasing
    completed = semisync.completed
    tree, q, processes = semisync.tree, semisync.q, semisync.processes
    depends_on, needed_for = semisync.depends_on, semisync.needed_for
    fn_map = semisync.fn_map

    generate_dependency_trees(tree)

    # started calls per function that have not reported a result yet
    outstanding = defaultdict(int)

    def launch(target):
      # start one process per recorded call of target, consuming the records
      calls = tree[target]
      while calls['args']:
        args, kwargs = calls['args'].pop(), calls['kwargs'].pop()
        start_process(target, args, kwargs)
        outstanding[target] += 1

    # start a new process for each call that has no dependencies
    for fn in independent_fns(tree):
      launch(fn)

    # read from queue as items are added; `processes` grows while dependents
    # are launched, so the bound is re-evaluated on every iteration
    received = 0
    while received < len(processes):
      result, fn_id = q.get()
      received += 1
      fn = fn_map[fn_id]

      # a function queued several times only counts as completed once all
      # of its calls have reported
      outstanding[fn] -= 1
      all_done = outstanding[fn] <= 0
      if all_done:
        completed.add(fn)

      # execute callback
      callback = tree[fn]['callback']
      if callback:
        callback(*result)

      if not all_done:
        continue

      # remove the completed function from the dependencies of everything
      # that was waiting on it
      for other_fn in needed_for[fn]:
        # discard: tolerate an entry that has already been removed
        depends_on[other_fn].discard(fn)

        # if the waiting function now has zero dependencies, start it
        if not depends_on[other_fn]:
          launch(other_fn)

    cleanup()

Usage is as follows:

from semisync import semisync
from multiprocessing import Manager
from random import random, randint
from time import sleep

# shared data between processes: a Manager-backed namespace, so attributes
# set inside one worker process can be read by the others
shared = Manager().Namespace()

# a demo callback function
def output(field, value):
    """Print ``field`` and ``value`` as a dollar amount, e.g. ``Revenue: $42``."""
    # The parenthesised single-argument form is valid both as a Python 2
    # print statement and as a Python 3 print() call.
    print(field + ": $" + str(value))

# simple callback syntax
@semisync(callback=output)
def revenue():
    """Publish a random revenue figure (1-1000) after a simulated delay."""
    # stand-in for a slow api call
    delay = random()
    sleep(delay)
    shared.revenue = randint(1, 1000)
    label = "Revenue"
    return label, shared.revenue

@semisync(callback=output)
def expenses():
    """Publish a random expenses figure (1-500) after a simulated delay."""
    # stand-in for a slow api call
    delay = random()
    sleep(delay)
    shared.expenses = randint(1, 500)
    label = "Expenses"
    return label, shared.expenses

# will run only when revenue() and expenses() have completed
@semisync(callback=output, dependencies=[revenue, expenses])
def profit():
    """Publish revenue minus expenses, both read from the shared namespace."""
    earned, spent = shared.revenue, shared.expenses
    shared.profit = earned - spent
    label = "Profit"
    return label, shared.profit

# queue function calls: the decorated names only record the call here,
# nothing is executed yet
revenue()
expenses()
profit()

# executes queued calls semi-synchronously: revenue and expenses run in
# their own processes, and profit is started once both have finished
semisync.begin()
share|improve this question

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.