Kafka Connect - Failed to flush, timed out while waiting for producer to flush outstanding messages
I am trying to use the Kafka Connect JDBC Source Connector in BULK mode with the following properties:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
I get the following error about committing offsets; changing various parameters seems to have little effect.
[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Solution 1:
The error indicates that a large number of messages are buffered and cannot be flushed before the timeout is reached. To address this issue you can either:
- increase the
offset.flush.timeout.ms
configuration parameter in your Kafka Connect worker configs, or
- reduce the amount of data being buffered by decreasing
producer.buffer.memory
in your Kafka Connect worker configs. This turns out to be the best option when you have fairly large messages.
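As a sketch, either remedy goes into the worker properties file; the values below are illustrative, not recommendations:

```properties
# Kafka Connect worker config (e.g. connect-distributed.properties)

# Option A: allow more time for the flush to complete (the default is 5000 ms)
offset.flush.timeout.ms=60000

# Option B: shrink the producer's buffer so fewer messages accumulate
# between offset commits (default buffer.memory is 33554432, i.e. 32 MB)
producer.buffer.memory=2097152
```

Note that producer.buffer.memory also caps throughput, so shrinking it trades flush reliability against send performance.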
Solution 2:
When
security.protocol=SSL
is enabled, make sure that there are separate SSL parameters for the Connect workers and the Connect producers, and provide SSL settings for both:
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
see https://docs.confluent.io/5.2.3/connect/security.html#separate-principals
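The same separation applies on the consumer side: sink connectors read from Kafka with a Connect-managed consumer, which takes consumer.-prefixed overrides. A sketch assuming a keystore layout like the one above (paths and passwords are placeholders):

```properties
# Authentication settings for Connect consumers used with sink connectors
# (hypothetical keystore path and passwords)
consumer.ssl.keystore.location=/var/private/ssl/kafka.sink.keystore.jks
consumer.ssl.keystore.password=connector1234
consumer.ssl.key.password=connector1234
```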
Solution 3:
If you're trying to connect to Confluent Cloud, this error is probably caused by a missing configuration in the worker properties; make sure you added both the producer and consumer configurations:
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
consumer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
producer.security.protocol=SASL_SSL