Distributing jobs evenly across multiple GPUs with `multiprocessing.Pool`
Let's say that I have the following:
- A system with 4 GPUs.
- A function,
foo
, which may be run up to 2 times simultaneously on each GPU. - A list of
files
that need to be processed usingfoo
in any order. However, each file takes an unpredictable amount of time to be processed.
I would like to process all the files, keeping all the GPUs as busy as possible by ensuring there are always 8 instances of foo
running at any given time (2 instance on each GPU) until less than 8 files remain.
The actual details of invoking the GPU are not my issue. What I'm trying to figure out is how to write the parallelization so that I can keep 8 instances of foo
running but somehow making sure that exactly 2 of each GPU ID are used at all times.
I've come up with one way to solve this problem using multiprocessing.Pool
, but the solution is quite brittle and relies on (AFAIK) undocumented features. It relies on the fact that the processes within the Pool
are named in the format FormPoolWorker-%d
where %d
is a number between one and the number of processes in the pool. I take this value and mod it with the number of GPUs and that gives me a valid GPU id. However, it would be much nicer if I could somehow give the GPU id directly to each process, perhaps on initialization, instead of relying on the string format of the process names.
One thing I considered is that if the initializer
and initargs
parameters of Pool.__init__
allowed for a list of initargs
so that each process could be initialized with a different set of arguments then the problem would be moot. Unfortunately that doesn't appear to work.
Can anybody recommend a more robust or pythonic solution to this problem?
Hacky solution (Python 3.7):
from multiprocessing import Pool, current_process
def foo(filename):
# Hacky way to get a GPU id using process name (format "ForkPoolWorker-%d")
gpu_id = (int(current_process().name.split('-')[-1]) - 1) % 4
# run processing on GPU <gpu_id>
ident = current_process().ident
print('{}: starting process on GPU {}'.format(ident, gpu_id))
# ... process filename
print('{}: finished'.format(ident))
pool = Pool(processes=4*2)
files = ['file{}.xyz'.format(x) for x in range(1000)]
for _ in pool.imap_unordered(foo, files):
pass
pool.close()
pool.join()
I figured it out. It's actually quite simple. All we need to do is use a multiprocessing.Queue
to manage the available GPU IDs. Start by initializing the Queue
to contain 2 of each GPU ID, then get
the GPU ID from the queue
at the beginning of foo
and put
it back at the end.
from multiprocessing import Pool, current_process, Queue
NUM_GPUS = 4
PROC_PER_GPU = 2
queue = Queue()
def foo(filename):
gpu_id = queue.get()
try:
# run processing on GPU <gpu_id>
ident = current_process().ident
print('{}: starting process on GPU {}'.format(ident, gpu_id))
# ... process filename
print('{}: finished'.format(ident))
finally:
queue.put(gpu_id)
# initialize the queue with the GPU ids
for gpu_ids in range(NUM_GPUS):
for _ in range(PROC_PER_GPU):
queue.put(gpu_ids)
pool = Pool(processes=PROC_PER_GPU * NUM_GPUS)
files = ['file{}.xyz'.format(x) for x in range(1000)]
for _ in pool.imap_unordered(foo, files):
pass
pool.close()
pool.join()
A while ago I have created a package gpuMultiprocessing which handles running a queue of processes on one or more GPUs. More precisely, it only handles the choice of a GPU by exploiting the functionality of multiprocessing package. It maps CPUid to GPUid and then it makes the GPUid available to the subprocess as an environment variable. So you can do something like CUDA_AVAILABLE_DEVICES=GPUid
inside the script. It also allows you to run multiple processes per GPU and keeps track of failed processes, in case they are not able to run, or in case they failed due to e.g. VRAM error. Probably not exactly what OP asks for but I though might be useful to somebody who comes here from google looking for GPU multiprocessing.
# Example of running queue of 4 commands in parallel on 2 GPUs
# The number of CPUs must be equal or larger than the numbrer of GPUs!
import gpuMultiprocessing
gpu_id_list = [0,7] # Let's use these two GPUs
command_queue = ['BATCHSIZE=16 python example-script.py',
'BATCHSIZE=32 python example-script.py',
'BATCHSIZE=64 python example-script.py',
'BATCHSIZ=80 python example-script.py'] # Typo on purpose
gpuMultiprocessing.queue_runner(command_queue, gpu_id_list,
env_gpu_name='CUDA_VISIBLE_DEVICES',
processes_per_gpu=2, allowed_restarts=1)
This code snippet will run the commands from command_queue on GPUs 0 and 7 allowing for 2 processes per GPU at once. If some of the commands fail, it will try to restart them once any GPU is free. If it fails again, it will return a list containing the failed commands.