Sharing a result queue among several processes
The documentation for the multiprocessing module shows how to pass a queue to a process started with multiprocessing.Process. But how can I share a queue with asynchronous worker processes started with apply_async? I don't need dynamic joining or anything else, just a way for the workers to (repeatedly) report their results back to base.
import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))
This fails with:

    RuntimeError: Queue objects should only be shared between processes through inheritance.
I understand what this means, and I understand the advice to inherit rather than require pickling/unpickling (and all the special Windows restrictions). But how do I pass the queue in a way that works? I can't find an example, and I've tried several alternatives that failed in various ways. Help please?
Solution 1:
Try using multiprocessing.Manager to manage your queue and make it accessible to the worker processes.
import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
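Since the question asks for workers that repeatedly report back, here is a minimal sketch of the same idea with the parent draining the managed queue after all jobs finish. The multi-step worker logic is purely illustrative, not part of the original code:

import multiprocessing

def worker(name, que):
    # Illustrative: report several intermediate results per job.
    for step in range(3):
        que.put("%d finished step %d" % (name, step))

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()

    jobs = [pool.apply_async(worker, (i, q)) for i in range(3)]
    pool.close()
    pool.join()           # wait until every job has finished

    while not q.empty():  # safe here because all producers are done
        print(q.get())

If you want to consume results while the workers are still running, you would instead read from the queue in a loop in the parent (or a thread) and use a sentinel value to know when the workers are done.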
Solution 2:
multiprocessing.Pool already has a shared result queue; there is no need to additionally involve a Manager.Queue. Manager.Queue is a queue.Queue (multithreading queue) under the hood, located on a separate server process and exposed via proxies. This adds extra overhead compared to Pool's internal queue. Contrary to relying on Pool's native result handling, the results in the Manager.Queue are also not guaranteed to be ordered.
The worker processes are not started by .apply_async(); that already happens when you instantiate Pool. What is started when you call pool.apply_async() is a new "job". Pool's worker processes run the multiprocessing.pool.worker function under the hood. This function takes care of processing new "tasks" transferred over Pool's internal Pool._inqueue and of sending results back to the parent over the Pool._outqueue. Your specified func will be executed within multiprocessing.pool.worker. func only has to return something and the result will automatically be sent back to the parent.
.apply_async() immediately (asynchronously) returns an AsyncResult object (an alias for ApplyResult). You need to call .get() (which blocks) on that object to receive the actual result. Another option would be to register a callback function, which gets fired as soon as the result becomes ready.
from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':
    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO

        results = [res.get() for res in results]
        print(f'result: {results}')
Example Output:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Note: Specifying the timeout parameter for .get() will not stop the actual processing of the task within the worker; it only unblocks the waiting parent by raising a multiprocessing.TimeoutError.
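For completeness, here is a rough sketch of the callback variant mentioned above. The collect_result name and the collected list are illustrative assumptions, not part of the multiprocessing API; note that results may arrive in any order:

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):
        pass
    return i

def collect_result(result):
    # Runs in the parent process as soon as a result becomes ready.
    collected.append(result)

if __name__ == '__main__':
    collected = []
    with Pool(4) as pool:
        for i in range(10):
            pool.apply_async(busy_foo, (i,), callback=collect_result)
        pool.close()
        pool.join()  # all callbacks have fired by the time join() returns
    print(sorted(collected))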