How do we cache / persist a dataset in Spark Structured Streaming 2.4.4?
I want to write three separate outputs from one calculated dataset. For that I have to cache / persist that dataset; otherwise it is going to be calculated three times, which increases my processing time.
For example:

```java
FirstDataset  // get data from Kafka
SecondDataset = FirstDataset.mapPartitions(someCalculations);
ThirdDataset  = SecondDataset.mapPartitions(someCalculations);
```
Now I want to filter ThirdDataset and write out the filtered datasets for three different conditions, each with different logic:
```java
ThirdDataset.filter(Condition1).writeStream().foreach(SOMECALCULATIONS1)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(Condition2).writeStream().foreach(SOMECALCULATIONS2)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();

ThirdDataset.filter(Condition3).writeStream().foreach(SOMECALCULATIONS3)
    .outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(600000)).start();
```
Right now ThirdDataset is recomputed for each writeStream query; if I could cache ThirdDataset, it would only be calculated once instead of three times.
But when I call `ThirdDataset.cache()`, it throws the following error:

```
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
```
Can anyone please suggest a solution?
Use the foreachBatch sink and cache the DataFrame/Dataset inside it. A streaming Dataset cannot be cached directly (that is what the AnalysisException is telling you), but the plain batch Dataset that foreachBatch hands you for each micro-batch can.
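Here is a minimal sketch of that in Java on Spark 2.4.x. The Kafka options, the filter conditions, and the parquet output paths are all hypothetical stand-ins for your own Condition1/2/3 and SOMECALCULATIONS logic:

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

import static org.apache.spark.sql.functions.col;

public class ForeachBatchCacheExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("foreachBatch-cache").getOrCreate();

    // Stand-in for the FirstDataset -> ThirdDataset pipeline in the question;
    // broker, topic, and the transformation are hypothetical.
    Dataset<Row> thirdDataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
        .option("subscribe", "mytopic")                      // hypothetical topic
        .load()
        .selectExpr("CAST(value AS STRING) AS value");       // your mapPartitions logic goes here

    // Hypothetical stand-ins for the question's Condition1/2/3.
    Column condition1 = col("value").startsWith("a");
    Column condition2 = col("value").startsWith("b");
    Column condition3 = col("value").startsWith("c");

    thirdDataset
        .writeStream()
        .trigger(Trigger.ProcessingTime(600000))
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) -> {
          batchDF.persist(); // materialize the micro-batch once

          // All three filters reuse the cached batch instead of recomputing
          // the lineage back to Kafka three times. The parquet sinks are
          // hypothetical; put your SOMECALCULATIONS logic here instead.
          batchDF.filter(condition1).write().mode("append").parquet("/tmp/out1");
          batchDF.filter(condition2).write().mode("append").parquet("/tmp/out2");
          batchDF.filter(condition3).write().mode("append").parquet("/tmp/out3");

          batchDF.unpersist(); // release the cache before the next micro-batch
        })
        .start()
        .awaitTermination();
  }
}
```

Note that this replaces your three separate streaming queries with a single query: `persist()` / `unpersist()` bracket each micro-batch, so the Kafka lineage is evaluated only once per trigger while all three filtered writes read from the cached data.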