假设我有一个无限期运行的计时器任务,它会迭代 kafka 集群中的所有消费者组,并输出每个组的所有分区的延迟、提交偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,只不过它适用于所有组。
就像是
单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
多个消费者 - 工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者 api?理想情况下,我想使用单一消费者。
如果您需要更多详细信息,请告诉我。
我想你已经快到了。首先收集all您感兴趣的主题分区,以及then发出一个consumer.endOffsets
命令。
请记住,我还没有尝试运行它,但类似这样的东西应该可以工作:
run() {
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds) {
topicPartitions.addAll(getTopicPartitions(groupId));
}
consumer.endOffsets(topicPartitions);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)