Show the progress of a Python multiprocessing pool imap_unordered call?
I have a script that's successfully doing a multiprocessing Pool set of tasks with an imap_unordered() call:
import multiprocessing

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, range(num_tasks))
p.close()  # No more work
p.join()   # Wait for completion
However, my num_tasks is around 250,000, so the join() locks the main thread for 10 seconds or so, and I'd like to echo something to the command line incrementally to show that the main process isn't locked. Something like:
import multiprocessing
import time

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, range(num_tasks))
p.close()  # No more work
while True:
    remaining = rs.tasks_remaining()  # How many of the map call haven't been done yet?
    if remaining == 0:
        break  # Jump out of while loop
    print("Waiting for", remaining, "tasks to complete...")
    time.sleep(2)
Is there a method on the result object or on the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work increments counter.value after finishing its task), but the counter only reaches ~85% of the total before it stops incrementing.
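That stall is the classic symptom of a lost update: counter.value += 1 is a read-modify-write, not an atomic operation, so two workers can read the same value and one increment vanishes. A minimal sketch of a counter that survives the race, guarding the increment with the Value's built-in lock (the init_worker name and the do_work body are placeholders):

import multiprocessing
import time

def init_worker(shared_counter):
    # Give every worker process a module-level reference to the shared counter.
    global counter
    counter = shared_counter

def do_work(i):
    # ... the real task goes here ...
    with counter.get_lock():  # serialize the read-modify-write
        counter.value += 1

if __name__ == '__main__':
    num_tasks = 250000
    counter = multiprocessing.Value('i', 0)
    p = multiprocessing.Pool(initializer=init_worker, initargs=(counter,))
    rs = p.map_async(do_work, range(num_tasks))
    p.close()
    while not rs.ready():
        print("Completed", counter.value, "of", num_tasks, "tasks...")
        time.sleep(2)
    p.join()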
Solution 1:
My personal favorite: it gives you a compact progress bar and a completion ETA while the tasks run in parallel.
from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
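The total=len(tasks) argument matters: imap_unordered yields results one at a time, so without it tqdm only shows a running count, not a percentage or an ETA. A self-contained version, assuming Python 3 (where Pool works as a context manager) and a stand-in do_work:

from multiprocessing import Pool
import tqdm

def do_work(x):
    return x * x  # stand-in for the real task

if __name__ == '__main__':
    tasks = range(100000)
    with Pool(processes=8) as pool:
        for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
            pass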
Solution 2:
There is no need to access private attributes of the result set:
from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
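The \r returns the cursor to the start of the line, so each write overwrites the previous one and the percentage updates in place. A runnable sketch of the same idea, with a placeholder do_work:

from __future__ import division
import multiprocessing
import sys

def do_work(x):
    return x * x  # placeholder task

if __name__ == '__main__':
    num_tasks = 100000
    p = multiprocessing.Pool()
    for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
        sys.stderr.write('\rdone {0:.1%}'.format(i / num_tasks))
    sys.stderr.write('\n')
    p.close()
    p.join()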
Solution 3:
I found that the work was already done by the time I tried to check its progress. This is what worked for me, using tqdm.
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm

def do_work(x):
    # do something with x
    pbar.update(1)

tasks = range(5)
pbar = tqdm(total=len(tasks))

pool = Pool()
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
This should work with all flavors of multiprocessing, whether they block or not.
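Note that with the fork start method each worker updates its own inherited copy of pbar, so the displayed count can lag or jump. A variant that keeps the bar in the parent is to update it from an apply_async callback, which always runs in the parent process; a sketch with the same placeholder task:

from multiprocessing import Pool
from tqdm import tqdm

def do_work(x):
    return x * x  # do something with x

if __name__ == '__main__':
    tasks = range(5)
    pbar = tqdm(total=len(tasks))
    pool = Pool()
    for x in tasks:
        # The callback fires in the parent once per completed task.
        pool.apply_async(do_work, (x,), callback=lambda _: pbar.update(1))
    pool.close()
    pool.join()
    pbar.close()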
Solution 4:
Found an answer myself with some more digging: taking a look at the __dict__ of the imap_unordered result object, I found it has an _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:
import multiprocessing
import time

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, range(num_tasks))
p.close()  # No more work
while True:
    completed = rs._index
    if completed == num_tasks:
        break
    print("Waiting for", num_tasks - completed, "tasks to complete...")
    time.sleep(2)
However, I did find that swapping imap_unordered for map_async resulted in much faster execution, though the result object is a bit different: instead of _index, the result object from map_async has a _number_left attribute and a ready() method:
p = multiprocessing.Pool()
rs = p.map_async(do_work, range(num_tasks))
p.close()  # No more work
while True:
    if rs.ready():
        break
    remaining = rs._number_left
    print("Waiting for", remaining, "tasks to complete...")
    time.sleep(0.5)
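One caveat: both attributes are private, and in CPython's pool implementation _number_left counts remaining chunks rather than individual tasks, so with a chunksize greater than one the number drops in steps. If you want an approximate task count you can scale it; this leans on CPython internals and may change between versions:

remaining_tasks = rs._number_left * rs._chunksize  # chunks times tasks-per-chunk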
Solution 5:
As suggested by Tim, you can use tqdm and imap to solve this issue. I've just stumbled upon this problem and tweaked the imap_unordered solution so that I can access the results of the mapping. Here's how it works:
import multiprocessing
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)),
                               total=num_tasks))
If you don't care about the values your jobs return, you don't need to assign the list to a variable.
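Keep in mind that imap_unordered yields results in completion order, not input order, so mapped_values won't line up with the inputs; swap in pool.imap if you need them aligned, at the cost of the bar advancing only as the next in-order result arrives. A self-contained sketch with a placeholder do_work:

import multiprocessing
import tqdm

def do_work(x):
    return x * x  # placeholder for the real job

if __name__ == '__main__':
    num_tasks = 10000
    with multiprocessing.Pool(processes=4) as pool:
        mapped_values = list(tqdm.tqdm(
            pool.imap_unordered(do_work, range(num_tasks)),
            total=num_tasks))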