How do I pass large numpy arrays between python subprocesses without saving to disk?

Solution 1:

While googling for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial, it seems to share the same intellectual heritage (possibly largely the same authors, though I'm not sure).

Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()
        print(arr.shape)
        print(arr)
    q.put('done')
    proc.join()

Running yields

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
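
On Python 3.8 or newer, something similar seems possible without a third-party package, using the standard library's multiprocessing.shared_memory module. What follows is a minimal sketch under that assumption, not the sharedmem API; the helper name fill and the shape, dtype and fill value are made up for illustration.

```python
import multiprocessing as mp
from multiprocessing import shared_memory

import numpy as np


def fill(name, shape, dtype, value):
    """Attach to an existing shared block by name and overwrite it in place."""
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr[:] = value
    del arr      # drop the view before closing the handle
    shm.close()


def main():
    shape, dtype = (10,), np.dtype(np.uint8)
    nbytes = int(np.prod(shape)) * dtype.itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr[:] = 0

    # Only the block's name and the array metadata are sent to the child.
    proc = mp.Process(target=fill, args=(shm.name, shape, dtype.str, 53))
    proc.start()
    proc.join()
    print(arr)   # the child's writes are visible here without copying

    del arr
    shm.close()
    shm.unlink()  # free the block once every process is done with it


if __name__ == "__main__":
    main()
```

The child receives only a short string and a shape, so the cost of starting it does not depend on the array size.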

Solution 2:

Basically, you just want to share a block of memory between processes and view it as a numpy array, right?

In that case, have a look at this (posted to numpy-discussion by Nadav Horesh a while back; not my work). There are a couple of similar implementations (some more flexible), but they all essentially use this principle.

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved


import numpy as N
import ctypes
import multiprocessing as MP

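# NOTE: the sizes of c_wchar and c_long are platform-dependent
# (e.g. c_long is 32-bit on Windows, c_wchar is 32-bit on Linux).
_ctypes_to_numpy = {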
    ctypes.c_char   : N.dtype(N.uint8),
    ctypes.c_wchar  : N.dtype(N.int16),
    ctypes.c_byte   : N.dtype(N.int8),
    ctypes.c_ubyte  : N.dtype(N.uint8),
    ctypes.c_short  : N.dtype(N.int16),
    ctypes.c_ushort : N.dtype(N.uint16),
    ctypes.c_int    : N.dtype(N.int32),
    ctypes.c_uint   : N.dtype(N.uint32),
    ctypes.c_long   : N.dtype(N.int64),
    ctypes.c_ulong  : N.dtype(N.uint64),
    ctypes.c_float  : N.dtype(N.float32),
    ctypes.c_double : N.dtype(N.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))


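def shmem_as_ndarray(raw_array, shape=None):
    'View a synchronized multiprocessing.Array as an ndarray without copying'
    # NOTE: _obj and _wrapper are private multiprocessing internals and may
    # differ between Python versions.
    address = raw_array._obj._wrapper.get_address()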
    size = len(raw_array)
    if (shape is None) or (N.asarray(shape).prod() != size):
        shape = (size,)
    elif type(shape) is int:
        shape = (shape,)
    else:
        shape = tuple(shape)

    dtype = _ctypes_to_numpy[raw_array._obj._type_]
    class Dummy(object): pass
    d = Dummy()
    d.__array_interface__ = {
        'data' : (address, False),
        'typestr' : dtype.str,
        'descr' :   dtype.descr,
        'shape' : shape,
        'strides' : None,
        'version' : 3}
    return N.asarray(d)

def empty_shared_array(shape, dtype, lock=True):
    '''
    Generate an empty MP shared array given ndarray parameters
    '''

    if type(shape) is not int:
        shape = N.asarray(shape).prod()
    try:
        c_type = _numpy_to_ctypes[dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[N.dtype(dtype)]
    return MP.Array(c_type, shape, lock=lock)

def emptylike_shared_array(ndarray, lock=True):
    'Generate an empty shared array with the size and dtype of a given array'
    return empty_shared_array(ndarray.size, ndarray.dtype, lock)
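
The principle behind the code above (point numpy at the address of a shared buffer through __array_interface__) can be sketched with public APIs only. This is an illustration under assumptions, not Nadav's code: it uses a lock-free multiprocessing.RawArray and ctypes.addressof instead of the private attributes, and the names _SharedView and view_raw_array are hypothetical.

```python
import ctypes
import multiprocessing as mp

import numpy as np


class _SharedView:
    """Hypothetical helper exposing a ctypes buffer via __array_interface__."""

    def __init__(self, raw, shape, dtype):
        self._raw = raw  # keep the shared buffer alive as long as the view
        self.__array_interface__ = {
            "data": (ctypes.addressof(raw), False),  # (address, read_only)
            "typestr": np.dtype(dtype).str,
            "shape": tuple(shape),
            "strides": None,                         # C-contiguous
            "version": 3,
        }


def view_raw_array(raw, shape, dtype):
    """Return an ndarray that aliases the memory of a RawArray (no copy)."""
    return np.asarray(_SharedView(raw, shape, dtype))


raw = mp.RawArray(ctypes.c_double, 6)      # zero-initialised shared memory
arr = view_raw_array(raw, (2, 3), np.float64)
arr[1, 2] = 4.5                            # write through the ndarray ...
print(raw[5])                              # ... and read it back from the buffer
```

Because the ndarray only borrows the address, the caller has to pass a shape and dtype that match the buffer's real size and element type.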

Solution 3:

From the other answers, it seems that numpy-sharedmem is the way to go.

However, if you need a pure-Python solution, or if installing extensions, Cython or the like is a (big) hassle, you might want to use the following code, which is a simplified version of Nadav's code:

import numpy, ctypes, multiprocessing

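# NOTE: the sizes of c_wchar and c_long are platform-dependent
# (e.g. c_long is 32-bit on Windows, c_wchar is 32-bit on Linux).
_ctypes_to_numpy = {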
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))


def shm_as_ndarray(mp_array, shape = None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''

    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)


def ndarray_to_shm(array, lock = False):
    '''Generate a 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''

    array1d = array.ravel(order = 'A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock = lock)
    shm_as_ndarray(result)[:] = array1d
    return result

You would use it like this:

  1. Use sa = ndarray_to_shm(a) to convert the ndarray a into a shared multiprocessing.Array.
  2. Use multiprocessing.Process(target = somefunc, args = (sa, )) (and start, maybe join) to call somefunc in a separate process, passing the shared array.
  3. In somefunc, use a = shm_as_ndarray(sa) to get an ndarray pointing to the shared data. (Actually, you may want to do the same in the original process, immediately after creating sa, in order to have two ndarrays referencing the same data.)

AFAICS, you don't need to set lock to True, since shm_as_ndarray will not use the locking anyhow. If you need locking, you would set lock to True and call acquire/release on sa.
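
If locking is needed, it could look roughly like the sketch below. It assumes a float64 array created with lock=True and uses the documented get_lock() and get_obj() methods of the synchronized wrapper, which has the same effect as calling acquire/release; the function name add_one is made up.

```python
import multiprocessing as mp

import numpy as np


def add_one(sa):
    """Increment every element of a lock-protected shared array."""
    with sa.get_lock():  # acquire on entry, release on exit
        a = np.frombuffer(sa.get_obj(), dtype=np.float64)
        a += 1.0


def main():
    sa = mp.Array("d", 4, lock=True)  # zero-initialised, guarded by an RLock
    procs = [mp.Process(target=add_one, args=(sa,)) for _ in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sa[:])


if __name__ == "__main__":
    main()
```

The lock only protects code that takes it explicitly; an ndarray view obtained elsewhere can still write to the buffer without it.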

Also, if your array is not 1-dimensional, you might want to transfer the shape along with sa (e.g. use args = (sa, a.shape)).

This solution has the advantage that it does not need additional packages or extension modules, except multiprocessing (which is in the standard library).
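
Put together, the three steps above might look like the following self-contained sketch. To stay short it does not reuse the functions from this answer: ndarray_to_shared and shared_as_ndarray are hypothetical stand-ins that rely on numpy.ctypeslib.as_ctypes_type (NumPy 1.16+) instead of the lookup table, and somefunc just doubles the data.

```python
import multiprocessing as mp

import numpy as np


def ndarray_to_shared(a):
    """Copy an ndarray into a lock-free 1-D shared buffer."""
    c_type = np.ctypeslib.as_ctypes_type(a.dtype)
    sa = mp.Array(c_type, a.size, lock=False)
    np.frombuffer(sa, dtype=a.dtype)[:] = a.ravel()
    return sa


def shared_as_ndarray(sa, shape, dtype):
    """View the shared buffer as an ndarray of the given shape (no copy)."""
    return np.frombuffer(sa, dtype=dtype).reshape(shape)


def somefunc(sa, shape, dtype):
    """Runs in the child process: modify the shared data in place."""
    a = shared_as_ndarray(sa, shape, dtype)
    a *= 2


def main():
    a = np.arange(6, dtype=np.float64).reshape(2, 3)
    sa = ndarray_to_shared(a)                         # step 1
    shared = shared_as_ndarray(sa, a.shape, a.dtype)  # parent-side view
    proc = mp.Process(target=somefunc, args=(sa, a.shape, a.dtype))  # step 2
    proc.start()
    proc.join()
    print(shared)  # reflects the child's in-place changes


if __name__ == "__main__":
    main()
```

Note that the original array a is left untouched, because ndarray_to_shared copies its data into the shared buffer.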

Solution 4:

Use threads. But I guess you are going to get problems with the GIL.

Instead: Choose your poison.

I know that the MPI implementations I work with use shared memory for on-node communication. You will have to code your own synchronization in that case.

2 GB/s sounds like you will get problems with most "easy" methods, depending on your real-time constraints and available main memory.
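
For the threads option, a rough sketch of what "code your own synchronization" could mean is shown below. It assumes one producer and one consumer sharing a single numpy buffer, with a queue.Queue used only as a handshake; all names are made up, and whether the GIL becomes a bottleneck depends on how much of the work happens inside numpy or other C code.

```python
import queue
import threading

import numpy as np


def producer(buf, ready, n_frames):
    """Fill the shared buffer, then wait until the consumer is done with it."""
    for i in range(n_frames):
        buf[:] = i       # stand-in for reading from hardware
        ready.put(i)     # signal: frame i is in the buffer
        ready.join()     # block until the consumer calls task_done()
    ready.put(None)      # sentinel: no more frames


def consumer(buf, ready, sums):
    """Process each frame directly from the shared buffer (no copy)."""
    while True:
        item = ready.get()
        if item is None:
            ready.task_done()
            break
        sums.append(int(buf.sum()))
        ready.task_done()


def run(n_frames=3, size=4):
    buf = np.zeros(size, dtype=np.int64)  # threads share this memory directly
    ready = queue.Queue()
    sums = []
    threads = [
        threading.Thread(target=producer, args=(buf, ready, n_frames)),
        threading.Thread(target=consumer, args=(buf, ready, sums)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sums


if __name__ == "__main__":
    print(run())
```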

Solution 5:

One possibility to consider is to use a RAM drive for the temporary storage of files to be shared between processes. A RAM drive is where a portion of RAM is treated as a logical hard drive, to which files can be written/read as you would with a regular drive, but at RAM read/write speeds.

This article describes using the ImDisk software (for MS Windows) to create such a disk and reports file read/write speeds of 6-10 gigabytes/second: https://www.tekrevue.com/tip/create-10-gbs-ram-disk-windows/

An example in Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Another benefit is that files in arbitrary formats can be passed around with this method: e.g. Pickle, JSON, XML, CSV, HDF5, etc.

Keep in mind that anything stored on the RAM disk is wiped on reboot.
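
A minimal sketch of the file hand-off follows. It assumes a Linux system where /dev/shm is a RAM-backed tmpfs, and falls back to the ordinary temp directory (which may be on disk) elsewhere; the function names are made up, and numpy.save/numpy.load stand in for whatever file format you prefer.

```python
import os
import tempfile

import numpy as np


def ram_dir():
    """Return a RAM-backed directory if available (assumption: /dev/shm)."""
    candidate = "/dev/shm"
    if os.path.isdir(candidate) and os.access(candidate, os.W_OK):
        return candidate
    return tempfile.gettempdir()  # fallback, not necessarily RAM-backed


def write_array(a, directory):
    """Writer process: store the array and return the path to hand over."""
    fd, path = tempfile.mkstemp(suffix=".npy", dir=directory)
    with os.fdopen(fd, "wb") as f:
        np.save(f, a)
    return path


def read_array(path):
    """Reader process: load the array from the path it was given."""
    return np.load(path)


if __name__ == "__main__":
    path = write_array(np.arange(5), ram_dir())
    try:
        print(read_array(path))
    finally:
        os.remove(path)  # RAM disks are small, so clean up promptly
```

Only the path has to travel between the processes, for example over a pipe or a queue.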