KTable 应该发出的事件

2024-04-04

我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试是使用成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),所以我not使用TopologyTestDriver.

我的拓扑有键值类型的输入String -> Customer和输出String -> CustomerMapped。 Serdes、模式以及与模式注册表的集成都按预期工作。

我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和kafka-streams-scala。我的拓扑尽可能简化,如下所示:

val otherBuilder = new StreamsBuilder()

otherBuilder
     .table[String,Customer](source)
     .mapValues(c => CustomerMapped(c.surname, c.age))
     .toStream.to(target)   

(所有隐式串行解串器,Produced, Consumed等是默认的并且可以正确找到)

我的测试包括发送一些记录(data)到source同步且不间断地进入主题,并从target主题,我将结果与expected:

val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" -> Customer(1, "Billy", "The Man", 32),
   "key2" -> Customer(2, "Tommy", "The Guy", 31),
   "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
   "key1" -> CustomerMapped("The Man", 32),
   "key2" -> CustomerMapped("The Guy", 31),
   "key3" -> CustomerMapped("The Lady", 40)
)

我构建了 Kafka Stream 应用程序,在其他设置之间进行设置,以下两个:

p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)

因此,我希望 KTable 使用缓存,提交之间的间隔为 5 秒,缓存大小为 50MB(对于我的场景来说绰绰有余)。

我的问题是我从target主题总是包含多个条目key1。我本希望不会发出任何事件来记录Obsolete和“已过时1”。实际输出是:

Vector(
    "key1" -> CustomerMapped("To be overridden", 0),
    "key1" -> CustomerMapped("To be overridden2", 0),
    "key1" -> CustomerMapped("The Man", 32),
    "key2" -> CustomerMapped("The Guy", 31),
    "key3" -> CustomerMapped("The Lady", 40)
)

最后要提的是:这个测试曾经按预期工作,直到我将 Kafka 从 2.1.0 更新到 2.2.0。我再次验证了我的应用程序降级。

我很困惑,谁能指出 2.2.x 版本中 KTables 的行为是否发生了变化?或者也许现在我必须设置新的设置来控制事件的发出?


在 Kafka 2.2 中,引入了优化来减少 Kafka Streams 的资源占用。 AKTable如果计算不需要,则不一定会具体化。这适用于你的情况,因为mapValues()可以即时计算。因为KTable没有具体化,没有缓存,因此每个输入记录都会生成一个输出记录。

比较:https://issues.apache.org/jira/browse/KAFKA-6036 https://issues.apache.org/jira/browse/KAFKA-6036

如果你想强制执行KTable物化,可以传入Materilized.as("someStoreName") into StreamsBuilder#table() method.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KTable 应该发出的事件 的相关文章

