How to stop a Spring Kafka listener and invoke a function after the consumer has stopped?
I have a scenario where I need to stop a Kafka listener and then run a function once the consumer has stopped. The flow would be something like this:
- Consume messages from a Kafka topic
- Append each consumed message to a file
- Stop the Kafka listener if it has not received any messages in the past 10s
- Call some function, for example UploadFileToS3()
I'm using Spring Kafka's @KafkaListener annotation on my consumer method.
I know that to stop a Kafka consumer annotated with @KafkaListener I can use KafkaListenerEndpointRegistry.
I want something like this:
if (not consumed messages for more than 10s) {
    kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
    UploadFileToS3();
}
How can I do this with Spring Kafka?
Solution 1:
See ListenerContainerIdleEvent:
/**
 * An event that is emitted when a container is idle if the container
 * is configured to do so.
 *
 * @author Gary Russell
 *
 */
public class ListenerContainerIdleEvent extends KafkaEvent {
The documentation is here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers
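As a rough sketch of how this could be wired up for the flow in the question (not a definitive implementation): the container only emits idle events if an idle interval is configured, for example with factory.getContainerProperties().setIdleEventInterval(10_000L) on the ConcurrentKafkaListenerContainerFactory, or with spring.kafka.listener.idle-event-interval=10s when using Spring Boot. The class name FileCollectingConsumer, the topic "my-topic", and the methods appendToFile and uploadFileToS3 are placeholders I made up for illustration. The sketch uses stop(Runnable) rather than stop(), on the assumption that the idle event is delivered on the consumer thread (the default), so the upload runs from the callback once the container has actually stopped.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical component: names other than the Spring types are illustrative.
@Component
public class FileCollectingConsumer {

    private static final String LISTENER_ID = "my-listener-id";

    private final KafkaListenerEndpointRegistry registry;

    // Guards against several idle events (for example one per child
    // container when concurrency > 1) triggering the stop more than once.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    public FileCollectingConsumer(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @KafkaListener(id = LISTENER_ID, topics = "my-topic")
    public void consume(String message) {
        appendToFile(message);
    }

    // Child containers report ids such as "my-listener-id-0",
    // hence startsWith rather than an exact match.
    @EventListener(condition = "event.listenerId.startsWith('my-listener-id')")
    public void onIdle(ListenerContainerIdleEvent event) {
        if (!stopRequested.compareAndSet(false, true)) {
            return; // a stop is already in progress
        }
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null) {
            // stop(Runnable) comes from SmartLifecycle; the callback is
            // invoked after the container has stopped.
            container.stop(this::uploadFileToS3);
        }
    }

    private void appendToFile(String message) {
        // placeholder: write the message to the output file
    }

    private void uploadFileToS3() {
        // placeholder: upload the finished file
    }
}
```

If the listener should be restarted later, stopRequested would need to be reset before calling start() on the container again.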