ConcurrentKafkaListenerContainerFactory message converter is ignored when configuring listeners automatically

I need to create Kafka listeners at runtime, and everything seems to work, except that the message converter property seems to be ignored (or maybe this is by design, or I've done something wrong).

With @KafkaListener it works correctly, but when I create listeners manually my message isn't converted to the desired object and I get an error:

Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.my.company.model.MyPojo (java.lang.String is in module java.base of loader 'bootstrap'; com.my.company.model.MyPojo is in unnamed module of loader 'app')
    at com.my.company.config.MyPojo.kafka.KafkaConfig.lambda$createListenerContainers$2(KafkaConfig.java:142)

My configuration:

@Bean
ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {    
    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());       
    return factory;
}

@Bean
MessageListenerContainer createListenerContainer1() {
    ContainerProperties containerProperties = new ContainerProperties(topicConfig("my_topic"));
    var container = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
    //tried this too...
    //var container = kafkaListenerContainerFactory().createContainer(topicConfig("my_topic")); 
    container.setupMessageListener((MessageListener<String, MyPojo>) data -> getDataService.process(data.value()));
    container.start();

    return container;
}

The WORKING Kafka listener:

@KafkaListener(id = "1", topics = "my_topic")
public void listenGetDataTopic(@Payload MyPojo message) {
    log.info(message);
}

I've tried a lot of different configurations and debugged it deeply. Of course, I can see that messages are handled differently with @KafkaListener and with manually created listeners, but I couldn't figure out how to apply message conversion to a manually created listener. Is there a way to achieve this?


Solution 1:

The message converter is not a property of the container; it is a property of the listener adapter used to invoke the POJO method for the @KafkaListener.

When using a container directly, your listener must implement MessageListener or one of its sub-interfaces.

You can either invoke the converter yourself in your listener (e.g. create a lightweight adapter) or you need to use another technique for dynamically creating @KafkaListeners.

See

Kafka Spring: How to create Listeners dynamically or in a loop?

Kafka Consumer in spring can I re-assign partitions programmatically?

Can i add topics to my @kafkalistener at runtime

for some examples of those techniques.
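
The first option mentioned above, invoking the conversion yourself through a lightweight adapter, could be sketched roughly as follows. This is a minimal illustration using only the JDK, not Spring Kafka's own adapter: the class name ConvertingAdapter and the method onRawValue are hypothetical, and it assumes the consumer factory delivers the record value as a raw String.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical lightweight adapter (not part of Spring Kafka):
 * converts the raw String payload and then hands the typed object
 * to the business logic.
 */
final class ConvertingAdapter<T> {

    private final Function<String, T> converter; // e.g. JSON text -> MyPojo
    private final Consumer<T> delegate;          // typed business logic

    ConvertingAdapter(Function<String, T> converter, Consumer<T> delegate) {
        this.converter = converter;
        this.delegate = delegate;
    }

    /** Call this from the container's listener with the record's raw value. */
    void onRawValue(String rawValue) {
        // The explicit conversion step that a bare container does not perform.
        T payload = converter.apply(rawValue);
        delegate.accept(payload);
    }
}
```

With a manually created container, the listener would then be declared against the raw type, for example a MessageListener<String, String> whose lambda calls adapter.onRawValue(record.value()), instead of casting to MessageListener<String, MyPojo>. The converter function could wrap whatever JSON mapping you already use (for instance Jackson's ObjectMapper.readValue, with its checked exception handled inside the function).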