所有消息消费完后如何关闭kafka消费者?

2023-11-21

我有以下程序来消耗所有传入 Kafka 的消息。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

使用上面的程序,我能够使用所有传入 Kafka 的消息。但是一旦所有消息都被消耗掉,我想关闭卡夫卡消费者,但这并没有发生。我同样需要帮助。


如果我提供,我现在可以关闭卡夫卡消费者KafkaConsumer 对象的consumer_timeout_ms 参数。它接受以毫秒为单位的超时值。 下面是代码片段。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

在上面的代码中,如果消费者在 1 秒内没有看到任何消息,它将关闭会话。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

所有消息消费完后如何关闭kafka消费者? 的相关文章

随机推荐