Registering kafkalistenerContainers with KafkaListenerEndpointRegistry

I want to pragmatically control when to start/stop my kafka listeners.So looking through some previous posts and discussions it looks like I could use KafkaListenerEndpointRegistry.getListenerContainer(id).stop() to do that .However I verified that no containers are registered with my KafkaListenerEndpointRegistry bean.How do I register my container with KafkaListenerEndpointRegistry ?

    @Autowired
     KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;


            @Bean
            public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
                kafkaListenerEndpointRegistry =  new KafkaListenerEndpointRegistry();
                return kafkaListenerEndpointRegistry;
            }


            @Bean

            public ConcurrentKafkaListenerContainerFactory<?, ?> 
                kafkaListenerContainerFactory(
                        ConsumerFactory<String, SpecificRecord> kafkaConsumerFactory
                        ) {
                    ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> factory 
                = new ConcurrentKafkaListenerContainerFactory<>();
                factory.setConsumerFactory(consumerFactory());
              factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
                    factory.getContainerProperties().setIdleEventInterval(60000L);
                    factory.getContainerProperties().setAckOnError(false);
                    factory.setRetryTemplate(getRetryTemplate());
                    factory.setConcurrency(2);
                    factory.getContainerProperties().setErrorHandler(rawLogsErrorHandler(KafkaTemplate));
                    return factory;
                }
            @Bean
        KafkaTransactionManager<String,SpecificRecord> kafkaTransactionManager() {
            return  new KafkaTransactionManager<>(producerFactory());
        }

    @Bean
        public RetryTemplate getRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
            backOffPolicy.setBackOffPeriod(1000);
            RetryPolicy retryPolicy = new SimpleRetryPolicy();
            retryTemplate.setBackOffPolicy(backOffPolicy);
            retryTemplate.setRetryPolicy(retryPolicy);
            retryTemplate.registerListener(retryListener());
            return retryTemplate;
        }



        @Bean
        public LoggingErrorHandler rawLogsErrorHandler(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
    return new LoggingErrorHandler() {
                @SuppressWarnings({ "rawtypes", "unchecked" })
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?,?> record) {

    // record send to a dead letter here
    //stop all listeners
kafkaListenerEndpointRegistry.stop();
    }

    @Bean
        public LogReceiver receiver() {
            return new LogReceiver();
        } 

    // and on Logreciever class


    public class Logreciever 
        @KafkaListener(topics = RAWLLOGTOPIC,id="rawLogConsumer",containerFactory="kafkaListenerContainerFactory")
        public void onMessage(@Payload Log log, 
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) Long offset) throws Exception
                 {

    //processing code

    }
    }

Solution 1:

See the documentation. Only containers for @KafkaListeners are registered in the registry.

Containers retrieved from the factory as @Beans are registered with the application context.

If you manually create containers using the container factory, no registration is performed.