How to iterate over a batch DF in parallel in PySpark

IMHO you're doing too much of the heavy lifting that Spark can handle for you, which kills the parallelization Spark offers out of the box.

I would advise the following approach instead:

# first use withColumn() on both DataFrames to derive "key = anon_id[0:3]" (see the sketch below)

def filter_delta_with_batch(delta_df, batch_df):
    # a leftsemi join keeps only the rows of delta_df that have a match in batch_df,
    # without pulling any columns from batch_df into the result
    filtered_df = delta_df.join(
        batch_df,
        (delta_df.pro_id == batch_df.pro_id)
        & (delta_df.key == batch_df.key)
        & (delta_df.anon_id == batch_df.anon_id)
        & (delta_df.prof_id == batch_df.prof_id),
        how='leftsemi',
    )
    return filtered_df

# process filtered_df with kafka
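
For completeness, here is a minimal sketch of those two commented steps. The substring logic comes from your "key = anon_id[0:3]" expression; the Kafka bootstrap servers, topic name and JSON value serialization are placeholders you would adapt to your setup:

from pyspark.sql import functions as F

# derive the join key on both sides (key = anon_id[0:3], i.e. the first 3 characters)
delta_df = delta_df.withColumn("key", F.substring("anon_id", 1, 3))
batch_df = batch_df.withColumn("key", F.substring("anon_id", 1, 3))

filtered_df = filter_delta_with_batch(delta_df, batch_df)

# batch-write the filtered rows to Kafka (servers / topic are placeholders)
(filtered_df
    .selectExpr("CAST(anon_id AS STRING) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("topic", "your_topic")
    .save())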

You might be worried that a join would be highly inefficient here, but leftsemi and leftanti are powerful Spark joins designed precisely for this kind of filtering. Of course, the cost ultimately depends on how the data is partitioned and how much shuffle the join induces.
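
If batch_df is small (which a per-batch lookup usually is), you can also hint Spark to broadcast it, which avoids shuffling delta_df entirely; a sketch, assuming both DataFrames carry the same column names:

from pyspark.sql.functions import broadcast

# broadcasting the small side turns the leftsemi join into a map-side filter,
# so the large delta_df does not need to be shuffled
filtered_df = delta_df.join(
    broadcast(batch_df),
    ['pro_id', 'key', 'anon_id', 'prof_id'],
    how='leftsemi',
)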