Modify object in python multiprocessing

I have a large array of custom objects which I need to perform independent (parallelizable) tasks on, including modifying object parameters. I've tried using both a Manager().dict, and 'sharedmem'ory, but neither is working. For example:

import numpy as np
import multiprocessing as mp
import sharedmem as shm


class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn):
    test.num = np.random.randn()
    test.name = nn


if __name__ == '__main__':

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)

    sh_tests = shm.empty(num, dtype=object)
    for it in range(num):
        sh_tests[it] = tests[it]
        print sh_tests[it]

    print '\n'
    workers = [ mp.Process(target=mod, args=(test, 'some') ) for test in sh_tests ]

    for work in workers: work.start()

    for work in workers: work.join()

    for test in sh_tests: print test

prints out:

0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none


0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none

I.e. the objects aren't modified.

How can I achieve the desired behavior?


The problem is that when the objects are passed to the worker processes, they are packed up with pickle, shipped to the other process, where they are unpacked and worked on. Your objects aren't so much passed to the other process, as cloned. You don't return the objects, so the cloned object are happily modified, and then thrown away.

It looks like this can not be done (Python: Possible to share in-memory data between 2 separate processes) directly.

What you can do is return the modified objects.

import numpy as np
import multiprocessing as mp



class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn, out_queue):
    print test.num
    test.num = np.random.randn()
    print test.num
    test.name = nn
    out_queue.put(test)




if __name__ == '__main__':       
    num = 10
    out_queue = mp.Queue()
    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)


    print '\n'
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue) ) for test in tests ]

    for work in workers: work.start()

    for work in workers: work.join()

    res_lst = []
    for j in range(len(workers)):
        res_lst.append(out_queue.get())

    for test in res_lst: print test

This does lead to the interesting observation that because the spawned processes are identical, they all start with the same seed for the random number, so they all generate the same 'random' number.


Your code doesn't try to modify the shared memory. It just clones individual objects.

dtype=object means that sharedmem won't work due to reasons outlined in the link provided by @tcaswell:

sharing of object graphs that include references/pointers to other objects is basically unfeasible

For plain (value) types you can use shared memory, see Use numpy array in shared memory for multiprocessing.

The manager approach should also work (it just copies the objects around):

import random
from multiprocessing import Pool, Manager

class Tester(object):
    def __init__(self, num=0.0, name='none'):
        self.num  = num
        self.name = name

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)

def init(L):
    global tests
    tests = L

def modify(i_t_nn):
    i, t, nn = i_t_nn
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy
    t.name = nn
    tests[i] = t # copy back
    return i

def main():
    num_processes = num = 10 #note: num_processes and num may differ
    manager = Manager()
    tests = manager.list([Tester(num=i) for i in range(num)])
    print(tests[:2])

    args = ((i, t, 'some') for i, t in enumerate(tests))
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
    for i in pool.imap_unordered(modify, args):
        print("done %d" % i)
    pool.close()
    pool.join()
    print(tests[:2])

if __name__ == '__main__':
    main()

I don't see you passing shm references out into the child processes so I don't see how work done by them could be written back into the shared memory. Perhaps I'm missing something here.

Alternatively, have you considered numpy.memmap? (BTW: tcaswell, the module referred to here seems to be:numpy-sharedmem).

Also you might want to read Sturla Molden's Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing(PDF) as recommended in unutbu's answer to [StackOverflow:How do I pass large numpy arrays between python subprocesses without saving to disk?] and (How do I pass large numpy arrays between python subprocesses without saving to disk?). and Joe Kington's StackOverflow: NumPy vs. multiprocessing and mmap.

These might be more inspirational than directly relevant.


Because you can't share Python objects between processes, any implementation using multiprocessing will be inefficient if you have significant objects, since you will have to copy the objects in order to share data.

If you're willing to try a different approach, you can try out Ray (docs)! It's a framework that makes it easy to write parallel and distributed Python. In a nutshell, it gives you the ability to launch Python functions in parallel, similar to multiprocessing, but it's also more flexible in that Ray processes can share memory. Here's your script written in Ray, using the concept of "actors" (shared objects):

# You can install Ray with pip.
import ray

import numpy as np


# Add this line to signify that you want to share Tester objects
# (called "actors" in Ray) between processes.
@ray.remote
class Tester(object):

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

    # Convert mod to be a method of the Tester object.
    def mod(self, nn):
        self.num = np.random.randn()
        self.name = nn


if __name__ == '__main__':

    # Start Ray. This allows you to create shared Testers (called "actors").
    ray.init()

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        # Create a shared Tester object (an "actor").
        tests[it] = Tester.remote(tnum=it*1.0)

    # Do some parallel work.
    for test in tests:
        test.mod.remote('some')

    # Compute the __str__ representations of each Tester in parallel.
    test_str_futures = [test.__str__.remote() for test in tests]
    # Get and print the __str__ return values. `ray.get` will block
    # until the return values are ready.
    test_strs = ray.get(test_str_futures)
    for test_str in test_strs:
        print(test_str)