Python multiprocess queue strangeness
By: Guest | Date: Apr 24 2017 19:41 | Format: Python | Expires: never | Size: 1.9 KB | Hits: 719
- # Expected behavior: Each consumer prints its own number, 100 times.
- # Observed behavior: Every consumer gets every number.
- # Python version 3.6 on Windows.
- from multiprocessing import Manager
- from threading import Thread
- from concurrent.futures import ProcessPoolExecutor
- class QueueSplitter(object):
- def __init__(self, queues=[]):
- self.queues = queues
- def append(self, q):
- self.queues.append(q)
- def put(self, obj):
- for q in self.queues:
- q.put(obj)
- def close(self):
- for q in self.queues:
- q.close()
- self.queues = []
- self.done = True
- class IterQueueSplitter(QueueSplitter):
- def __init__(self, it, sentinel=None, queues=[]):
- self.it = it
- self.sentinel = sentinel
- self.queues = queues
- def send(self):
- try:
- self.put(next(self.it))
- except StopIteration:
- self.put(self.sentinel)
- self.close()
- def serve(server):
- while not hasattr(server, 'done'):
- server.send()
- def consume(me, q):
- for v in iter(q.get, None):
- print('consumer %d: %d' % (me, v))
- def repeatabunch(n):
- for i in range(100):
- yield n
- if __name__ == '__main__':
- with Manager() as man, ProcessPoolExecutor(4) as ex:
- consumers = []
- producers = []
- servers = []
- for i in range(8):
- queue = man.Queue()
- consumer = ex.submit(consume, i, queue)
- consumers.append(consumer)
- server = IterQueueSplitter(repeatabunch(i))
- server.append(queue)
- servers.append(server)
- producers.append(Thread(target=serve, args=[server]))
- for t in producers:
- t.start()
- for t in producers:
- t.join()
- for consumer in consumers:
- consumer.result()
Latest pastes
1 hours ago
11 hours ago
1 days ago
2 days ago
2 days ago