Tell me more ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I'm trying to implement a kind of message queue. Tasks will come in at unknown random times, and must be executed FIFO. I can do multiple tasks in one hit, but the setup and tear down unavoidably takes some time, so I want to do as many in one 'burst' as I can - i.e. empty whatever is in the queue. I guess the worker thread should block when the queue is empty, but the class should not be blocked while it is working, and a bunch of tasks might queue up while I was processing the last lot.

Here is my first design, it seems to work but I can't help feeling it's a bit clunky and could be implemented more simply or elegantly.

class Foo(object):

  def __init__(self):
    import sys, threading, Queue, time

    def worker():
      while True:
        sys.stderr.write('getting some things from queue...')
        things = []
        try:
          things.append(q.get())
          while True:
            things.append(q.get_nowait())
        except Queue.Empty:
          pass
        n = len(things)
        sys.stderr.write('got {0} thing(s)\n'.format(n))

        # setup stuff
        time.sleep(1)
        # do some things        
        sys.stderr.write('hello {0}!\n'.format(', '.join(things)))
        # tear down stuff
        time.sleep(1)

        sys.stderr.write('processed {0} thing(s) in this iteration\n'.format(n))
        [q.task_done() for task in xrange(n)]

    q = Queue.Queue()
    n_worker_threads = 1
    for i in xrange(n_worker_threads):
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    self.queue = q


if __name__ == '__main__':
  print 'enter __main__'

  foo = Foo()

  foo.queue.put('world')
  foo.queue.put('cruel world')
  foo.queue.put('stack overflow')

  for i in xrange(20):
    foo.queue.put(str(i))
  import time
  time.sleep(1)
  foo.queue.put('a')
  foo.queue.put('b')
  time.sleep(1)
  foo.queue.put('1')
  foo.queue.put('2')
  foo.queue.put('3')
  print 'finish __main__'
share|improve this question

1 Answer

http://stackoverflow.com/questions/854906/is-this-python-producer-consumer-lockless-approach-thread-safe addresses your question with an implementation of a locking queue and discussion of lockless approaches and possible implementation pitfalls.

I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to improper use of threading.Lock that I eventually fixed. But it made me think whether it's possible to implement producer/consumer pattern in a lockless manner.

share|improve this answer

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.