Always retrieve the latest messages from Kafka on reconnection

Solution 1:

Finally, I solved it by handling the rebalance callback myself. This callback is executed every time a consumer joins or leaves the group.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.

So, within the rebalance callback, I iterate over the assigned TopicPartitions, set each one's offset to RdKafka::Topic::OFFSET_END (the end of the log), and then assign them to the consumer in a single call. The code is:

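#include <iostream>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

class SeekEndRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition *> &partitions) override {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      // Start every newly assigned partition from the end of the log,
      // ignoring any previously committed offset.
      for (RdKafka::TopicPartition *partition : partitions)
        partition->set_offset(RdKafka::Topic::OFFSET_END);
      // Assign the whole set once, after all offsets have been set.
      consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      consumer->unassign();
    } else {
      std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
      // On any other error librdkafka expects unassign() to resync state.
      consumer->unassign();
    }
  }
};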

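To use the callback, register it on the configuration object before creating the consumer. The callback object must stay alive for as long as the consumer uses it.

std::string errstr;
SeekEndRebalanceCb ex_rb_cb;  // must outlive the consumer
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// ... set bootstrap.servers, group.id and other properties on conf ...
if (conf->set("rebalance_cb", &ex_rb_cb, errstr) != RdKafka::Conf::CONF_OK) {
  std::cerr << errstr << std::endl;
  return false;
}
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);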