我的 Kafka 消费者的代码如下所示
def read_messages_from_kafka():
topic = 'my-topic'
consumer = KafkaConsumer(
bootstrap_servers=['my-host1', 'my-host2'],
client_id='my-client',
group_id='my-group',
auto_offset_reset='earliest',
enable_auto_commit=False,
api_version=(0, 8, 2)
)
consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])
messages = consumer.poll(timeout_ms=kafka_config.poll_timeout_ms, max_records=kafka_config.poll_max_records)
for partition in messages.values():
for message in partition:
log.info("read {}".format(message))
if messages:
consumer.commit()
next_offset0, next_offset1 = consumer.position(TopicPartition(topic, 0)), consumer.position(TopicPartition(topic, 1))
log.info("next offset0={} and offset1={}".format(next_offset0, next_offset1))
while True:
read_messages_from_kafka()
sleep(kafka_config.poll_sleep_ms / 1000.0)
我意识到消费者的这种设置无法读取所有消息。我无法重现这个问题,因为它是间歇性的问题。
当我使用以下命令比较最后 100 条消息时kafka-cat
对于这个消费者,我发现我的消费者间歇性地随机错过一些消息。我的消费者出了什么问题?
kafkacat -C -b my-host1 -X broker.version.fallback=0.8.2.1 -t my-topic -o -100
仅有python 中消费消息的方式太多。应该有一种,最好只有一种明显的方法来做到这一点。