Always retrieve the latest messages from Kafka on reconnection
Solution 1:
In the end I solved it by handling the rebalance callback myself. This callback is executed every time a consumer joins or leaves the group.
The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.
So, within the rebalance callback, I iterate over the TopicPartitions, set each one to start from the latest offset (OFFSET_END), and assign them to the consumer. The code is:
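#include <iostream>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

// Rebalance callback that starts every newly assigned partition at the end of the log,
// so only messages produced after (re)joining the group are consumed.
class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition*> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            // Mark every partition to start from the latest offset...
            for (auto *partition : partitions) {
                partition->set_offset(RdKafka::Topic::OFFSET_END);
            }
            // ...then assign the whole set once, after the loop.
            RdKafka::ErrorCode assign_err = consumer->assign(partitions);
            if (assign_err != RdKafka::ERR_NO_ERROR) {
                std::cerr << "Assign failed: " << RdKafka::err2str(assign_err) << std::endl;
            }
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            // Partitions were taken away from this consumer: clear the assignment.
            consumer->unassign();
        } else {
            std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
        }
    }
};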
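The key detail in SeekEndRebalanceCb is the ordering: every partition is marked with OFFSET_END first, and assign() is called once with the complete list afterwards, because each assign() call replaces the whole current assignment rather than adding to it. The following standard-library-only sketch models that pattern so it can be checked without a broker. `Partition`, `FakeConsumer`, `kOffsetEnd` and `on_assign_seek_end` are hypothetical names, not librdkafka classes; `kOffsetEnd` simply mirrors the value -1 that librdkafka uses for OFFSET_END.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for RdKafka::TopicPartition (not the librdkafka class).
struct Partition {
    std::string topic;
    int32_t partition;
    int64_t offset;
};

// Mirrors the value librdkafka uses for RdKafka::Topic::OFFSET_END.
constexpr int64_t kOffsetEnd = -1;

// Hypothetical stand-in for RdKafka::KafkaConsumer that only records assign() calls.
struct FakeConsumer {
    std::vector<Partition> assignment;
    int assign_calls = 0;

    void assign(const std::vector<Partition>& parts) {
        assignment = parts;  // like librdkafka, each call replaces the whole set
        ++assign_calls;
    }
};

// Same shape as the ERR__ASSIGN_PARTITIONS branch:
// mark every partition first, then assign the complete list exactly once.
void on_assign_seek_end(FakeConsumer& consumer, std::vector<Partition>& partitions) {
    for (auto& p : partitions) {
        p.offset = kOffsetEnd;
    }
    consumer.assign(partitions);
}
```

Calling assign() inside the loop instead would reassign the full list once per partition, with the partitions not yet visited still carrying their original offsets, and would never call it at all for an empty list.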
To use SeekEndRebalanceCb, register it in the consumer's global configuration (RdKafka::Conf) before creating the consumer:
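// `conf` is the CONF_GLOBAL RdKafka::Conf later passed to KafkaConsumer::create();
// "rebalance_cb" is a configuration property, not a method of the consumer.
// ex_rb_cb is stored by pointer, so it must outlive the consumer that uses it.
SeekEndRebalanceCb ex_rb_cb;
if (conf->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    return false;
}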