Python multiprocessing PicklingError: Can't pickle <type 'function'>
I am sorry that I can't reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in IPython shell instead of the regular Python, things work out well.
I looked up some previous notes on this problem. They were all caused by using pool to call function defined within a class function. But this is not the case for me.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I would appreciate any help.
Update: The function I pickle is defined at the top level of the module. Though it calls a function that contains a nested function. i.e, f()
calls g()
calls h()
which has a nested function i()
, and I am calling pool.apply_async(f)
. f()
, g()
, h()
are all defined at the top level. I tried simpler example with this pattern and it works though.
Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top-level of a module.
This piece of code:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
yields an error almost identical to the one you posted:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
The problem is that the pool
methods all use a mp.SimpleQueue
to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue
must be pickable, and foo.work
is not picklable since it is not defined at the top level of the module.
It can be fixed by defining a function at the top level, which calls foo.work()
:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
Notice that foo
is pickable, since Foo
is defined at the top level and foo.__dict__
is picklable.
I'd use pathos.multiprocesssing
, instead of multiprocessing
. pathos.multiprocessing
is a fork of multiprocessing
that uses dill
. dill
can serialize almost anything in python, so you are able to send a lot more around in parallel. The pathos
fork also has the ability to work directly with multiple argument functions, as you need for class methods.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
Get pathos
(and if you like, dill
) here:
https://github.com/uqfoundation
As others have said multiprocessing
can only transfer Python objects to worker processes which can be pickled. If you cannot reorganize your code as described by unutbu, you can use dill
s extended pickling/unpickling capabilities for transferring data (especially code data) as I show below.
This solution requires only the installation of dill
and no other libraries as pathos
:
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()