How to send final kafka-streams aggregation result of a time windowed KTable?

In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open all the time to handle out-of-order records that arrive after the window end-time passed. However, windows are not kept forever. They get discarded once their retention time expires. There is no special action as to when a window gets discarded.

See Confluent documentation for more details: http://docs.confluent.io/current/streams/

Thus, for each update to an aggregation, a result record is produced (because Kafka Streams also update the aggregation result on out-of-order records). Your "final result" would be the latest result record (before a window gets discarded). Depending on your use case, manual de-duplication would be a way to resolve the issue (using lower lever API, transform() or process())

This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Another blog post addressing this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Update

With KIP-328, a KTable#suppress() operator is added, that will allow to suppress consecutive updates in a strict manner and to emit a single result record per window; the tradeoff is an increase latency.


From Kafka Streams version 2.1, you can achieve this using suppress.

There is an example from the mentioned apache Kafka Streams documentation that sends an alert when a user has less than three events in an hour:

KGroupedStream<UserId, Event> grouped = ...;
grouped
  .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
  .count()
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .filter((windowedUserId, count) -> count < 3)
  .toStream()
  .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

As mentioned in the update of this answer, you should be aware of the tradeoff. Moreover, note that suppress() is based on event-time.