Public paste
Python multiprocess queue strangeness
By: Guest | Date: Apr 24 2017 19:41 | Format: Python | Expires: never | Size: 1.9 KB | Hits: 660

  1. # Expected behavior: Each consumer prints its own number, 100 times.
  2. # Observed behavior: Every consumer gets every number.
  3. # Python version 3.6 on Windows.
  4.  
  5.  
  6. from multiprocessing import Manager
  7. from threading import Thread
  8. from concurrent.futures import ProcessPoolExecutor
  9.  
  10.  
  11. class QueueSplitter(object):
  12.     def __init__(self, queues=[]):
  13.         self.queues = queues
  14.  
  15.     def append(self, q):
  16.         self.queues.append(q)
  17.  
  18.     def put(self, obj):
  19.         for q in self.queues:
  20.             q.put(obj)
  21.  
  22.     def close(self):
  23.         for q in self.queues:
  24.             q.close()
  25.         self.queues = []
  26.         self.done = True
  27.  
  28.  
  29. class IterQueueSplitter(QueueSplitter):
  30.     def __init__(self, it, sentinel=None, queues=[]):
  31.         self.it = it
  32.         self.sentinel = sentinel
  33.         self.queues = queues
  34.  
  35.     def send(self):
  36.         try:
  37.             self.put(next(self.it))
  38.         except StopIteration:
  39.             self.put(self.sentinel)
  40.             self.close()
  41.  
  42.  
  43. def serve(server):
  44.     while not hasattr(server, 'done'):
  45.         server.send()
  46.  
  47.  
  48. def consume(me, q):
  49.     for v in iter(q.get, None):
  50.         print('consumer %d: %d' % (me, v))
  51.  
  52.  
  53. def repeatabunch(n):
  54.     for i in range(100):
  55.         yield n
  56.  
  57.  
  58. if __name__ == '__main__':
  59.     with Manager() as man, ProcessPoolExecutor(4) as ex:
  60.         consumers = []
  61.         producers = []
  62.         servers = []
  63.         for i in range(8):
  64.             queue = man.Queue()
  65.             consumer = ex.submit(consume, i, queue)
  66.             consumers.append(consumer)
  67.             server = IterQueueSplitter(repeatabunch(i))
  68.             server.append(queue)
  69.             servers.append(server)
  70.             producers.append(Thread(target=serve, args=[server]))
  71.         for t in producers:
  72.             t.start()
  73.         for t in producers:
  74.             t.join()
  75.         for consumer in consumers:
  76.             consumer.result()