How do I pass large numpy arrays between python subprocesses without saving to disk?
Solution 1:
While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial it seems to share the same intellectual heritage (maybe largely the same authors? -- I'm not sure).
Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:
import sharedmem as shm
import numpy as np
import multiprocessing as mp
def worker(q,arr):
done = False
while not done:
cmd = q.get()
if cmd == 'done':
done = True
elif cmd == 'data':
##Fake data. In real life, get data from hardware.
rnd=np.random.randint(100)
print('rnd={0}'.format(rnd))
arr[:]=rnd
q.task_done()
if __name__=='__main__':
N=10
arr=shm.zeros(N,dtype=np.uint8)
q=mp.JoinableQueue()
proc = mp.Process(target=worker, args=[q,arr])
proc.daemon=True
proc.start()
for i in range(3):
q.put('data')
# Wait for the computation to finish
q.join()
print arr.shape
print(arr)
q.put('done')
proc.join()
Running yields
rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
Solution 2:
Basically, you just want to share a block of memory between processes and view it as a numpy array, right?
In that case, have a look at this (Posted to numpy-discussion by Nadav Horesh awhile back, not my work). There are a couple of similar implementations (some more flexible), but they all essentially use this principle.
# "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved
import numpy as N
import ctypes
import multiprocessing as MP
_ctypes_to_numpy = {
ctypes.c_char : N.dtype(N.uint8),
ctypes.c_wchar : N.dtype(N.int16),
ctypes.c_byte : N.dtype(N.int8),
ctypes.c_ubyte : N.dtype(N.uint8),
ctypes.c_short : N.dtype(N.int16),
ctypes.c_ushort : N.dtype(N.uint16),
ctypes.c_int : N.dtype(N.int32),
ctypes.c_uint : N.dtype(N.uint32),
ctypes.c_long : N.dtype(N.int64),
ctypes.c_ulong : N.dtype(N.uint64),
ctypes.c_float : N.dtype(N.float32),
ctypes.c_double : N.dtype(N.float64)}
_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))
def shmem_as_ndarray(raw_array, shape=None ):
address = raw_array._obj._wrapper.get_address()
size = len(raw_array)
if (shape is None) or (N.asarray(shape).prod() != size):
shape = (size,)
elif type(shape) is int:
shape = (shape,)
else:
shape = tuple(shape)
dtype = _ctypes_to_numpy[raw_array._obj._type_]
class Dummy(object): pass
d = Dummy()
d.__array_interface__ = {
'data' : (address, False),
'typestr' : dtype.str,
'descr' : dtype.descr,
'shape' : shape,
'strides' : None,
'version' : 3}
return N.asarray(d)
def empty_shared_array(shape, dtype, lock=True):
'''
Generate an empty MP shared array given ndarray parameters
'''
if type(shape) is not int:
shape = N.asarray(shape).prod()
try:
c_type = _numpy_to_ctypes[dtype]
except KeyError:
c_type = _numpy_to_ctypes[N.dtype(dtype)]
return MP.Array(c_type, shape, lock=lock)
def emptylike_shared_array(ndarray, lock=True):
'Generate a empty shared array with size and dtype of a given array'
return empty_shared_array(ndarray.size, ndarray.dtype, lock)
Solution 3:
From the other answers, it seems that numpy-sharedmem is the way to go.
However, if you need a pure python solution, or installing extensions, cython or the like is a (big) hassle, you might want to use the following code which is a simplified version of Nadav's code:
import numpy, ctypes, multiprocessing
_ctypes_to_numpy = {
ctypes.c_char : numpy.dtype(numpy.uint8),
ctypes.c_wchar : numpy.dtype(numpy.int16),
ctypes.c_byte : numpy.dtype(numpy.int8),
ctypes.c_ubyte : numpy.dtype(numpy.uint8),
ctypes.c_short : numpy.dtype(numpy.int16),
ctypes.c_ushort : numpy.dtype(numpy.uint16),
ctypes.c_int : numpy.dtype(numpy.int32),
ctypes.c_uint : numpy.dtype(numpy.uint32),
ctypes.c_long : numpy.dtype(numpy.int64),
ctypes.c_ulong : numpy.dtype(numpy.uint64),
ctypes.c_float : numpy.dtype(numpy.float32),
ctypes.c_double : numpy.dtype(numpy.float64)}
_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
_ctypes_to_numpy.keys()))
def shm_as_ndarray(mp_array, shape = None):
'''Given a multiprocessing.Array, returns an ndarray pointing to
the same data.'''
# support SynchronizedArray:
if not hasattr(mp_array, '_type_'):
mp_array = mp_array.get_obj()
dtype = _ctypes_to_numpy[mp_array._type_]
result = numpy.frombuffer(mp_array, dtype)
if shape is not None:
result = result.reshape(shape)
return numpy.asarray(result)
def ndarray_to_shm(array, lock = False):
'''Generate an 1D multiprocessing.Array containing the data from
the passed ndarray. The data will be *copied* into shared
memory.'''
array1d = array.ravel(order = 'A')
try:
c_type = _numpy_to_ctypes[array1d.dtype]
except KeyError:
c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]
result = multiprocessing.Array(c_type, array1d.size, lock = lock)
shm_as_ndarray(result)[:] = array1d
return result
You would use it like this:
- Use
sa = ndarray_to_shm(a)
to convert the ndarraya
into a shared multiprocessing.Array. - Use
multiprocessing.Process(target = somefunc, args = (sa, )
(andstart
, maybejoin
) to callsomefunc
in a separate process, passing the shared array. - In
somefunc
, usea = shm_as_ndarray(sa)
to get an ndarray pointing to the shared data. (Actually, you may want to do the same in the original process, immediately after creatingsa
, in order to have two ndarrays referencing the same data.)
AFAICS, you don't need to set lock to True, since shm_as_ndarray
will not use the locking anyhow. If you need locking, you would set lock to True and call acquire/release on sa
.
Also, if your array is not 1-dimensional, you might want to transfer the shape along with sa (e.g. use args = (sa, a.shape)
).
This solution has the advantage that it does not need additional packages or extension modules, except multiprocessing (which is in the standard library).
Solution 4:
Use threads. But I guess you are going to get problems with the GIL.
Instead: Choose your poison.
I know from the MPI implementations I work with, that they use shared memory for on-node-communications. You will have to code your own synchronization in that case.
2 GB/s sounds like you will get problems with most "easy" methods, depending on your real-time constraints and available main memory.
Solution 5:
One possibility to consider is to use a RAM drive for the temporary storage of files to be shared between processes. A RAM drive is where a portion of RAM is treated as a logical hard drive, to which files can be written/read as you would with a regular drive, but at RAM read/write speeds.
This article describes using the ImDisk software (for MS Win) to create such disk and obtains file read/write speeds of 6-10 Gigabytes/second: https://www.tekrevue.com/tip/create-10-gbs-ram-disk-windows/
An example in Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871
Another noted benefit is that files with arbitrary formats can be passed around with such method: e.g. Picke, JSON, XML, CSV, HDF5, etc...
Keep in mind that anything stored on the RAM disk is wiped on reboot.