ConcurrentKafkaListenerContainerFactory message converter is ignored when configuring listeners automatically
I need to create Kafka listeners at runtime. Everything seems to work, except that the message converter property appears to be ignored (maybe this is by design, or maybe I've done something wrong).
When using @KafkaListener, it works correctly, but when I create listeners manually, my message isn't converted to the desired object and I get an error:
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.my.company.model.MyPojo (java.lang.String is in module java.base of loader 'bootstrap'; com.my.company.model.MyPojo is in unnamed module of loader 'app')
at com.my.company.config.MyPojo.kafka.KafkaConfig.lambda$createListenerContainers$2(KafkaConfig.java:142)
My configuration:
@Bean
ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
MessageListenerContainer createListenerContainer1() {
ContainerProperties containerProperties = new ContainerProperties(topicConfig("my_topic"));
var container = new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
//tried this too...
//var container = kafkaListenerContainerFactory().createContainer(topicConfig("my_topic"));
container.setupMessageListener((MessageListener<String, MyPojo>) data -> getDataService.process(data.value()));
container.start();
return container;
}
The WORKING Kafka listener:
@KafkaListener(id = "1", topics = "my_topic")
public void listenGetDataTopic(@Payload MyPojo message) {
log.info(message);
}
I've tried many different configurations and debugged this in depth. I can see that messages are handled differently by @KafkaListener and by manually created listeners, but I couldn't figure out how to apply message conversion to a manually created listener. Is there a way to achieve this?
Solution 1:
The message converter is not a property of the container; it is a property of the listener adapter used to invoke the POJO method for the @KafkaListener. A container that you create directly never goes through that adapter, so the converter set on the factory is not applied.
When using a container directly, your listener must implement MessageListener or one of its sub-interfaces.
You can either invoke the converter yourself in your listener (e.g. create a lightweight adapter), or use another technique for dynamically creating @KafkaListeners.
See
Kafka Spring: How to create Listeners dynamically or in a loop?
Kafka Consumer in spring can I re-assign partitions programmatically?
Can i add topics to my @kafkalistener at runtime
for some examples of those techniques.
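As a rough illustration of the first option (invoking the converter yourself in a lightweight adapter), here is a minimal, dependency-free sketch. It is not Spring Kafka code: RawListener is a hypothetical stand-in for MessageListener<String, String>, the nested MyPojo is a placeholder for the class in the question, and the converter function stands in for whatever JSON parsing you actually use. In a real application the adapter would implement MessageListener<String, String> and convert record.value() inside onMessage before calling the delegate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ConvertingListenerSketch {

    /** Hypothetical stand-in for Spring Kafka's MessageListener<String, String>. */
    interface RawListener {
        void onMessage(String rawValue);
    }

    /** Placeholder for the MyPojo class from the question. */
    static final class MyPojo {
        private final String name;

        MyPojo(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    /** Adapter: converts the raw payload first, then passes the typed object to the delegate. */
    static final class ConvertingListener<T> implements RawListener {
        private final Function<String, T> converter;
        private final Consumer<T> delegate;

        ConvertingListener(Function<String, T> converter, Consumer<T> delegate) {
            this.converter = converter;
            this.delegate = delegate;
        }

        @Override
        public void onMessage(String rawValue) {
            // The conversion happens here, in the listener, not in the container.
            delegate.accept(converter.apply(rawValue));
        }
    }

    public static void main(String[] args) {
        List<MyPojo> received = new ArrayList<>();

        // Assumption: MyPojo::new is a trivial placeholder conversion;
        // a real listener would deserialize JSON into MyPojo at this point.
        RawListener listener = new ConvertingListener<MyPojo>(MyPojo::new, received::add);

        listener.onMessage("example");
        System.out.println(received.get(0).getName());
    }
}
```

With this shape, the container still only ever sees a listener for the raw String value, so the ClassCastException from the question cannot occur: the cast to MyPojo is replaced by an explicit conversion step.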