Dask map_partitions meta when using lambda function to add column

You can pass meta as a keyword argument to .map_partitions:

some_result = dask_df.map_partitions(some_func, meta=expected_df)

expected_df can be specified manually, or you can compute it by running the function on a small sample of the data (in which case it will be a pandas DataFrame).

There are more details in the docs.


Sultan's answer is perfect about using meta. :)

You can also avoid using map_partitions here because Dask implements apply, which calls map_partitions internally:

import json
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': range(1,5),
                   'y': range(6,10),
                  }).astype('str')

ddf = dd.from_pandas(df, npartitions=2)

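def myfunc(x):
    # x is one row (a pandas Series); use .iloc for positional access,
    # since x[0] on a label-indexed Series is deprecated in recent pandas
    s = "string: " + x.iloc[0]
    j = json.dumps({'json': x.iloc[1]})
    return [s, j]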

ddf[['new_col_1', 'new_col_2']] = ddf.apply(
    myfunc,
    axis=1,
    result_type="expand",
    meta={0: 'object', 1: 'object'},
)

ddf.compute()

# Output of ddf.compute():
#
#    x  y  new_col_1      new_col_2
# 0  1  6  string: 1  {"json": "6"}
# 1  2  7  string: 2  {"json": "7"}
# 2  3  8  string: 3  {"json": "8"}
# 3  4  9  string: 4  {"json": "9"}

Also, in your code snippet, calling .compute() returns a pandas DataFrame, so you'll get an error if you assign that result to a Dask DataFrame (ddata). I'd suggest doing the assignment first and calling .compute() on ddata afterwards.