Public paste
Python multiprocess queue strangeness
By: Guest | Date: Apr 24 2017 20:09 | Format: Python | Expires: never | Size: 1.84 KB | Hits: 716

  1. # Expected behavior: Each consumer prints its own number, 100 times.
  2. # Observed behavior: Every consumer gets every number.
  3. # Python version 3.6 on Windows.
  4.  
  5.  
  6. from multiprocessing import Manager
  7. from threading import Thread
  8. from concurrent.futures import ProcessPoolExecutor
  9.  
  10.  
  11. class QueueSplitter(object):
  12.     def __init__(self, queues=[]):
  13.         self.queues = queues
  14.  
  15.     def append(self, q):
  16.         self.queues.append(q)
  17.  
  18.     def put(self, obj):
  19.         for q in self.queues:
  20.             q.put(obj)
  21.  
  22.     def close(self):
  23.         self.queues = []
  24.         self.done = True
  25.  
  26.  
  27. class IterQueueSplitter(QueueSplitter):
  28.     def __init__(self, it, sentinel=None, queues=[]):
  29.         self.it = it
  30.         self.sentinel = sentinel
  31.         self.queues = queues
  32.  
  33.     def send(self):
  34.         try:
  35.             self.put(next(self.it))
  36.         except StopIteration:
  37.             self.put(self.sentinel)
  38.             self.close()
  39.  
  40.  
  41. def serve(server):
  42.     while not hasattr(server, 'done'):
  43.         server.send()
  44.  
  45.  
  46. def consume(me, q):
  47.     for v in iter(q.get, None):
  48.         print('consumer %d: %d' % (me, v))
  49.  
  50.  
  51. def repeatabunch(n):
  52.     for i in range(100):
  53.         yield n
  54.  
  55.  
  56. if __name__ == '__main__':
  57.     with Manager() as man, ProcessPoolExecutor(4) as ex:
  58.         consumers = []
  59.         producers = []
  60.         servers = []
  61.         for i in range(8):
  62.             queue = man.Queue()
  63.             consumer = ex.submit(consume, i, queue)
  64.             consumers.append(consumer)
  65.             server = IterQueueSplitter(repeatabunch(i))
  66.             server.append(queue)
  67.             servers.append(server)
  68.             producers.append(Thread(target=serve, args=[server]))
  69.         for t in producers:
  70.             t.start()
  71.         for t in producers:
  72.             t.join()
  73.         for consumer in consumers:
  74.             consumer.result()