Registering kafkalistenerContainers with KafkaListenerEndpointRegistry
I want to programmatically control when to start/stop my Kafka listeners. Looking through some previous posts and discussions, it looks like I could use KafkaListenerEndpointRegistry.getListenerContainer(id).stop() to do that. However, I verified that no containers are registered with my KafkaListenerEndpointRegistry bean. How do I register my container with KafkaListenerEndpointRegistry?
// Registry handle; the error handler in this snippet calls stop() on it.
// NOTE(review): this field is @Autowired AND reassigned inside the @Bean method
// below, so it ends up pointing at whichever write happens last.
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
/**
 * Creates a new, empty KafkaListenerEndpointRegistry, stores it in the field
 * above and exposes it as a bean.
 *
 * NOTE(review): nothing visible here registers a listener container with this
 * instance. Spring Kafka presumably registers @KafkaListener containers with its
 * own infrastructure registry bean, not with a user-declared one - confirm
 * against the reference docs. If so, this method is the reason the registry
 * looks empty: delete it and rely on the @Autowired field alone.
 */
@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
return kafkaListenerEndpointRegistry;
}
/**
 * Builds the concurrent listener container factory.
 *
 * The factory is configured with a consumer factory, a concurrency of 2, a
 * transaction manager, an idle-event interval of 60000, ack-on-error disabled,
 * a retry template and an error handler.
 *
 * @param kafkaConsumerFactory injected consumer factory.
 *        NOTE(review): this parameter is never read; the body calls
 *        consumerFactory() instead - confirm which one is intended.
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConsumerFactory<String, SpecificRecord> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();

    // Consumer wiring and parallelism.
    listenerFactory.setConsumerFactory(consumerFactory());
    listenerFactory.setConcurrency(2);

    // Container-level properties.
    listenerFactory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
    listenerFactory.getContainerProperties().setIdleEventInterval(60000L);
    listenerFactory.getContainerProperties().setAckOnError(false);

    listenerFactory.setRetryTemplate(getRetryTemplate());

    // NOTE(review): `KafkaTemplate` is passed as a bare identifier; unless a field
    // with that exact name exists outside this snippet, this is a type name and
    // will not compile - confirm what instance should be passed here.
    listenerFactory.getContainerProperties().setErrorHandler(rawLogsErrorHandler(KafkaTemplate));
    return listenerFactory;
}
/**
 * Exposes a KafkaTransactionManager built on top of whatever producerFactory()
 * returns.
 *
 * @return the transaction manager bean
 */
@Bean
KafkaTransactionManager<String, SpecificRecord> kafkaTransactionManager() {
    KafkaTransactionManager<String, SpecificRecord> txManager =
            new KafkaTransactionManager<>(producerFactory());
    return txManager;
}
/**
 * Builds the RetryTemplate bean: a fixed back-off of 1000 between attempts
 * (presumably milliseconds - confirm against the Spring Retry docs), a
 * default-constructed SimpleRetryPolicy, and the listener returned by
 * retryListener().
 *
 * @return the configured retry template
 */
@Bean
public RetryTemplate getRetryTemplate() {
    FixedBackOffPolicy fixedDelay = new FixedBackOffPolicy();
    fixedDelay.setBackOffPeriod(1000);

    RetryTemplate template = new RetryTemplate();
    template.setBackOffPolicy(fixedDelay);
    template.setRetryPolicy(new SimpleRetryPolicy());
    template.registerListener(retryListener());
    return template;
}
@Bean
public LoggingErrorHandler rawLogsErrorHandler(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
return new LoggingErrorHandler() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void handle(Exception thrownException, ConsumerRecord<?,?> record) {
// record send to a dead letter here
//stop all listeners
kafkaListenerEndpointRegistry.stop();
}
/**
 * Registers the log receiver as a bean.
 *
 * NOTE(review): the type here is spelled LogReceiver, while the listener class
 * shown in this snippet is declared as Logreciever - confirm they are meant to
 * be the same class.
 *
 * @return a new LogReceiver instance
 */
@Bean
public LogReceiver receiver() {
    LogReceiver logReceiver = new LogReceiver();
    return logReceiver;
}
// And in the Logreciever class:
public class Logreciever
@KafkaListener(topics = RAWLLOGTOPIC,id="rawLogConsumer",containerFactory="kafkaListenerContainerFactory")
public void onMessage(@Payload Log log,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) Long offset) throws Exception
{
//processing code
}
}
Solution 1:
See the documentation. Only containers for `@KafkaListener`s are registered in the registry.
Containers retrieved from the factory as `@Bean`s are registered with the application context.
If you manually create containers using the container factory, no registration is performed.