我在我的框架中使用 Kafka 生产者-消费者模型。消费者端消费的记录随后被索引到elasticsearch上。这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的位置的记录。
我认为@KafkaListener 无法实现这一点。谁能给我一个解决方案吗?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。
有几种方法可以实现这一目标。
方法#1
创建您的KafkaConsumer
线程内的对象并运行无限while
循环消耗事件。
一旦完成此设置,您就可以中断线程并在while
循环,检查是否Thread.interrupt()
is true
。如果是,则跳出循环并关闭消费者。
完成恢复活动后,使用相同的组 ID 重新创建使用者。请注意,这可能会重新平衡消费者。
如果你使用 python,同样的事情可以使用线程来实现stop_event
.
方法#2
使用 KafkaConumer APIpause(partitions_list)
功能。它接受 Kafka 分区作为输入。因此,提取分配给消费者的所有部分并将这些部分传递给pause(partitions_list)
功能。消费者将停止从这些分区中提取数据。
经过一定时间后,您可以使用resume(partitions_list)
函数来恢复消费者。此方法不会重新平衡消费者。
注意:如果您使用的是 Spring Kafka 客户端。这变得容易多了。您可以启动/停止消息侦听器容器。
你可以找到详细的解释here https://bikas-katwal.medium.com/start-stop-kafka-consumers-or-subscribe-to-new-topic-programmatically-using-spring-kafka-2d4fb77c9117.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)