Writing Custom Kafka Serializer
I am using my own class in a Kafka message; it has a bunch of String fields.
I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.
I guess I need to write my own serializer and feed it to the producer properties?
EDIT
In newer Kafka clients, implement Serializer rather than Encoder.
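As a rough sketch of what the newer style might look like: the real interface is org.apache.kafka.common.serialization.Serializer<T>, whose core method is serialize(String topic, T data). The block below declares a small stand-in interface with that one method so it compiles without the kafka-clients jar. CustomMessage, its fields, and the "id|body" wire format are hypothetical choices made for illustration only.

```java
import java.nio.charset.StandardCharsets;

// Stand-in mirroring the core method of
// org.apache.kafka.common.serialization.Serializer<T>, declared here only so
// the sketch compiles on its own. In a real project, import Kafka's interface
// instead of declaring this one.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical message type with two String fields.
final class CustomMessage {
    final String id;
    final String body;

    CustomMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

// Encodes a CustomMessage as "id|body" in UTF-8. The delimiter format is an
// illustrative choice and is ambiguous if id itself contains '|'.
final class CustomMessageSerializer implements Serializer<CustomMessage> {
    @Override
    public byte[] serialize(String topic, CustomMessage data) {
        if (data == null) {
            return null; // same convention as Kafka's StringSerializer: null in, null out
        }
        return (data.id + "|" + data.body).getBytes(StandardCharsets.UTF_8);
    }
}
```

With the real interface, the class would be registered through the value.serializer producer property (fully qualified class name) rather than serializer.class. Older client versions also require configure and close to be implemented; in more recent ones they are default methods.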
The things required for writing a custom serializer are:
- Implement Encoder with an object type specified for the generic
  - Supplying a VerifiableProperties constructor is required
- Override the toBytes(...) method, making sure a byte array is returned
- Inject the serializer class into ProducerConfig
Declaring a custom serializer for a producer
As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.
If you follow Kafka's Producer Example you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:
props.put("serializer.class", "path.to.your.CustomSerializer");
Set the value to the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
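For context, a slightly fuller version of that properties setup might look like the sketch below. It uses only java.util.Properties so it runs without Kafka on the classpath. The helper class ProducerProps and the broker address are placeholders, and metadata.broker.list is the broker setting used by the old Scala producer this answer targets.

```java
import java.util.Properties;

final class ProducerProps {
    // Builds the properties later handed to kafka.producer.ProducerConfig.
    // Both arguments are placeholders supplied by the caller.
    static Properties build(String brokerList, String serializerClass) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", serializerClass);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "path.to.your.CustomSerializer");
        // With the Kafka jar available, the next steps would be roughly:
        //   ProducerConfig config = new ProducerConfig(props);
        //   Producer<String, CustomMessage> producer = new Producer<>(config);
        System.out.println(props.getProperty("serializer.class"));
    }
}
```

If the message key is a plain String while the value is your custom type, it is probably worth setting key.serializer.class to kafka.serializer.StringEncoder as well, since the key serializer falls back to serializer.class when it is not given.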
Creating a custom serializer that Kafka understands
Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing traits in Java is weird, but the following approach worked for serializing JSON in my project:
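import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger; // assuming log4j 1.x, which matches the calls below

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the encoder reflectively through this constructor, so it must be present. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // Explicit charset so the bytes do not depend on the platform default.
            return objectMapper.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return new byte[0]; // empty payload on failure
    }
}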
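One detail worth spelling out: String.getBytes() with no argument uses the platform default charset, so the producer and consumer can disagree if they run on differently configured machines. The sketch below is plain Java with no Kafka or Jackson involved, and CharsetDemo is a hypothetical helper name. It shows that the same JSON text decodes correctly only when both sides agree on the charset.

```java
import java.nio.charset.StandardCharsets;

final class CharsetDemo {
    // Encode with an explicit charset instead of the platform default.
    static byte[] encodeUtf8(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset the writer used.
    static String decodeUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"name\":\"Zo\u00eb\"}";
        byte[] bytes = encodeUtf8(json);

        // Same charset on both sides: the text survives the round trip.
        System.out.println(decodeUtf8(bytes).equals(json));

        // Mismatched charset: the two-byte UTF-8 sequence is read as two characters.
        String garbled = new String(bytes, StandardCharsets.ISO_8859_1);
        System.out.println(garbled.equals(json));
    }
}
```

With Jackson specifically, objectMapper.writeValueAsBytes(object) is an alternative that produces UTF-8 bytes directly and avoids the intermediate String.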
Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:
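package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the encoder reflectively through this constructor, so it must be present. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        // toBytes() is a method you define on your own CustomMessage class.
        return customMessage.toBytes();
    }
}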
Which would leave your property config looking like this:
props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
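The CustomMessageEncoder above delegates to customMessage.toBytes(), which is left undefined. Since the question describes a class made up of String fields, one possible shape for that method is sketched here using DataOutputStream from the standard library. The field names sender and text, and the fromBytes counterpart, are hypothetical and only there to make the round trip visible.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical message class: the field names are illustrative only.
final class CustomMessage {
    final String sender;
    final String text;

    CustomMessage(String sender, String text) {
        this.sender = sender;
        this.text = text;
    }

    // Writes each field as a length-prefixed string (Java's modified UTF-8).
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(sender);
            out.writeUTF(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static CustomMessage fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new CustomMessage(in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Two limits of this approach: writeUTF rejects strings whose encoded form exceeds 65535 bytes, and it throws a NullPointerException on null fields. A format such as JSON may be a better fit if non-Java consumers need to read the messages.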
You need to implement both an encoder and a decoder.
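import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger; // assuming log4j 1.x, which matches the calls below

public class JsonEncoder implements Encoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the encoder reflectively through this constructor, so it must be present. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // Explicit charset so the bytes do not depend on the platform default.
            return OBJECT_MAPPER.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return new byte[0]; // empty payload on failure
    }
}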
The decoder code
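import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger; // assuming log4j 1.x, which matches the calls below

public class JsonDecoder implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonDecoder.class);
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* Kafka instantiates the decoder reflectively through this constructor, so it must be present. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        try {
            return OBJECT_MAPPER.readValue(bytes, Map.class);
        } catch (IOException e) {
            // Log the payload as text; bytes.toString() would only print the array identity.
            LOGGER.error(String.format("Json processing failed for payload: %s", new String(bytes, StandardCharsets.UTF_8)), e);
        }
        return null; // callers must handle a null result on failure
    }
}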
The pom entry
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.1.3</version>
</dependency>
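Set the default encoder in the Kafka properties. DefaultEncoder passes byte arrays through unchanged, which is what you want here because the message is encoded manually before it is sent.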
properties.put("serializer.class","kafka.serializer.DefaultEncoder");
The writer and reader code is as follows
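// Writer: encode the map manually, then send the raw bytes
JsonEncoder encoder = new JsonEncoder(null);
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(this.topic, bytes);
// Reader: "it" is the iterator over the consumer stream
JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());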