我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试是使用成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),所以我not使用TopologyTestDriver
.
我的拓扑有键值类型的输入String -> Customer
和输出String -> CustomerMapped
。 Serdes、模式以及与模式注册表的集成都按预期工作。
我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和kafka-streams-scala
。我的拓扑尽可能简化,如下所示:
val otherBuilder = new StreamsBuilder()
otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target)
(所有隐式串行解串器,Produced
, Consumed
等是默认的并且可以正确找到)
我的测试包括发送一些记录(data
)到source
同步且不间断地进入主题,并从target
主题,我将结果与expected
:
val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
我构建了 Kafka Stream 应用程序,在其他设置之间进行设置,以下两个:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
因此,我希望 KTable 使用缓存,提交之间的间隔为 5 秒,缓存大小为 50MB(对于我的场景来说绰绰有余)。
我的问题是我从target
主题总是包含多个条目key1
。我本希望不会发出任何事件来记录Obsolete
和“已过时1”。实际输出是:
Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
最后要提的是:这个测试曾经按预期工作,直到我将 Kafka 从 2.1.0 更新到 2.2.0。我再次验证了我的应用程序降级。
我很困惑,谁能指出 2.2.x 版本中 KTables 的行为是否发生了变化?或者也许现在我必须设置新的设置来控制事件的发出?