Python: using multiprocessing on a pandas dataframe
Solution 1:
What's wrong
This line from your code:
pool.map(calc_dist, ['lat','lon'])
submits two tasks to the pool: one runs calc_dist('lat') and the other runs calc_dist('lon'). Compare the first example in the multiprocessing documentation. (Basically, pool.map(f, [1,2,3]) calls f three times, once for each element of the list that follows: f(1), f(2), and f(3).) If I'm not mistaken, your function calc_dist can only be called as calc_dist('lat', 'lon'), and that single call leaves nothing to run in parallel.
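To make the pool.map behaviour concrete, here is a minimal sketch using only the standard library. The names square and run_demo are made up for illustration and are not from the question. The point it shows: the number of calls comes from the length of the list, while the number of worker processes is fixed when the pool is created.

```python
import multiprocessing as mp


def square(x):
    # Runs in a worker process and receives ONE element of the iterable.
    return x * x


def run_demo():
    # The pool size (2 workers here) is set when the pool is created;
    # map() makes one call per list element and keeps the input order.
    with mp.Pool(processes=2) as pool:
        return pool.map(square, [1, 2, 3])


if __name__ == '__main__':
    print(run_demo())  # [1, 4, 9]
```

So a list of two column names gives exactly two calls, no matter how many cores are available.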
Solution
I believe you want to split the work between processes, probably sending each tuple (grp, lst) to a worker process as a separate task. The following code does exactly that.
First, let's prepare for splitting:
grp_lst_args = list(df.groupby('co_nm').groups.items())
print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
We'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function; let's call it calc_dist2. For convenience, its argument is a single tuple, as in calc_dist2(('aa',[0,1,2])):
def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
        [ [grp,
           df.loc[c[0]].ser_no,
           df.loc[c[1]].ser_no,
           vincenty(df.loc[c[0], ['lat','lon']],
                    df.loc[c[1], ['lat','lon']])
          ]
          for c in combinations(lst, 2)
        ],
        columns=['co_nm','machineA','machineB','distance'])
And now comes the multiprocessing:
pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results is a list of results (here, data frames) of the calls calc_dist2((grp,lst)) for each (grp,lst) in grp_lst_args. The elements of results are then concatenated into one data frame by pd.concat.
print(results_df)
co_nm machineA machineB distance
0 aa 1 2 156.876149391 km
1 aa 1 3 313.705445447 km
2 aa 2 3 156.829329105 km
0 cc 8 9 156.060165391 km
1 cc 8 0 311.910998169 km
2 cc 9 0 155.851498134 km
0 bb 4 5 156.665641837 km
1 bb 4 6 313.214333025 km
2 bb 4 7 469.622535339 km
3 bb 5 6 156.548897414 km
4 bb 5 7 312.957597466 km
5 bb 6 7 156.40899677 km
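Note that the index in the output above restarts at 0 for every group, because pd.concat keeps each frame's own index by default. If a single running index is preferred, ignore_index=True does that. A tiny sketch with made-up frames (part_a and part_b are placeholders, not the real results):

```python
import pandas as pd

# Two small frames standing in for per-group results; each has its own 0-based index.
part_a = pd.DataFrame({'co_nm': ['aa', 'aa'], 'distance': [1.0, 2.0]})
part_b = pd.DataFrame({'co_nm': ['bb', 'bb'], 'distance': [3.0, 4.0]})

kept = pd.concat([part_a, part_b])                      # original indexes are kept
fresh = pd.concat([part_a, part_b], ignore_index=True)  # a new running index is built

print(list(kept.index))   # [0, 1, 0, 1]
print(list(fresh.index))  # [0, 1, 2, 3]
```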
BTW, in Python 3 we could use a with construction:
with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)
Update
I tested this code only on Linux. On Linux, the read-only data frame df can be accessed by child processes and is not copied to their memory space, but I'm not sure exactly how it works on Windows. You may consider splitting df into chunks (grouped by co_nm) and sending these chunks as arguments to some other version of calc_dist.
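Here is a rough sketch of that chunked variant. On Windows, multiprocessing starts workers with the spawn method, so they re-import the main module instead of inheriting the parent's memory. Passing each chunk explicitly avoids depending on a global df. Everything named here is my own assumption for illustration: calc_dist_chunk, run_chunked, the distance_km column, and haversine_km, which is a plain great-circle formula used as a stand-in for vincenty so the example needs only pandas.

```python
import math
import multiprocessing as mp
from itertools import combinations

import pandas as pd


def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance on a sphere of radius 6371 km.
    # This is a stand-in for geopy's vincenty, so values differ slightly.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def calc_dist_chunk(arg):
    # arg is (group name, sub-frame); the worker never touches a global df.
    grp, chunk = arg
    rows = [
        [grp, a.ser_no, b.ser_no, haversine_km(a.lat, a.lon, b.lat, b.lon)]
        for a, b in combinations(chunk.itertuples(index=False), 2)
    ]
    return pd.DataFrame(
        rows, columns=['co_nm', 'machineA', 'machineB', 'distance_km'])


def run_chunked(df, processes=2):
    # Iterating a groupby yields (name, sub-frame) pairs, one per group.
    chunks = list(df.groupby('co_nm'))
    with mp.Pool(processes=processes) as pool:
        results = pool.map(calc_dist_chunk, chunks)
    return pd.concat(results, ignore_index=True)


if __name__ == '__main__':
    df = pd.DataFrame({
        'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
        'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
        'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
    print(run_chunked(df))
```

The trade-off is that every chunk is pickled and sent to a worker, which costs time for large frames, so this is worth it mainly when the per-group computation is heavy.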
Solution 2:
I wrote a package to use apply methods on Series, DataFrames and GroupByDataFrames on multiple cores. It makes it very easy to do multiprocessing in Pandas.
You can check the documentation at https://github.com/akhtarshahnawaz/multiprocesspandas
You can also install the package directly using pip
pip install multiprocesspandas
Then doing multiprocessing is as simple as importing the package as
from multiprocesspandas import applyparallel
and then using applyparallel instead of apply like
def func(x):
    import pandas as pd
    return pd.Series([x['C'].mean()])

df.groupby(["A","B"]).apply_parallel(func, num_processes=30)
Solution 3:
Strange. It seems to work under Python 2 but not Python 3.
This is a minimally modified version that prints the output:
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                   'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                   'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                   'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})

def calc_dist(x):
    ret = pd.DataFrame(
        [ [grp,
           df.loc[c[0]].ser_no,
           df.loc[c[1]].ser_no,
           vincenty(df.loc[c[0], x],
                    df.loc[c[1], x])
          ]
          for grp,lst in df.groupby('co_nm').groups.items()
          for c in combinations(lst, 2)
        ],
        columns=['co_nm','machineA','machineB','distance'])
    print(ret)
    return ret

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()
And this is the output from Python 2 (the prints from the two worker processes are interleaved):
0 aa 1 2 110.723608682 km
1 aa 1 3 221.460709525 km
2 aa 2 3 110.737100843 km
3 cc 8 9 110.827576495 km
4 cc 8 0 221.671650552 km
co_nm machineA machineB distance
5 cc 9 0 110.844074057 km
0 aa 1 2 110.575064814 km
1 aa 1 3 221.151481337 km
6 bb 4 5 110.765515243 km
2 aa 2 3 110.576416524 km
7 bb 4 6 221.5459187 km
3 cc 8 9 110.598565514 km
4 cc 8 0 221.203121352 km
8 bb 4 7 332.341640771 km
5 cc 9 0 110.604555838 km
6 bb 4 5 110.58113908 km
9 bb 5 6 110.780403457 km
7 bb 4 6 221.165643396 km
10 bb 5 7 221.576125528 km
8 bb 4 7 331.754177186 km
9 bb 5 6 110.584504316 km
10 bb 5 7 221.173038106 km
11 bb 6 7 110.795722071 km
11 bb 6 7 110.58853379 km
And this is the stack trace from Python 3:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
    seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "gps.py", line 29, in calc_dist
    for grp, lst in df.groupby('co_nm').groups.items()
  File "gps.py", line 30, in <listcomp>
    for c in combinations(lst, 2)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
    super(vincenty, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
    kilometers += self.measure(a, b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
    a, b = Point(a), Point(b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
    "Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "gps.py", line 38, in <module>
    pool.map(calc_dist, ['lat', 'lon'])
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
TypeError: Failed to create Point instance from 8.
I know this is not the answer, but maybe it helps...
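For what it's worth, the trace says geopy could not build a Point from 8 because a numpy.int64 is not iterable. In the data frame above, 8 is the lat value of row 7, which is what df.loc[c[0], x] returns when x is the single string 'lat'. Here is a small sketch of that difference using pandas only (no geopy involved; is_iterable is a throwaway helper I made up for the demonstration):

```python
import pandas as pd

df = pd.DataFrame({'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                   'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})

single = df.loc[7, 'lat']          # one column label   -> a scalar
pair = df.loc[7, ['lat', 'lon']]   # a list of labels   -> a Series with two values


def is_iterable(obj):
    # True if iter() accepts the object, which is the check that fails in the trace.
    try:
        iter(obj)
        return True
    except TypeError:
        return False


print(type(single).__name__, is_iterable(single))
print(type(pair).__name__, is_iterable(pair))
```

So each task in pool.map(calc_dist, ['lat','lon']) hands a single number, not a (lat, lon) pair, to the distance function.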