Combine Pool.map with shared memory Array in Python multiprocessing
I have a very large (read-only) array of data that I want processed by multiple processes in parallel.
I like the Pool.map function and would like to use it to calculate functions on that data in parallel.
I saw that one can use the Value or Array classes to share data between processes via shared memory. But when I try to use this with the Pool.map function I get:
RuntimeError: SynchronizedString objects should only be shared between processes through inheritance
Here is a simplified example of what I am trying to do:
from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    # want to share it using shared memory
    toShare = Array('c', testData)

    # this works
    print count_it( toShare, "a" )

    pool = Pool()

    # RuntimeError here
    print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Can anyone tell me what I am doing wrong here?
What I would like to do is pass information about a newly created, shared-memory-allocated array to the processes after they have been created in the process pool.
Trying again as I just saw the bounty ;)
Basically I think the error message means what it says - multiprocessing shared memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is that the data is shared memory. So you have to make the shared array global. I think it's neater to put it as an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point of not wanting to set the data before the fork, here is a modified example.

If you wanted to have more than one possible shared array (and that's why you wanted to pass toShare as an argument) you could similarly make a global list of shared arrays, and just pass the index to count_it (the loop would become for c in toShare[i]:) - there is a sketch of that variant after the example below.
from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
    count = 0
    for c in toShare:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork
    pool = Pool()

    # can set data after fork
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError("Shared array too small to hold data")

    toShare[:len(testData)] = testData

    print pool.map( count_it, ["a", "b", "s", "d"] )
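For the multiple-shared-arrays variant mentioned above, here is a minimal sketch. The names toShareList and count_it_idx are just illustrative, and like the example above it relies on fork, so it won't work on Windows:

from multiprocessing import Pool, Array

# global list of shared arrays; child processes inherit it across the fork
toShareList = []

def count_it_idx(args):
    i, key = args              # index into toShareList plus the key to count
    count = 0
    for c in toShareList[i]:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    maxLength = 50
    # allocate the shared arrays before the fork...
    toShareList.append(Array('c', maxLength, lock=False))
    toShareList.append(Array('c', maxLength, lock=False))

    pool = Pool()

    # ...but, as above, the data itself can still be set afterwards
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    toShareList[0][:len(testData)] = testData
    toShareList[1][:len(testData)] = testData

    # only the index and the key get pickled, never the shared memory itself
    print pool.map(count_it_idx, [(0, key) for key in ["a", "b", "s", "d"]])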
[EDIT: The above doesn't work on Windows because it doesn't use fork. However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
    count = 0
    for c in mymodule.toShare:
        if c == key:
            count += 1
    return count

def initProcess(share):
    mymodule.toShare = share

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork
    pool = Pool(initializer=initProcess, initargs=(toShare,))

    # can set data after fork
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError("Shared array too small to hold data")

    toShare[:len(testData)] = testData

    print pool.map( count_it, ["a", "b", "s", "d"] )
Not sure why map won't pickle the array but Process and Pool will - I think perhaps it has to be transferred at the point of the subprocess initialization on Windows. Note that the data is still set after the fork though.]
If the data is read only just make it a variable in a module before the fork from Pool. Then all the child processes should be able to access it, and it won't be copied provided you don't write to it.
from multiprocessing import Pool
import myglobals # anything (empty .py file)

myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map( count_it, ["a", "b", "s", "d"] )
If you do want to try to use Array though, you could try it with the lock=False keyword argument (it is True by default).
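For example, the assignment in the example above could become something like this (a minimal sketch; the array still has to be inherited, not passed through pool.map):

from multiprocessing import Array
import myglobals

# lock=False gives a plain ctypes array with no synchronization wrapper,
# which is fine here because the worker processes only ever read the data
myglobals.data = Array('c', "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf", lock=False)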
The problem I see is that Pool doesn't support pickling shared data through its argument list. That's what the error message means by "objects should only be shared between processes through inheritance". The shared data needs to be inherited, i.e., global if you want to share it using the Pool class.
If you need to pass the shared data explicitly, you may have to use multiprocessing.Process. Here is your reworked example:
from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    q.put((key, count))

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    # want to share it using shared memory
    toShare = Array('c', testData)

    q = Queue()
    keys = ['a', 'b', 's', 'd']
    workers = [Process(target=count_it, args = (q, toShare, key))
               for key in keys]

    for p in workers:
        p.start()
    for p in workers:
        p.join()
    while not q.empty():
        print q.get(),
Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)
The ordering of elements of the queue may vary.
To make this more generic and similar to Pool, you could create a fixed number N of Processes, split the list of keys into N pieces, and then use a wrapper function as the Process target, which calls count_it for each key in the list it is passed, like:
def wrapper( q, arr, keys ):
    for k in keys:
        count_it(q, arr, k)
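A minimal sketch of that chunking, reusing count_it and wrapper from above (the helper name chunk and the worker count N are just illustrative):

from multiprocessing import Process, Array, Queue

def chunk(lst, n):
    # split lst into n roughly equal-sized pieces
    size = (len(lst) + n - 1) // n
    return [lst[i:i + size] for i in range(0, len(lst), size)]

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    toShare = Array('c', testData)
    q = Queue()
    keys = ['a', 'b', 's', 'd']

    N = 2                                   # fixed number of worker processes
    workers = [Process(target=wrapper, args=(q, toShare, piece))
               for piece in chunk(keys, N)]

    for p in workers:
        p.start()
    for p in workers:
        p.join()
    while not q.empty():
        print q.get(),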
If you're seeing:
RuntimeError: Synchronized objects should only be shared between processes through inheritance
consider using multiprocessing.Manager, as it doesn't have this limitation. The manager avoids the problem because it runs in a separate process altogether.
import ctypes
import multiprocessing
# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock() # pylint: disable=no-member
with counter_lock:
    counter.value = count = counter.value + 1
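Applied to the original question, a manager-held sequence can be passed straight through pool.map's argument list, because only the proxy gets pickled, not the data. This is a minimal sketch; note that access to the contents goes through the manager process, so it is slower than real shared memory:

import multiprocessing

def count_it(args):
    shared, key = args                 # shared is a manager proxy, not a copy
    data = shared[:]                   # fetch the contents in one round trip
    count = 0
    for c in data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    shared = manager.list(testData)    # proxy object, safe to pickle
    pool = multiprocessing.Pool()
    print pool.map(count_it, [(shared, key) for key in ["a", "b", "s", "d"]])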