How to iterate over a batch DataFrame in parallel in PySpark
IMHO you're doing by hand too much of the lifting that Spark can handle for you, which kills the parallelization Spark offers out of the box.
I would advise the following approach instead:
# use withColumn() to handle "key = anon_id[0:3]"
def filter_delta_with_batch(delta_df, batch_df):
    filtered_df = delta_df.join(
        batch_df,
        (delta_df.pro_id == batch_df.pro_id)
        & (delta_df.key == batch_df.key)
        & (delta_df.anon_id == batch_df.anon_id)
        & (delta_df.prof_id == batch_df.prof_id),
        how='leftsemi',
    )
    return filtered_df

# process filtered_df with kafka
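The withColumn() comment refers to deriving the key column on the batch side the same way it exists on the delta side. Assuming anon_id is a string column and key is simply its first three characters (names taken from your snippet, so treat this as a sketch rather than your exact schema), that step plus a call to the function could look like:

from pyspark.sql import functions as F

# derive the join key on the batch side: key = anon_id[0:3]
# F.substring() uses 1-based positions, so (1, 3) matches Python's [0:3]
batch_df = batch_df.withColumn("key", F.substring("anon_id", 1, 3))

filtered_df = filter_delta_with_batch(delta_df, batch_df)
# filtered_df now holds only the delta rows that have a match in batch_df,
# and can be written to Kafka in bulk instead of row by row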
You might be worried that a join would be highly inefficient here, but leftsemi and leftanti are powerful Spark join types precisely for this kind of filtering. Of course, actual performance depends on how the data is partitioned and how much shuffle the join induces.
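If batch_df is small relative to delta_df, you can also hint Spark to broadcast it so the left-semi join avoids shuffling the large delta side. A minimal sketch, assuming the four join columns carry the same names on both sides:

from pyspark.sql.functions import broadcast

# broadcast the (presumably much smaller) batch side; the left-semi join
# can then run as a broadcast hash join and delta_df is not shuffled
filtered_df = delta_df.join(
    broadcast(batch_df),
    on=["pro_id", "key", "anon_id", "prof_id"],
    how="leftsemi",
)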