Dask map_partitions meta when using lambda function to add column
`meta` can be provided via a kwarg to `.map_partitions`:
some_result = dask_df.map_partitions(some_func, meta=expected_df)
`expected_df` could be specified manually, or alternatively you could compute it explicitly on a small sample of data (in which case it will be a pandas DataFrame). There are more details in the docs.
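For illustration, here's a minimal sketch of both options; `some_func` and the column names it produces are made up for this example:

import pandas as pd
import dask.dataframe as dd

# Hypothetical function that adds a string column 'z' to each partition
def some_func(part):
    part = part.copy()
    part['z'] = 'value: ' + part['x'].astype(str)
    return part

dask_df = dd.from_pandas(pd.DataFrame({'x': range(4)}), npartitions=2)

# Option 1: specify meta manually as an empty pandas DataFrame
# with the expected columns and dtypes
expected_df = pd.DataFrame({'x': pd.Series(dtype='int64'),
                            'z': pd.Series(dtype='object')})
some_result = dask_df.map_partitions(some_func, meta=expected_df)

# Option 2: compute meta by running the function on a small sample
expected_df = some_func(dask_df.head(2))
some_result = dask_df.map_partitions(some_func, meta=expected_df)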
Sultan's answer is perfect about using `meta`. :)
You can also avoid using `map_partitions` here, because Dask implements `apply`, which calls `map_partitions` internally:
import json
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': range(1, 5),
                   'y': range(6, 10),
                   }).astype('str')
ddf = dd.from_pandas(df, npartitions=2)

def myfunc(x):
    # x is one row (a pandas Series); access its fields by label
    s = "string: " + x['x']
    j = json.dumps({'json': x['y']})
    return [s, j]

ddf[['new_col_1', 'new_col_2']] = ddf.apply(myfunc, axis=1, result_type="expand", meta={0: 'object', 1: 'object'})
ddf.compute()
# Output of ddf.compute():
#
# x y new_col_1 new_col_2
# 0 1 6 string: 1 {"json": "6"}
# 1 2 7 string: 2 {"json": "7"}
# 2 3 8 string: 3 {"json": "8"}
# 3 4 9 string: 4 {"json": "9"}
Also, in your code snippet, calling `.compute()` returns a pandas DataFrame, so you'll get an error if you assign its result to a Dask DataFrame (`ddata`). I'd suggest calling `compute` on `ddata` after the assignment.
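In other words, something like this sketch, where `ddata` stands in for the DataFrame from your snippet:

import pandas as pd
import dask.dataframe as dd

ddata = dd.from_pandas(pd.DataFrame({'x': ['1', '2'], 'y': ['6', '7']}),
                       npartitions=2)

# Problematic: .compute() returns a pandas DataFrame, so assigning its
# result back into the Dask DataFrame raises an error:
# ddata[['a', 'b']] = ddata.apply(..., meta=...).compute()

# Instead, assign the lazy result first, then compute once at the end:
ddata[['a', 'b']] = ddata.apply(lambda row: [row['x'], row['y']],
                                axis=1, result_type='expand',
                                meta={0: 'object', 1: 'object'})
result = ddata.compute()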