How do we cache/persist a dataset in Spark Structured Streaming 2.4.4?

I want to write three separate outputs from one calculated dataset. For that I have to cache/persist the dataset; otherwise it will be calculated three times, which increases my computation time.

e.g.

```java
FirstDataset  = // data from Kafka
SecondDataset = FirstDataset.mapPartitions(/* some calculations */);
ThirdDataset  = SecondDataset.mapPartitions(/* some calculations */);
```
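
In Java, that pipeline might look roughly like the sketch below; the bootstrap servers, the topic name, and the pass-through `mapPartitions` bodies are placeholders for the real calculations:

```java
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("three-sinks").getOrCreate();

// FirstDataset: raw records from Kafka, decoded to strings
Dataset<String> firstDataset = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
    .option("subscribe", "events")                       // placeholder topic
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

// SecondDataset / ThirdDataset: the two calculation stages (identity here)
Dataset<String> secondDataset = firstDataset.mapPartitions(
    (MapPartitionsFunction<String, String>) it -> it, Encoders.STRING());
Dataset<String> thirdDataset = secondDataset.mapPartitions(
    (MapPartitionsFunction<String, String>) it -> it, Encoders.STRING());
```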

Now I want to filter ThirdDataset and write the filtered results out under three different conditions, each with its own logic.

```java
// Condition1..3 and SomeCalculations1..3 are placeholders
ThirdDataset.filter(Condition1).writeStream().foreach(SomeCalculations1)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(Condition2).writeStream().foreach(SomeCalculations2)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(Condition3).writeStream().foreach(SomeCalculations3)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
```

Now ThirdDataset is recomputed for each writeStream query. If I could cache ThirdDataset, it would not be calculated three times.

But when I call ThirdDataset.cache(), it throws the following error:

```
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
```

Can anyone please suggest a solution?


Use the foreachBatch sink and cache the DataFrame/Dataset inside it! foreachBatch hands you each micro-batch as a plain (non-streaming) Dataset, so persist()/cache() is legal there, and you can write all three filtered outputs from the single cached batch.
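
A minimal sketch, assuming `thirdDataset` is the streaming `Dataset<String>` from the question; the filter conditions and the text output paths are placeholders for your real per-sink logic:

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.col;

// One streaming query instead of three. Each micro-batch arrives here as a
// plain (non-streaming) Dataset, so persist() works on it.
thirdDataset.writeStream()
    .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
        @Override
        public void call(Dataset<String> batchDF, Long batchId) {
            batchDF.persist(); // the upstream calculations run once per batch

            // Placeholder conditions and sinks; substitute the real logic.
            batchDF.filter(col("value").startsWith("a")).write().mode("append").text("/out/sink1");
            batchDF.filter(col("value").startsWith("b")).write().mode("append").text("/out/sink2");
            batchDF.filter(col("value").startsWith("c")).write().mode("append").text("/out/sink3");

            batchDF.unpersist(); // release the cache before the next trigger
        }
    })
    .trigger(Trigger.ProcessingTime(600000))
    .start();
```

With this, ThirdDataset is evaluated once per trigger instead of once per query. Note that the writes inside foreachBatch are batch writes, so their fault-tolerance semantics are those of the batch sink rather than a streaming sink; foreachBatch is available from Spark 2.4.0 onward.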