当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想查找所有分区的末尾,即使偏移量存储在代理端。如果有更多额外的消费者连接同一消费者组,他们应该获取最新存储的偏移量。
我正在做以下事情:
consumer.poll(timeout)
consumer.seekToEnd(emptyList())
while(true) {
val records = consumer.poll(timeout)
if(records.isNotEmpty()) {
//print records
consumer.commitSync()
}
}
问题是,当我连接消费者组 A 的第一个消费者 c1 时,一切都按预期工作,如果我连接消费者组 A 的附加消费者 c2,该组正在重新平衡,并且 c1 将消耗跳过的偏移量。
有任何想法吗?
您可以创建一个实现的类ConsumerRebalanceListener
, 如下所示:
public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
private Consumer<K, V> consumer;
public AlwaysSeekToEndListener(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToEnd(partitions);
}
}
然后在订阅主题时使用此监听器:
consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)