我有一个拓扑(见下文),可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用量相当高,我正在寻找一些关于如何减少状态存储占用空间的建议(更多详细信息如下)。Note:我并不是想逃避国有商店,我只是认为可能有一种方法可以改善我的拓扑 - 见下文。
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
更具体地说,我想知道是否流式传输OUTPUT_TOPIC
因为 KTable 导致状态存储(REKEYED_STORE
)比本地需要的要大。对于具有大量唯一键的变更日志主题,将它们作为流式传输会更好吗?KStream
并进行窗口聚合?或者这不会像我想象的那样减少占用空间(例如,只有记录的子集 - 窗口中的记录,会存在于本地状态存储中)。
不管怎样,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:
- 对于具有这种吞吐量级别的 Kafka Streams 应用程序,是否应该考虑任何配置选项、一般策略等?
- 是否有关于单个实例的内存密集程度的指导原则?即使您有一个有点武断的指导方针,与他人分享也可能会有所帮助。我的一个实例当前使用 15GB 内存 - 我不知道这是否好/坏/无关紧要。
任何帮助将不胜感激!
以你目前的模式
stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)
您会得到两家内容相同的商店。一个为reduce()
运算符和一个用于读取table()
-- 不过,这可以减少到一个商店:
KTable rekeyedTable = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
这应该会显着减少你的内存使用量。
关于窗口化与非窗口化:
-
这是你所需的语义问题;如此简单地从非窗口化到窗口化缩减似乎是有问题的。
-
即使您也可以使用窗口语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即 key + currentAgg)。因此,对于单个密钥,两种情况的存储要求是相同的(单个窗口具有相同的存储要求)。同时,如果您使用 Windows,当您获得聚合专业密钥专业窗口时,您实际上可能需要更多内存(而在非窗口情况下您仅获得单个聚合专业密钥)。唯一可以节省内存的情况是“密钥空间”分布在很长一段时间内的情况。例如,您可能很长时间无法获取某些按键的输入记录。在非窗口情况下,这些记录的聚合将始终被存储,而对于窗口情况,键/聚合记录将被删除,并且如果稍后出现具有此键的记录,则将重新创建新条目再次打开(但请记住,在这种情况下您丢失了之前的聚合门 - 参见 (1))
最后但并非最不重要的一点是,您可能需要查看调整应用程序大小的指南:http://docs.confluence.io/current/streams/sizing.html http://docs.confluent.io/current/streams/sizing.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)