pandas multiprocessing apply

Solution 1:

You can use https://github.com/nalepae/pandarallel, as in the following example:

import pandas as pd
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

def func(x):
    # With axis=1, x is a row (a pandas Series), so index into its columns;
    # calling math.sin on a whole row would raise a TypeError.
    return sin(x.a**2) + sin(x.b**2)

df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
df['result'] = df.parallel_apply(func, axis=1)
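
pandarallel.initialize() also accepts tuning options such as nb_workers and progress_bar, both documented in the project's README. A minimal sketch:

from pandarallel import pandarallel

# Optional: cap the number of worker processes and show a progress bar.
pandarallel.initialize(nb_workers=4, progress_bar=True)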

Solution 2:

A more generic version, based on the author's solution, that lets you run any function on any dataframe:

from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd

def parallelize(data, func, num_of_processes=8):
    # Split the dataframe into one chunk per process and
    # concatenate the processed chunks back together.
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

So the following line:

df.apply(some_func, axis=1)

will become:

parallelize_on_rows(df, some_func)
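
Note that on platforms that spawn worker processes (Windows, and macOS by default), the Pool must be created under an if __name__ == '__main__': guard, and the function you pass must be defined at module top level so it can be pickled. A minimal runnable sketch with a hypothetical row function:

import pandas as pd

def some_func(row):
    # Hypothetical example: combine two columns of the row.
    return row['a'] + row['b']

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(1000), 'b': range(1000)})
    result = parallelize_on_rows(df, some_func)
    print(result.head())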

Solution 3:

This is some code that I found useful. It automatically splits the dataframe across however many CPU cores you have.

import pandas as pd
import numpy as np
import multiprocessing as mp

def parallelize_dataframe(df, func):
    num_processes = mp.cpu_count()
    df_split = np.array_split(df, num_processes)
    with mp.Pool(num_processes) as p:
        df = pd.concat(p.map(func, df_split))
    return df

def parallelize_function(df):
    # 'column_input' and 'column_output' are placeholder column names;
    # replace them with the columns you actually use.
    df['column_output'] = df['column_input'].apply(example_function)
    return df

def example_function(x):
    return x * 2

To run:

df_output = parallelize_dataframe(df, parallelize_function)
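
A minimal end-to-end sketch, assuming a single numeric input column (the column names are placeholders), again with the __main__ guard that spawn-based platforms require:

import pandas as pd

if __name__ == '__main__':
    df = pd.DataFrame({'column_input': range(100)})
    df_output = parallelize_dataframe(df, parallelize_function)
    print(df_output.head())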

Solution 4:

Since I can't see much of your data or script, this is a guess, but I'd suggest using p.map instead of apply_async with a callback.

import multiprocessing as mp
import numpy as np

# 'process' and 'big_df' come from your script.
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()

results = []
for result in pool_results:
    results.extend(result)
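
For illustration, here is a hypothetical process function that returns a list of per-row results for its chunk, so that results.extend(result) behaves as above; process and big_df here are stand-ins for whatever your script defines. If process returned DataFrames instead, you would combine them with pd.concat(pool_results).

import multiprocessing as mp
import numpy as np
import pandas as pd

def process(df_chunk):
    # Hypothetical worker: double one column and return a plain list.
    return (df_chunk['a'] * 2).tolist()

if __name__ == '__main__':
    big_df = pd.DataFrame({'a': range(1000)})
    with mp.Pool(8) as p:
        pool_results = p.map(process, np.array_split(big_df, 8))
    results = []
    for result in pool_results:
        results.extend(result)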