""" This is a simple example of the Producer/Consumer problem in Python. It makes use of generators. Contains code from: """ from collections import deque from Queue import Queue import time class Task(object): def __init__(self): self.generator = self.run() def run(self): raise NotImplementedError class Producer(Task): def __init__(self, buffer, name, limit=10): self.buffer = buffer self.name = name self.limit = limit Task.__init__(self) def run(self): for i in range(self.limit): data = "'%s-%s'" % (self.name, i) self.buffer.put(data) time.sleep(1) print "%s has put %s in the buffer." % (self.name, data) # this returns the control to the caller after producing a value yield True class Consumer(Task): def __init__(self, buffer, name): self.buffer = buffer self.name = name Task.__init__(self) def run(self): """Here we simulate two 'resume' points in the function: First we take the value and hand over the control to the scheduler, then we process the value we got. We can do so because this function is a coroutine and it stores its local variables across the calls.""" while self.buffer.qsize(): data = self.buffer.get() time.sleep(0.5) print "%s has got %s from the buffer." % (self.name, data) yield True time.sleep(0.5) print "%s is doing something with %s" % (self.name, data) yield True class Scheduler(object): """Generators and coroutines are not preemptively scheduled just like threads so we need an home made scheduler to let the flow continue.""" def __init__(self): self.tasks = deque() def add_task(self, task): self.tasks.append(task) def main(self): tasks = self.tasks while True: try: tasks[0].generator.next() tasks.rotate(-1) except StopIteration: del tasks[0] except IndexError: break def main(): sched = Scheduler() buffer = Queue() prod = Producer(buffer, "Producer") cons = Consumer(buffer, "Consumer") sched.add_task(prod) sched.add_task(cons) sched.main() if __name__ == '__main__': main()