How do I write a big message into Kafka with the Kafka producer API? [duplicate]
I'm trying to write a large message (around 15 MB) into Kafka, and it doesn't get written: the program finishes as if everything is OK, but there is no message in the topic.
Small messages do get written.
Here's the code:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    private final static String TOPIC = "rpdc_21596_in2";
    private final static String BOOTSTRAP_SERVERS = "host:port";

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        props.put("test.whatever", "fdsfdsf");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC,
                        0,
                        123L,
                        "fdsfdsdsdssss",
                        new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
                );

        KafkaProducer<String, String> producer = createProducer();
        RecordMetadata recordMetadata = producer.send(record).get();
        producer.flush();
        producer.close();
        System.out.println(recordMetadata);
    }
}
The topic has been configured to accept big messages, and I've been able to write into it with Python. Here's the Python code:
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host:port'],
                         max_request_size=20971520,
                         request_timeout_ms=100000)

with open('/Users/user/Desktop/value1.json', 'rb') as f:
    lines = f.read()
    print(type(lines))

# produce keyed messages to enable hashed partitioning
future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=50)
except KafkaError:
    # Decide what to do if produce request failed...
    pass

# Successful result returns assigned partition and offset
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

producer.flush()
But the Java version doesn't work.
You need to configure your topic appropriately when creating it, by setting max.message.bytes: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes
$ kafka-topics.sh --create --bootstrap-server ... --topic rpdc_21596_in2 --config max.message.bytes=20971520
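Since your topic already exists, you can also raise the limit on it in place with kafka-configs.sh (host:port is a placeholder for your broker):
$ kafka-configs.sh --bootstrap-server host:port --alter --entity-type topics --entity-name rpdc_21596_in2 --add-config max.message.bytes=20971520
Keep in mind that the broker-level default (message.max.bytes) is only about 1 MB, which the topic-level max.message.bytes overrides; depending on your broker version, replica.fetch.max.bytes may also need raising so followers can replicate large messages.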
UPDATE:
Maybe add some more properties; I've been pushing big base64 blobs with these:
// Only one in-flight request per Kafka broker connection
// - max.in.flight.requests.per.connection (default 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Set the number of retries - retries
props.put(ProducerConfig.RETRIES_CONFIG, "3");
// Request timeout - request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
// Only retry after one second
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
// Set max block to one minute by default
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
// Set transaction timeout to one minute
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
// Set delivery timeout to two minutes
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
// Time to wait before sending messages out to Kafka; should not be too high
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// Maximum amount of data collected before sending a batch; a 15 MB record
// always exceeds this, so each large record goes out in its own batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
// These two are not necessary, but useful for your use case
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
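Putting it together, here's a minimal sketch that combines your max.request.size setting with a send callback, so a broker-side rejection (e.g. RecordTooLargeException) is printed instead of failing silently. The broker address, topic name, and file path are taken from your question; the class name LargeMessageProducer is just for illustration:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class LargeMessageProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:port");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // Must cover the ~15 MB payload; the topic's max.message.bytes
        // (and the broker's message.max.bytes) must allow it too
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        String value = new String(Files.readAllBytes(
                Paths.get("/Users/user/Desktop/value1.json")));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("rpdc_21596_in2", "key", value),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // e.g. RecordTooLargeException when a size limit is hit
                            exception.printStackTrace();
                        } else {
                            System.out.println(metadata);
                        }
                    });
            // Make sure the record is actually sent before the producer closes
            producer.flush();
        }
    }
}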