Using multiprocessing.Process with a maximum number of simultaneous processes
Solution 1:
It might be most sensible to use multiprocessing.Pool
which produces a pool of worker processes based on the max number of cores available on your system, and then basically feeds tasks in as the cores become available.
The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
And it's also handy to know that there is the multiprocessing.cpu_count()
method to count the number of cores on a given system, if needed in your code.
Edit: Here's some draft code that seems to work for your specific case:
import multiprocessing
def f(name):
print 'hello', name
if __name__ == '__main__':
pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
for i in xrange(0, 512):
pool.apply_async(f, args=(i,))
pool.close()
pool.join()
Solution 2:
I think Semaphore is what you are looking for, it will block the main process after counting down to 0. Sample code:
from multiprocessing import Process
from multiprocessing import Semaphore
import time
def f(name, sema):
print('process {} starting doing business'.format(name))
# simulate a time-consuming task by sleeping
time.sleep(5)
# `release` will add 1 to `sema`, allowing other
# processes blocked on it to continue
sema.release()
if __name__ == '__main__':
concurrency = 20
total_task_num = 1000
sema = Semaphore(concurrency)
all_processes = []
for i in range(total_task_num):
# once 20 processes are running, the following `acquire` call
# will block the main process since `sema` has been reduced
# to 0. This loop will continue only after one or more
# previously created processes complete.
sema.acquire()
p = Process(target=f, args=(i, sema))
all_processes.append(p)
p.start()
# inside main process, wait for all processes to finish
for p in all_processes:
p.join()
The following code is more structured since it acquires and releases sema
in the same function. However, it will consume too much resources if total_task_num
is very large:
from multiprocessing import Process
from multiprocessing import Semaphore
import time
def f(name, sema):
print('process {} starting doing business'.format(name))
# `sema` is acquired and released in the same
# block of code here, making code more readable,
# but may lead to problem.
sema.acquire()
time.sleep(5)
sema.release()
if __name__ == '__main__':
concurrency = 20
total_task_num = 1000
sema = Semaphore(concurrency)
all_processes = []
for i in range(total_task_num):
p = Process(target=f, args=(i, sema))
all_processes.append(p)
# the following line won't block after 20 processes
# have been created and running, instead it will carry
# on until all 1000 processes are created.
p.start()
# inside main process, wait for all processes to finish
for p in all_processes:
p.join()
The above code will create total_task_num
processes but only concurrency
processes will be running while other processes are blocked, consuming precious system resources.
Solution 3:
more generally, this could also look like this:
import multiprocessing
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
numberOfThreads = 4
if __name__ == '__main__':
jobs = []
for i, param in enumerate(params):
p = multiprocessing.Process(target=f, args=(i,param))
jobs.append(p)
for i in chunks(jobs,numberOfThreads):
for j in i:
j.start()
for j in i:
j.join()
Of course, that way is quite cruel (since it waits for every process in a junk until it continues with the next chunk). Still it works well for approx equal run times of the function calls.