multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

Solution 1:

The first argument to Value is typecode_or_type. That is defined as:

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

Emphasis mine. So you simply cannot put a pandas dataframe in a Value; it has to be a ctypes type.
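For reference, this is the kind of thing Value does accept - a ctypes type or a one-character typecode:

import multiprocessing
import ctypes

# a shared double, initialised to 0.0
shared_total = multiprocessing.Value(ctypes.c_double, 0.0)

# the same thing using the 'd' typecode from the array module
shared_total2 = multiprocessing.Value('d', 0.0)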

You could instead use a multiprocessing.Manager to serve your singleton dataframe instance to all of your processes. There are a few different ways to end up in the same place - probably the easiest is to just plop your dataframe into the manager's Namespace.

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe

# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))

Now your dataframe instance is accessible to any process that gets passed a reference to the Manager. Or just pass a reference to the Namespace; that's cleaner.
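Putting it together, a complete minimal sketch might look like this (worker, work_unit and the toy dataframe are just placeholders for your own code):

from multiprocessing import Manager, Process
import pandas as pd

def worker(ns, work_unit):
    # each attribute access on ns goes through the manager proxy
    df = ns.df
    print(work_unit, df.shape)

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ns.df = pd.DataFrame({'a': [1, 2, 3]})  # your real dataframe here

    p = Process(target=worker, args=(ns, 'some work'))
    p.start()
    p.join()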

One thing I didn't/won't cover is events and signaling - if your processes need to wait for others to finish executing, you'll need to add that in. Here is a page with some Event examples which also covers, in a bit more detail, how to use the manager's Namespace.
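A minimal, hedged sketch of the Event idea (the names here are illustrative, not from your code): the worker blocks until the main process signals that the dataframe has been placed in the Namespace.

from multiprocessing import Manager, Process, Event
import pandas as pd

def worker(ns, ready):
    ready.wait()              # block until the dataframe is in place
    print(len(ns.df))

if __name__ == '__main__':
    mgr = Manager()
    ns = mgr.Namespace()
    ready = Event()

    p = Process(target=worker, args=(ns, ready))
    p.start()

    ns.df = pd.DataFrame({'a': range(5)})  # load/build the real dataframe here
    ready.set()               # tell the worker it can proceed
    p.join()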

(Note that none of this addresses whether multiprocessing will actually give you tangible performance benefits; it just gives you the tools to explore that question.)

Solution 2:

You can use Array instead of Value for storing your dataframe.

The solution below converts a pandas dataframe to an object that stores its data in shared memory:

import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes

# the original dataframe is df; store the column/dtype pairs
df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))

# declare a shared Array with data from df
mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))

# create a new df based on the shared array
df_shared = pd.DataFrame(np.frombuffer(mparr.get_obj()).reshape(df.shape),
                         columns=df.columns).astype(df_dtypes_dict)

If you now share df_shared across processes, no additional copies will be made. For your case:

def fun(config):
    # df_shared is global to the script
    df_shared.apply(config)  # whatever compute you do with df/config

config_list = [config1, config2]

pool = mp.Pool(15)
res = pool.map_async(fun, config_list)
pool.close()
pool.join()

This is also particularly useful if you use pandarallel, for example:

# this will not explode in memory
from pandarallel import pandarallel
pandarallel.initialize()
df_shared.parallel_apply(your_fun, axis=1)

Note: with this solution you end up with two dataframes (df and df_shared), which consume twice the memory and take a long time to initialise. It might be possible to read the data directly into shared memory.
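One hedged sketch of how that could look, assuming the data comes from a CSV, is entirely float64, and has a shape you know (or probe) up front: allocate the shared Array first, wrap it in a dataframe without copying, then stream the file into it in chunks, so the only full-size copy lives in shared memory. The file name, shape and chunk size below are placeholders.

import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes

n_rows, n_cols, chunksize = 1_000_000, 10, 100_000   # assumed known in advance

# allocate the shared buffer first
mparr = mp.Array(ctypes.c_double, n_rows * n_cols)

# numpy view and dataframe on top of the shared memory, no extra copy
arr = np.frombuffer(mparr.get_obj()).reshape(n_rows, n_cols)
df_shared = pd.DataFrame(arr, copy=False)

# read the file straight into the shared buffer, chunk by chunk
for i, chunk in enumerate(pd.read_csv('data.csv', dtype=np.float64, chunksize=chunksize)):
    arr[i * chunksize : i * chunksize + len(chunk)] = chunk.values

Column names (and any non-float dtypes) would still have to be applied afterwards, as in the snippet above.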