Kafka Streams - 跳跃窗口 - 去重键

2024-05-15

我正在 4 小时窗口上进行跳跃窗口聚合,每 5 分钟前进一次。由于跳跃窗口重叠,我得到了具有不同聚合值的重复键。

TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)

如何消除具有重复数据的重复键或仅选择包含最新值的键。


2021 年 5 月更新:Kafka Streams API 支持“最终”窗口结果 https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results如今,通过suppress()操作员。请参阅之前的文档链接以及博客Kafka Streams 对水印和触发器的处理 https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/从 2019 年 3 月起了解详情。

定义窗口计算后,您可以抑制中间结果,在窗口关闭时发出每个用户的最终计数。

KGroupedStream<UserId, Event> grouped = ...;

grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
       .count()
       .suppress(Suppressed.untilWindowCloses(unbounded()))
       .filter((windowedUserId, count) -> count < 3)
       .toStream()
       .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

原始答案(在不使用时仍然适用suppress()上面的运算符):

如果我理解正确,那么这是预期的行为。您没有看到“重复”密钥,但您看到同一密钥的连续更新。

Think:

# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...

# With record cache enabled, you would see sth like this.
alice->23, alice->59, alice->100, ...

看一下解释http://docs.confluence.io/current/streams/developer-guide.html#streams-developer-guide-memory-management http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management,它更详细地描述了这一点。如果您希望减少每个记录键的“重复项”,您可以通过以下方式增加记录缓存的大小(当使用 DSL 时)cache.max.bytes.buffering aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG在您的应用程序的配置中。还有一个相互作用commit.interval.ms.

如果您想知道“为什么 Kafka Streams API 首先会以这种方式运行”,我会推荐这篇博客文章https://www.confluence.io/blog/watermarks-tables-event-time-dataflow-model/ https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/本周早些时候发布的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams - 跳跃窗口 - 去重键 的相关文章

随机推荐