有一个拓扑:
kStreamBuilder.stream(kafkaProperties.getInboundTopicName(), consumed)
.filterNot((k,v) -> Objects.isNull(v))
.transform(() -> new CustomTransformer(...))
.transform(() -> new AnotherTransformer(...))
.to(kafkaProperties.getOutTopicName(), resultProduced);
已配置
num.stream.threads: 50
在启动应用程序时,卡住了不断记录的消息(我不能 100% 确定它卡住了,但 20 分钟后状态和 CPU 没有变化,网络使用率非常高):
State transition from RUNNING to PARTITIONS_REVOKED
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-1-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-2-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-3-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-4-consumer, groupId=group_id] (Re-)joining group
AbstractCoordinator : [Consumer clientId=consumer_id-StreamThread-5-consumer, groupId=group_id] (Re-)joining group
etc.
主题有 100 个分区。
我们注意到:每个变压器都使用它自己的 persistenceStateStore。将其替换为 inMemoryStateStore 后,上面仍然有日志写入,但大约 3 分钟后拓扑成功启动。
卡夫卡流版本 2.1.0。
经纪商版本1.1.0
None
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)