- 首先,检查并确认所有分区都正在接收来自生产者的消息
- 确认后,检查消费者启动时的分区分配策略,并确认消费者客户端已分配主题的所有分区。如果无法使用消费者客户端本身,
kafka-consumer-groups.sh
可以使用实用程序
- 确保没有其他消费者客户端具有相同的
group_id
运行并使用来自主题的消息
使用 kafka-consumer-groups.sh 列出所有消费者组
kafka-consumer-groups.sh --list --bootstrap-server <kafka-broker>:<port>
上面的命令将列出集群管理的所有主题的所有消费者组。
缩小到您感兴趣的消费者组后,使用以下命令列出消费者组的消费者部分。
kafka-consumer-groups.sh --bootstrap-server <kafka-broker>:<port> --describe --group <your-consumer-group>
kafka-consumer-groups.sh 脚本返回有关主题分区的所有信息,包括
- 当前偏移量 - 已消费消息的当前最大偏移量
此消费者实例的分区
- 日志结束偏移量 - 的偏移量
分区中的最新消息
- lag - 消耗的偏移量和最新偏移量之间的差异
- 客户端 ID - 用于区分同一消费者中各个客户端的 ID
团体
示例1:当前偏移量为 4,滞后为 2,客户端 ID 为空 - 意味着消费者客户端不再注册并处理任何消息。
另请注意消息说消费者组中没有活跃成员。
kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group my-group
Consumer group 'my-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group my-topic 0 4 6 2 - - -
示例2:当前偏移量为 4,滞后为 2 并且客户端 ID 不为空 - 表示消费者客户端当前未处理消息。但是,它之前处理了一些消息。
kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group my-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group my-topic 0 4 6 2 test-client-31ca0f5d-958e-4a30-b028-e3d975b41fd6 /0.0.0.0 test-client
示例3:LOG-END-OFFSET 为 0。因此,到目前为止,还没有针对该主题生成任何消息。
kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group my-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group test1 0 0 0 0 test-client-08763d84-c4be-49f8-a75f-286c1968f6f9 /0.0.0.0 test-client
示例4:CURRENT-OFFSET 为 0。但 LOG-END-OFFSET 为 2,LAG 也为 2 - 表示该主题分区中有消息。然而,它并没有被消耗。
kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group my-group_new_4
Consumer group 'my-group_new_4' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group_new_4 test1 0 0 2 2 - - -
这将帮助您找出本答案开头提出的问题,并隔离/解决问题。