Concurrent.futures vs Multiprocessing in Python 3
I wouldn't call concurrent.futures more "advanced": it's a simpler interface that works very much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.
So, like virtually all instances of "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interfaces won't.
So far as CPU-bound tasks go, the question is far too under-specified to say anything very meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much of a speedup you get (if any) depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives; the high-level API you use to get at those isn't a primary factor in bottom-line speed.
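To illustrate that the interface is the same whichever backend you pick, here is a minimal sketch (the function names are my own, hypothetical choices) that runs one CPU-bound function through ThreadPoolExecutor and ProcessPoolExecutor with identical calling code. On a standard CPython build with the GIL, only the process-based run can execute this work in parallel; whether that produces a measurable speedup still depends on your hardware and on how much data must cross process boundaries.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def sum_of_squares(n):
    # Deliberately CPU-bound: pure-Python arithmetic that holds the GIL
    return sum(i * i for i in range(n))

def run_with(executor_class, inputs, max_workers=2):
    # The calling code is identical; only the executor class differs
    with executor_class(max_workers=max_workers) as executor:
        return list(executor.map(sum_of_squares, inputs))

if __name__ == "__main__":
    inputs = [100_000, 200_000, 300_000]
    threaded = run_with(ThreadPoolExecutor, inputs)
    multiprocess = run_with(ProcessPoolExecutor, inputs)
    print(threaded == multiprocess)  # True: same results either way
```

Timing the two calls on your own machine is the only reliable way to see what the process version buys you.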
Edit: example
Here's the final code shown in the article you referenced, but I'm adding an import statement needed to make it work:
from concurrent.futures import ProcessPoolExecutor

def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num: factors for num, factors in
                zip(nums,
                    executor.map(factorize_naive, nums))}
Here's exactly the same thing using multiprocessing instead:
import multiprocessing as mp

def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num: factors for num, factors in
                zip(nums,
                    pool.map(factorize_naive, nums))}
Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.
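Neither snippet defines factorize_naive; it comes from the referenced article. To run both versions side by side, a simple trial-division stand-in is enough. The factorize_naive below is my own sketch and not necessarily the article's exact implementation.

```python
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

def factorize_naive(n):
    """Stand-in (assumed): return the prime factors of n by trial division."""
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)  # whatever remains is itself prime
    return factors

def pool_factorizer_map(nums, nprocs):
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num: factors for num, factors in
                zip(nums, executor.map(factorize_naive, nums))}

def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num: factors for num, factors in
                zip(nums, pool.map(factorize_naive, nums))}

if __name__ == "__main__":
    nums = [12, 97, 360]
    print(pool_factorizer_map(nums, 2))  # {12: [2, 2, 3], 97: [97], 360: [2, 2, 2, 3, 3, 5]}
    print(mp_factorizer_map(nums, 2) == pool_factorizer_map(nums, 2))  # True
```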
As for which one is easier to work with, they're essentially identical.
One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.
Again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it". A project sticking exclusively (if possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.
Most of the time when you need parallel processing, you will probably find that either the ProcessPoolExecutor class from the concurrent.futures module or the Pool class from the multiprocessing module provides equivalent facilities, and it boils down to a matter of personal preference. But each does offer some facilities that make certain processing more convenient. I thought I would just point out a couple:
When submitting a batch of tasks, you sometimes want to get the task results (i.e. return values) as soon as they become available. Both facilities provide notification that a result from a submitted task is available via callback mechanisms:
Using multiprocessing.Pool:
import multiprocessing as mp

def worker_process(i):
    return i * i  # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
The same can be done, albeit awkwardly, using a callback with concurrent.futures:
import concurrent.futures

def worker_process(i):
    return i * i  # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()
Here each task is submitted individually, and a Future instance is returned for it. Then the callback must be added to the Future. Finally, when the callback is invoked, the argument passed is the Future instance for the task that has completed, and its result method must be called to get the actual return value. But with the concurrent.futures module, there is actually no need to use a callback at all. You can use the as_completed function:
import concurrent.futures

def worker_process(i):
    return i * i  # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()
And it is easy to tie the return value back to the original argument passed to worker_process by using a dictionary to hold the Future instances:
import concurrent.futures

def worker_process(i):
    return i * i  # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future]  # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()
multiprocessing.Pool has methods imap and imap_unordered, the latter of which returns task results in arbitrary order (not necessarily completion order). These methods are considered a lazier version of map. With method map, if the passed iterable argument does not have a __len__ attribute, it is first converted to a list, and its length is used to compute an effective chunksize value if None was supplied as the chunksize argument. Therefore, you cannot achieve any storage savings by using a generator or generator expression as the iterable. But with methods imap and imap_unordered, the iterable can be a generator or generator expression; it will be iterated as necessary to produce new tasks for submission. This, however, necessitates a default chunksize of 1, since the length of the iterable cannot in general be known. That doesn't stop you from providing a reasonable value using the same algorithm that the multiprocessing.Pool class uses if you have a good approximation to the length of the iterable (or the exact size, as in the example below):
import multiprocessing as mp

def worker_process(i):
    return i * i  # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 1  # imap/imap_unordered reject a chunksize below 1
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()
But with imap_unordered there is no easy way to tie a result to a submitted job unless the worker function returns the original call arguments along with the return value. On the other hand, the ability to specify a chunksize with imap_unordered and imap (the latter returning results in a predictable order) should make these methods more efficient than invoking apply_async repeatedly, which is essentially equivalent to using a chunksize of 1. But if you do need to process results in completion order, then to be sure you should use apply_async with a callback function. It does, however, appear based on experimentation that if you use a chunksize value of 1 with imap_unordered, the results will be returned in completion order.
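One way to match results to inputs with imap_unordered is to have the worker return its argument along with the result. A minimal sketch; this tuple-returning worker_process is an illustrative variant of the one used above:

```python
import multiprocessing as mp

def worker_process(i):
    # Return the argument together with its square so each result identifies itself
    return i, i * i

def main():
    squares = {}
    with mp.Pool() as pool:
        # Results may arrive in any order; the tuple says which input each belongs to
        for i, square in pool.imap_unordered(worker_process, range(10), chunksize=2):
            squares[i] = square
    return squares

if __name__ == "__main__":
    squares = main()
    for i in sorted(squares):
        print(i, squares[i])
```

The cost is a slightly larger payload per result, which is usually negligible compared to the work itself.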
The map method of the ProcessPoolExecutor class from the concurrent.futures module is similar in one regard to the Pool.imap method from the multiprocessing module: it will not convert passed iterable arguments that are generator expressions to lists in order to compute effective chunksize values. That is why its chunksize argument defaults to 1 and why, if you are passing large iterables, you should consider specifying an appropriate chunksize value. However, unlike with Pool.imap, it appears from my experience that you cannot begin to iterate results until all the iterables being passed to map have been iterated. This is because, by default, map submits all of its tasks before returning the results iterator.
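You can observe this up-front submission with a generator that records how much of it has been consumed. In the sketch below (the helper names are hypothetical), the check runs before a single result is read, and it finds the generator already drained. ThreadPoolExecutor behaves the same way, because both executors end up using the map logic inherited from Executor.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def square(i):
    return i * i

def check_eager_consumption(executor_class=ProcessPoolExecutor, n=1000, chunksize=50):
    consumed = []  # values the generator has handed out so far (main process only)

    def tracked_inputs():
        for i in range(n):
            consumed.append(i)
            yield i

    with executor_class() as executor:
        results = executor.map(square, tracked_inputs(), chunksize=chunksize)
        # Checked before reading any result from the iterator
        drained_before_iterating = len(consumed) == n
        total = sum(results)
    return drained_before_iterating, total

if __name__ == "__main__":
    print(check_eager_consumption())                    # (True, 332833500)
    print(check_eager_consumption(ThreadPoolExecutor))  # (True, 332833500)
```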
The multiprocessing.Pool class has a method apply that submits a task to the pool and blocks until the result is ready. The return value is just the return value of the worker function passed to apply. For example:
import multiprocessing as mp

def worker_process(i):
    return i * i  # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()
The concurrent.futures.ProcessPoolExecutor class has no such equivalent. You have to issue a submit and then call result on the returned Future instance. It's not a hardship to have to do this, but the Pool.apply method is more convenient for use cases where a blocking task submission is appropriate. One such case is when your processing calls for threading because most of the work done in the threads is I/O-bound, except for perhaps one function that is very CPU-bound. The main program that creates the threads first creates a multiprocessing.Pool instance and passes it as an argument to all the threads. When a thread needs to call the heavily CPU-bound function, it runs the function using the Pool.apply method, thereby running the code in another process and freeing the current process to allow the other threads to run.
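The threads-plus-pool pattern just described might look like the following sketch. The time.sleep call stands in for I/O-bound work, and cpu_heavy stands in for the one CPU-bound function; both are hypothetical placeholders.

```python
import multiprocessing as mp
import threading
import time

def cpu_heavy(n):
    # Placeholder for the CPU-bound function; it runs in a pool process
    return sum(i * i for i in range(n))

def thread_task(pool, n, results, index):
    time.sleep(0.1)  # placeholder for the I/O-bound work done in the thread
    # Blocks only this thread; the computation runs in another process,
    # so this process's GIL stays available to the other threads
    results[index] = pool.apply(cpu_heavy, (n,))

def main():
    inputs = [10, 100, 1000]
    results = [None] * len(inputs)
    with mp.Pool() as pool:
        threads = [threading.Thread(target=thread_task, args=(pool, n, results, index))
                   for index, n in enumerate(inputs)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()  # every apply call has returned before the pool is terminated
    return results

if __name__ == "__main__":
    print(main())  # [285, 328350, 332833500]
```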
A big deal has been made of the concurrent.futures module having two classes, ProcessPoolExecutor and ThreadPoolExecutor, with identical interfaces. That is a nice feature. But the multiprocessing module also has a ThreadPool class (in multiprocessing.pool, undocumented for a long time) whose interface is identical to that of Pool:
>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>
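Because the interfaces match, switching between processes and threads is a one-word change. A small sketch (square and squares are illustrative names):

```python
from multiprocessing.pool import Pool, ThreadPool

def square(i):
    return i * i

def squares(pool_class, n=10):
    # Identical calling code for the process-based and thread-based pools
    with pool_class(4) as pool:
        return pool.map(square, range(n))

if __name__ == "__main__":
    print(squares(Pool) == squares(ThreadPool))  # True
```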
You can submit tasks with either ProcessPoolExecutor.submit, which returns a Future instance, or Pool.apply_async, which returns an AsyncResult instance, and specify a timeout value for retrieving the result:
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3)  # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")
Prints:
hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.
The main process, when calling future.result(3), will get a TimeoutError exception after 3 seconds because the submitted task has not completed within that time period. But the task continues to run, tying up the process, and the with ProcessPoolExecutor(1) as pool: block never exits, so the program does not terminate.
from multiprocessing import Pool, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3)  # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")
Prints:
hanging
hanging
hanging
timeout
return from main()
This time, however, even though the timed-out task is still running and tying up the process, the with block is not prevented from exiting, and thus the program terminates normally. The reason is that the context manager for the Pool instance calls terminate when the block exits, which immediately terminates all processes in the pool. By contrast, the context manager for the ProcessPoolExecutor instance calls shutdown(wait=True), which waits for all processes in the pool to finish when the block it governs exits. The advantage would seem to go to multiprocessing.Pool if you are using context managers to handle pool termination and the possibility of a timeout exists.
But since the context manager for multiprocessing.Pool only calls terminate and not close followed by join, you must ensure that all the jobs you have submitted have completed before exiting the with block. You can do this by submitting jobs with a blocking, synchronous call such as map, by calling get on the AsyncResult object returned by apply_async, by iterating the results of a call to imap, or by calling close followed by join on the pool instance.
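The last option looks like this in practice. In this sketch, results collected by apply_async callbacks are all in place before the with block's terminate call runs:

```python
import multiprocessing as mp

def worker_process(i):
    return i * i  # square the argument

def main():
    results = []
    with mp.Pool() as pool:
        for i in range(10):
            # The callback runs in a result-handler thread of the main process
            pool.apply_async(worker_process, args=(i,), callback=results.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for every task, and its callback, to finish
    # Leaving the with block called terminate(), but nothing was left running
    return sorted(results)

if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```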
Although there is no way to exit until timed-out tasks complete when using ProcessPoolExecutor, you can cancel the start of submitted tasks that are not already running. In the following demo we have a pool of size 1, so jobs can only run consecutively. We submit 3 jobs one after another, where the first two jobs take 3 seconds to run because of calls to time.sleep(3). We immediately try to cancel the first two jobs. The first cancellation attempt fails because the first job is already running. But because the pool only has one process, the second job must wait 3 seconds for the first job to complete before it can start running, and therefore its cancellation succeeds. Finally, job 3 will begin and end almost immediately after job 1 completes, which will be approximately 3 seconds after we started the job submissions:
from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result()  # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()
Prints:
False
True
Done 1
Hello
3.1249606609344482
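Since Python 3.9, Executor.shutdown also accepts cancel_futures=True, which cancels every future that has not started running in a single call instead of one cancel per future. Futures that are already running are not affected, so this still cannot stop a hung task. A sketch (slow_job and submit_then_cancel are hypothetical names):

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def slow_job(i):
    time.sleep(0.2)
    return i

def submit_then_cancel(executor_class=ProcessPoolExecutor, n_jobs=5):
    executor = executor_class(max_workers=1)
    futures = [executor.submit(slow_job, i) for i in range(n_jobs)]
    # Python 3.9+: cancel everything not yet running, then wait for
    # the job(s) already running to finish
    executor.shutdown(wait=True, cancel_futures=True)
    cancelled = sum(f.cancelled() for f in futures)
    completed = sum(f.done() and not f.cancelled() for f in futures)
    return cancelled, completed

if __name__ == "__main__":
    print(submit_then_cancel())
    print(submit_then_cancel(ThreadPoolExecutor))
```

The exact split between cancelled and completed jobs depends on how many tasks the executor had already handed to a worker when shutdown was called.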
In addition to other answers' detailed lists of differences, I've personally run into an unfixed (as of 2020-10-27) indefinite hang that can happen with multiprocessing.Pool when one of the workers crashes in certain ways. (In my case, it was an exception from a Cython extension, though others say this can happen when a worker gets a SIGTERM, etc.) According to the documentation for ProcessPoolExecutor, it has been robust to this since Python 3.3.
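The documented ProcessPoolExecutor behavior since Python 3.3 is that when a worker process terminates abruptly, a BrokenProcessPool error is raised instead of the executor freezing. A minimal sketch that simulates such a crash with os._exit (the crash function is hypothetical and stands in for whatever kills the worker):

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def crash():
    # Kill the worker process immediately, with no exception and no cleanup
    os._exit(1)

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crash)
        try:
            future.result()
        except BrokenProcessPool:
            return "worker died; BrokenProcessPool raised instead of hanging"

if __name__ == "__main__":
    print(main())
```

After this error the executor is unusable, so a long-running program would need to create a fresh one.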
In my experience, I faced a lot more issues with the multiprocessing module than with concurrent.futures (but this was on Windows).
The two main differences I saw were:
- Frequent hangs in the multiprocessing module
- concurrent.futures has a relatively simpler way of doing things: fetching the results, tracking child processes, etc. is very simple.
Example (fetching the result):
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed)
    print(f1.result())
So if you are returning any value from some_function(), you can directly catch/store it using f1.result(). The very same thing needs additional steps in the multiprocessing module.
If you are running on Linux, the hangs might not occur, but the execution complexity is still greater in the multiprocessing module.
Having said this, it is also important to note that my tasks were highly CPU-intensive.
On a personal note, I would recommend concurrent.futures.
I love concurrent.futures, mainly because of how it handles multiple function arguments: multiprocessing is somewhat hacky when it comes to passing multiple arguments to a function (there is no istarmap() equivalent of starmap()):
import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    # imap passes a single argument, so unpack the tuple here
    return power_plus_one(*t)

if __name__ == '__main__':
    with mp.Pool() as pool:
        r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))
    print(r)
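For comparison, Executor.map in concurrent.futures accepts several iterables and calls the function with one item from each, so no wrapper is needed. A sketch reusing power_plus_one:

```python
from concurrent.futures import ProcessPoolExecutor

def power_plus_one(x, y):
    return (x**y) + 1

def main():
    pairs = [(0, 1), (2, 2)]
    xs, ys = zip(*pairs)  # (0, 2) and (1, 2)
    with ProcessPoolExecutor() as executor:
        # Calls power_plus_one(0, 1) and power_plus_one(2, 2); results come back in input order
        return list(executor.map(power_plus_one, xs, ys))

if __name__ == "__main__":
    print(main())  # [1, 5]
```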
I find imap()/imap_unordered() super helpful for progress bars like tqdm or time estimates for larger computations. In concurrent.futures, this is super handy:
import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

if __name__ == '__main__':
    o = dict()  # dict for output
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future]
            o[i] = future.result()
    print(o)
I also love the handy result mapping as a dict. :)
With tqdm you can easily do:
from tqdm import tqdm

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...