How to stop a Spring Kafka listener and invoke a function after the consumer has stopped?

I have a scenario where I need to stop a Kafka listener and then invoke a function once the consumer has stopped. The flow would be something like this:

  1. Consume messages from a Kafka topic
  2. Append each consumed message to a file
  3. Stop the Kafka listener if it has not received any messages in the past 10s
  4. Call some function, e.g. UploadFileToS3()

I'm using Spring Kafka's @KafkaListener annotation on my consumer method.

I know that to stop a Kafka consumer annotated with @KafkaListener I can use KafkaListenerEndpointRegistry.

I want something like this:

if(not consumed messages for more than 10s) {
    kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
    UploadFileToS3();
}

How can I do this with Spring Kafka?


Solution 1:

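Use ListenerContainerIdleEvent. If you set idleEventInterval in the container properties (10000 ms in your case), the container publishes this event whenever no messages have arrived for that interval. Handle it with an ApplicationListener or an @EventListener method, stop the container, and then call your upload function. Do not call stop() on the thread that delivers the event (it is the consumer thread); hand the work off to another thread instead.

From the source: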

/**
 * An event that is emitted when a container is idle if the container
 * is configured to do so.
 *
 * @author Gary Russell
 *
 */
public class ListenerContainerIdleEvent extends KafkaEvent {

The documentation is here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers
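
A minimal sketch of how the pieces could fit together, assuming a ConsumerFactory<String, String> bean already exists in the application. The class names, the topic "my-topic", the file path, and the appendToFile/uploadFileToS3 helpers are placeholders made up for illustration; only "my-listener-id" and the Spring Kafka types come from the question and the library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Configuration
class IdleListenerConfig {

    // Assumes a ConsumerFactory bean is defined elsewhere in the application.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Publish a ListenerContainerIdleEvent after 10s without records.
        factory.getContainerProperties().setIdleEventInterval(10_000L);
        return factory;
    }
}

@Component
public class FileExportListener {

    private static final String LISTENER_ID = "my-listener-id";
    private static final Path OUTPUT = Paths.get("messages.txt"); // hypothetical path

    private final KafkaListenerEndpointRegistry registry;
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    public FileExportListener(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @KafkaListener(id = LISTENER_ID, topics = "my-topic") // topic name is a placeholder
    public void consume(String message) {
        appendToFile(message);
    }

    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // Idle events arrive for every container; child containers of a concurrent
        // container report ids such as "my-listener-id-0", hence startsWith.
        String id = event.getListenerId();
        if (id == null || !id.startsWith(LISTENER_ID)) {
            return;
        }
        // Act only once, and never stop the container on the consumer thread.
        if (stopping.compareAndSet(false, true)) {
            new Thread(this::stopAndUpload, "idle-stopper").start();
        }
    }

    private void stopAndUpload() {
        // Look up the parent container by listener id, not the child from the event.
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null && container.isRunning()) {
            container.stop();
        }
        uploadFileToS3();
    }

    private void appendToFile(String message) {
        try {
            Files.writeString(OUTPUT, message + System.lineSeparator(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void uploadFileToS3() {
        // Placeholder: the real upload logic from the question goes here.
    }
}
```

If the listener is restarted later, the stopping flag would need to be reset first. In a Spring Boot application, the idle interval can probably be set through the spring.kafka.listener.idle-event-interval property instead of a custom factory bean.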