I'm trying to write an integration test for my Kafka consumer. I've followed through with the official reference documentation but when I start my test I only see this repeated ad infinitum:

-2019-04-03 15:47:34.002 WARN 13120 --- [ main] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=my-group] Connection to node -1 could not be established. Broker may not be available.

What am I doing wrong?

I'm using JUnit5, Spring Boot and spring-kafka and spring-kafka-test.

I have the @EnableKafka annotation on my @Configuration class.

This is how my test class looks like:

@ExtendWith(SpringExtension::class)
@SpringBootTest(classes = [TestKafkaConfig::class])
@DirtiesContext
@EmbeddedKafka(
        partitions = 1,
        topics = [KafkaIntegrationTest.MY_TOPIC])
class KafkaIntegrationTest {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Test
    fun test() {
        val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)
        val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))
        template.defaultTopic = KafkaIntegrationTest.MY_TOPIC
        template.sendDefault("foo")
    }
}

my application.yml looks like this:

kafka:
  consumer:
    group-id: my-group
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      specific.avro.reader: true

I also tried setting up a MockSchemaRegistryClient but I get the exact same repeated message. (This is how I tried to set up the MockSchemaRegistryClient):

@TestConfiguration
@Import(TestConfig::class)
class TestKafkaConfig {

    @Autowired
    private lateinit var props: KafkaProperties

    @Bean
    fun schemaRegistryClient() = MockSchemaRegistryClient()

    @Bean
    fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())

    @Bean
    fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())

    @Bean
    fun producerFactory(): ProducerFactory<*, *> = DefaultKafkaProducerFactory(
            props.buildProducerProperties(),
            StringSerializer(),
            kafkaAvroSerializer())

    @Bean
    fun consumerFactory(): ConsumerFactory<*, *> = DefaultKafkaConsumerFactory(
            props.buildConsumerProperties(),
            StringDeserializer(),
            kafkaAvroDeserializer()
    )

    @Bean
    fun kafkaListenerContainerFactory() = ConcurrentKafkaListenerContainerFactory<Any, Any>().apply {
        setConsumerFactory(consumerFactory() as ConsumerFactory<in Any, in Any>?)
    }

}

What am I doing wrong? Note that I'm using the Confluent Schema Registry and try to deserialize from Avro.

What I'm trying to test is whether my consumer works or not which looks like this:

open class SomeConsumer(private val someUseCase) {

    @KafkaListener(topics = ["\${kafka.some-topic}"])
    open fun processMessage(record: ConsumerRecord<String, SomeObject>) {
        someUseCase.call(record)
    }
}

I believe you're missing setting the broker url for your tests.

There is a note about how to get this value in the documentation:

When the embedded Kafka and embedded Zookeeper server are started by the EmbeddedKafkaBroker, a system property named spring.embedded.kafka.brokers is set to the address of the Kafka brokers and a system property named spring.embedded.zookeeper.connect is set to the address of Zookeeper. Convenient constants (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS and EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) are provided for this property.

(It's located at the bottom of the junit section here)

One way to fix this is to set kafka.consumers.bootstrap-servers to this value in your tests, e.g.

spring:
    kafka:
        consumer:
            bootstrap-servers: ${spring.embedded.kafka.brokers}