How does an offset expire for an Apache Kafka consumer group?
I was making some tests on an old topic when I noticed some strange behaviours. Reading Kafka's log I noticed this "removed 8 expired offsets" message:
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
In fact, I have 2 questions:
How does this offset expiration work for a consumer group?
Can this expired offset explain this behaviour where my consumer would not poll anything when it had
auto.offset.reset = latest
, but it polled from the last committed offset when it hadauto.offset.reset = earliest
?
Update
Since Apache Kafka 2.1, offsets won't be deleted as long as the consumer group is active, independent if the consumers commit offsets or not, ie, the offset.retention.minutes
clocks only starts to tick when the group becomes empty (in older released, the clock started to tick directly when the commit happened).
Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
Original Answer
Kafka, by default deletes committed offsets after a configurable period of time. See parameter offsets.retention.minutes
. Ie, if a consumer group is inactive (ie, does not commit any offsets) for this amount of time, the offsets get deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions, those offsets are subject to offset.retention.minutes
.
If you start a consumer, the following happens:
- look for a (valid) committed offset (for the consumer group)
- if valid offset is found, resume from there
- if no valid offset is found, reset offset according to
auto.offset.reset
parameter
Thus, if your offsets got deleted and auto.offset.reset = latest
, you consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest
it should consume the whole topic.
See this JIRA for a discussion about this https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682
Check my answer here. You should not forget about file rolling. It impacts offset files removal.