随机推荐

  • Knockout无法识别手动点击

    这是样本http jsfiddle net HhXGH 57 http jsfiddle net HhXGH 57 我正在通过 jquery 单击单选按钮 但 knockout js 无法识别它 它仍然显示第一个单击的值 p Send me
  • PDO 存储过程返回值

    我正在使用返回错误代码的 SQL Server 存储过程 这是 SP 的一个非常简单的片段 DECLARE ret int BEGIN SET ret 1 RETURN ret END 我可以使用 mssql 扩展获取返回值 mssql b
  • 用于将十进制转换为任意基数(从 2 到 36)的 C 代码

    我最近刚刚开始学习 C 我写了一个非常短的程序 可以在十进制和二进制之间进行转换 我想尝试编写一个在十进制和任何基数 最多 36 位 之间转换的代码 然而 我的代码只是打印出垃圾 include
  • 如何使用聚合物提交表格?

    我有这个聚合物接触元件
  • Android Html.fromHtml 耗时太长

    如果我该怎么办tv setText Html fromHtml text 时间太长 UI 挂起 如果我可以用线程来做 你能提供一个例子吗 private Handler mHandler new Handler void handleMes
  • 使用 System.IO.File 辅助方法的“顺序”文件 I/O 安全吗?

    我刚刚看到这个问题 在 C 中的 File 类上使用静态方法安全吗 https stackoverflow com q 32413634 1207195 总结一下OP有一个IOException因为该 ASP NET 代码片段中正在使用文件
  • Freetts 无法使用外部扬声器

    我在我的 java 应用程序中使用了 Freetts jar 文件来公布令牌编号 我的应用程序在我的笔记本电脑中运行良好 但在具有外部扬声器的台式机中无法运行 我得到一个空指针异常 注意 我的两台计算机都使用 Windows 7 下面的代码
  • PHP DOMDocument saveHTML 未正确编码西里尔字母

    I use DOMDocument操作 html 和 php 7 问题是文本在页面上显示良好 西里尔文 但是当我转到 查看 HTML 页面源代码 时 情况不太好 它显示如下 1047 1076 1077 1089 1100 1086 108
  • jQuery 提交 ajax 表单,带有 2 个提交按钮

    我试图实现以下目标 在 php 中我有一个这样的表单
  • 我可以将自定义对象发送到 Android Wear 吗?

    我刚刚学习如何为 Android Wear 进行开发 我已经为智能手表创建了一个全屏活动 在应用程序的移动部分中 我获取了一些 JSON 数据并从中创建了自定义对象列表 在我的移动应用程序上 我在 ListView 中显示这些对象的信息 在
  • 在 Rust 中将原始指针转换为 16 位 Unicode 字符到文件路径

    我正在用 Rust 编写的 DLL 替换用 C 编写的 DLL 目前DLL中的函数调用如下 BOOL calledFunction wchar t pFileName 我相信在这样的背景下wchar t是一个 16 位 Unicode 字符
  • 是否可以在Azure中运行npx工具来执行包

    是否可以运行npx工具来执行Azure App服务 Web应用程序 中的包 我在用 节点 v10 19 0 npm v6 13 4 如果我在本地计算机上浏览到 nodejs 我可以看到 npx cmd 工具 但如果我在 azure 中执行相
  • Textmate for Ruby 自动完成?

    我真的很习惯 Netbeans 的自动完成功能 在 Netbeans 中 当我键入 字符串 然后点击 点 时 它将打印出 String 类的方法列表 TextMate好像没有这个功能 您可以添加什么吗 会节省很多时间 而不是一直使用 ri
  • 在列表视图上方添加图像

    我想在列表视图上方添加图像视图 我知道在列表视图中添加部分听者 但我只是想节省时间 所以我使用图像视图作为列表视图标题 而不是使用 addSectionHeader 不幸的是我只是坚持使用一些 xml 属性 图像叠加在我的列表视图中 实际上
  • 我正在尝试获取 Apple Music API 的用户令牌,我已经从 python 终端命令生成了有效的开发人员令牌

    我正在我的 IOS 应用程序中第一次使用 Apple Music api 我已经在 python 命令的帮助下生成了开发者令牌 每次请求用户令牌时 我都会收到错误 请求用户令牌时发生错误 操作无法完成 SKErrorDomain 错误 7
  • 检查 arraytype 列是否包含 null

    我有一个数据框 其中有一列可以包含整数值的数组类型 如果没有值 它将只包含一个 并且它将是空值 重要的 注意该列不会为空 而是一个具有单个值的数组 无效的 gt val df DataFrame Seq foo Seq Some 2 Som
  • 在 TFS API 中将测试用例添加到 ITestSuiteBase

    我正在使用 TFS API 并遇到了 ITestSuiteBase 和 IRequirementTestSuite 的问题 我设法在 IStaticTestSuite 中轻松创建一个新的测试用例 IStaticTestSuite worki
  • 自动同步 Visual Studio 的类视图

    是否可以让 Visual Studio 的 类视图 窗格 在 视图 gt 类视图 下可用 自动同步到当前符号 基本上只要当前符号发生变化就会执行 View SynchronizeClassView See here https stacko
  • 其元数据的二进制版本是1.7.1,预期版本是1.5.1

    C Users khare gradle caches modules 2 files 2 1 org jetbrains kotlin kotlin stdlib common 1 7 0 51736992f422993a1e741051
  • KTable 应该发出的事件

    我正在尝试测试一个拓扑 该拓扑作为最后一个节点 具有 KTable 我的测试是使用成熟的 Kafka 集群 通过 confluence 的 Docker 镜像 所以我not使用TopologyTestDriver 我的拓扑有键值类型的输入S