Multiprocessing: How to use Pool.map on a function defined in a class?
When I run something like:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
it works fine. However, putting this as a function of a class:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Gives me the following error:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I've seen a post from Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.
Solution 1:
I could not use the codes posted so far because the codes using "multiprocessing.Pool" do not work with lambda expressions and the codes not using "multiprocessing.Pool" spawn as many processes as there are work items.
I adapted the code s.t. it spawns a predefined amount of workers and only iterates through the input list if there exists an idle worker. I also enabled the "daemon" mode for the workers s.t. ctrl-c works as expected.
import multiprocessing
def fun(f, q_in, q_out):
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
if __name__ == '__main__':
print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
Solution 2:
I also was annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe, x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f, X):
pipe = [Pipe() for x in X]
proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p, c) in pipe]
if __name__ == '__main__':
print parmap(lambda x: x**x, range(1, 5))
Solution 3:
Multiprocessing and pickling is broken and limited unless you jump outside the standard library.
If you use a fork of multiprocessing
called pathos.multiprocesssing
, you can directly use classes and class methods in multiprocessing's map
functions. This is because dill
is used instead of pickle
or cPickle
, and dill
can serialize almost anything in python.
pathos.multiprocessing
also provides an asynchronous map function… and it can map
functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])
)
See discussions: What can multiprocessing and dill do together?
and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
... def run(self):
... def f(x):
... return x*x
... p = Pool()
... return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
Get the code here: https://github.com/uqfoundation/pathos
And, just to show off a little more of what it can do:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]