Kafka Stream与KTable一对多关系Join

2023-12-23

我有一个卡夫卡流 - 比如说博客和一个卡夫卡表 - 比如说与这些博客相关的评论。来自 kafka 流的键可以映射到 Kafka 表中的多个值,即一个博客可以有多个评论。我想将这两个连接起来并创建一个带有评论 id 数组的新对象。但是当我进行连接时,流只包含最后一个评论 ID。是否有任何文档或示例代码可以为我指明如何实现这一目标的正确方向?基本上,是否有任何文档详细说明如何使用 Kafka 流和 Kafka 表进行一对多关系连接?

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
              (blogId, blog) -> blog.getBlogId(),
              (blog, comment) -> new EnrichedBlog(blog, comment));

因此,我需要一个评论 ID 数组,而不是评论。


我无法找到签名与代码示例中的签名匹配的 join 方法,但我认为问题在于:

KTables 被解释为 changlog,也就是说,具有相同键的每条下一条消息都被解释为对记录的更新,而不是新记录。这就是为什么您只能看到给定键(博客 ID)的最后一条“评论”消息,之前的值将被覆盖。 为了克服这个问题,您首先需要更改填充 KTable 的方式。您可以做的是将评论主题作为 KStream 添加到拓扑中,然后执行聚合,简单地构建共享相同博客 ID 的数组或评论列表。该聚合返回一个 KTable,您可以使用它加入您的博客 KStream。

下面是如何构建列表值 KTable 的示意图:

builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(), 
    (key, value, agg) -> new KeyValue<>(key, agg.add(value)),
    yourListSerde);

列表在聚合中比数组更容易使用,因此我建议您在需要时将其转换为下游数组。您还需要为您的列表提供一个 serde 实现,即上面示例中的“yourListSerde”。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Stream与KTable一对多关系Join 的相关文章

随机推荐