我需要读取 Kafka 主题中给定时间范围内的消息。我能想到的解决方案是首先找出时间范围开始的最大偏移量,然后继续消费消息,直到所有分区上的偏移量超过时间范围的末尾。有没有更好的方法来解决这个问题?谢谢!
好吧,您肯定必须首先搜索适合时间范围开头的第一个偏移量。
这可以使用以下方法完成KafkaConsumer#offsetsForTimes https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map- method.
该方法接受一个映射Map<TopicPartition, Long(timestamp)>
,并返回一个Map<TopicPartition, OffsetAndTimestamp>
时间戳在哪里OffsetAndTimestamp
是带有时间戳的第一条消息等于或大于然后是指定的那个。
从那里,您可以将消费者分配给返回的偏移量,并进行迭代,直到记录中的时间戳超过时间范围的末尾。
一些伪代码:
static void main(String[] args) {
String topic = args[1];
long timestampBeginning = Long.parseLong(args[2]);
long timestampEnd = Long.parseLong(args[3]);
TopicPartition partition = new TopicPartition(topic, 0);
Consumer<Object, Object> consumer = createConsumer();
long beginningOffset = consumer.offsetsForTimes(
Collections.singletonMap(partition, timestampBeginning))
.get(partition).offset();
consumer.assign(Collections.singleton(partition)); // must assign before seeking
consumer.seek(partition, beginningOffset);
for (ConsumerRecord<Object, Object> record : consumer.poll()) {
if (record.timestamp() > timestampEnd) {
break; // or whatever
}
// handle record
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)