multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes
Solution 1:
The first argument to Value
is typecode_or_type. That is defined as:
typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.
Emphasis mine. So, you simply cannot put a pandas dataframe in a Value
, it has to be a ctypes type.
You could instead use a multiprocessing.Manager
to serve your singleton dataframe instance to all of your processes. There's a few different ways to end up in the same place - probably the easiest is to just plop your dataframe into the manager's Namespace
.
from multiprocessing import Manager
mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe
# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))
Now your dataframe instance is accessible to any process that gets passed a reference to the Manager. Or just pass a reference to the Namespace
, it's cleaner.
One thing I didn't/won't cover is events and signaling - if your processes need to wait for others to finish executing, you'll need to add that in. Here is a page with some Event
examples which also cover with a bit more detail how to use the manager's Namespace
.
(note that none of this addresses whether multiprocessing
is going to result in tangible performance benefits, this is just giving you the tools to explore that question)
Solution 2:
You can use Array
instead of Value
for storing your dataframe.
The solution below converts a pandas
dataframe to an object that stores its data in shared memory:
import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes
# the origingal dataframe is df, store the columns/dtypes pairs
df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))
# declare a shared Array with data from df
mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))
# create a new df based on the shared array
df_shared = pd.DataFrame(np.frombuffer(mparr.get_obj()).reshape(df.shape),
columns=df.columns).astype(df_dtypes_dict)
If now you share df_shared
across processes, no additional copies will be made. For you case:
pool = mp.Pool(15)
def fun(config):
# df_shared is global to the script
df_shared.apply(config) # whatever compute you do with df/config
config_list = [config1, config2]
res = p.map_async(fun, config_list)
p.close()
p.join()
This is also particularly useful if you use pandarallel, for example:
# this will not explode in memory
from pandarallel import pandarallel
pandarallel.initialize()
df_shared.parallel_apply(your_fun, axis=1)
Note: with this solution you end up with two dataframes (df and df_shared), which consume twice the memory and are long to initialise. It might be possible to read the data directly in shared memory.