Show the progress of a Python multiprocessing pool imap_unordered call?

I have a script that's successfully doing a multiprocessing Pool set of tasks with a imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I'd like to be able to echo out to the command line incrementally to show the main process isn't locked. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Is there a method for the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls a counter.value += 1 action after doing its task), but the counter only gets to ~85% of the total value before stopping incrementing.


Solution 1:

My personal favorite -- gives you a nice little progress bar and completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

Solution 2:

There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

Solution 3:

I found that the work was already done by the time I tried to check it's progress. This is what worked for me using tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

This should work with all flavors of multiprocessing, whether they block or not.

Solution 4:

Found an answer myself with some more digging: Taking a look at the __dict__ of the imap_unordered result object, I found it has a _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different. Instead, the result object from map_async has a _number_left attribute, and a ready() method:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

Solution 5:

As suggested by Tim, you can use tqdm and imap to solve this issue. I've just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here's how it works:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

In case you don't care about the values returned from your jobs, you don't need to assign the list to any variable.