How can I read every 5 seconds in pyspark with kafka readStream?

I want to read a topic every 5 seconds; with older versions of pyspark I can use kafka-utils and window method, but currently, I cannot use that.

Now I am loading data from kafka with spark with the following code

spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", 'data') \
    .load()

But whit this, I am reading all data.

So I want to know how I can read the data with batches size of 1 second every 5 seconds if possible.

Thanks


Assuming you want to aggregate and group by something every 5 second interval, refer documentation on windowing

This should define a tumbling window

kafka_df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
        window(kafka_df.timestamp, "5 seconds", "1 second"),
        kafka_df.value)