2021 年 5 月更新:Kafka Streams API 支持“最终”窗口结果 https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results如今,通过suppress()
操作员。请参阅之前的文档链接以及博客Kafka Streams 对水印和触发器的处理 https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/从 2019 年 3 月起了解详情。
定义窗口计算后,您可以抑制中间结果,在窗口关闭时发出每个用户的最终计数。
KGroupedStream<UserId, Event> grouped = ...;
grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((windowedUserId, count) -> count < 3)
.toStream()
.foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
原始答案(在不使用时仍然适用suppress()
上面的运算符):
如果我理解正确,那么这是预期的行为。您没有看到“重复”密钥,但您看到同一密钥的连续更新。
Think:
# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...
# With record cache enabled, you would see sth like this.
alice->23, alice->59, alice->100, ...
看一下解释http://docs.confluence.io/current/streams/developer-guide.html#streams-developer-guide-memory-management http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management,它更详细地描述了这一点。如果您希望减少每个记录键的“重复项”,您可以通过以下方式增加记录缓存的大小(当使用 DSL 时)cache.max.bytes.buffering
aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
在您的应用程序的配置中。还有一个相互作用commit.interval.ms
.
如果您想知道“为什么 Kafka Streams API 首先会以这种方式运行”,我会推荐这篇博客文章https://www.confluence.io/blog/watermarks-tables-event-time-dataflow-model/ https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/本周早些时候发布的。