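I need to fetch the messages produced to Kafka every hour of every day. Once an hour I start a job that consumes the messages produced during the previous hour. For example, if the current time is 20:12, I consume the messages produced between 19:00:00 and 19:59:59. This means I need the start offset as of 19:00:00 and the end offset as of 19:59:59. I used SimpleConsumer.getOffsetsBefore, as in the "0.8.0 SimpleConsumer Example" (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example). The problem is that the returned offset does not match the timestamp passed as the argument. For example, when I pass the timestamp 19:00:00, I receive messages that were produced at 16:38:00.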
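The hourly window described above can be computed with java.time. This is a minimal sketch (the class and method names are hypothetical). It uses a half-open window, [19:00:00.000, 20:00:00.000), instead of an inclusive 19:59:59 end, so a message stamped 19:59:59.500 is not missed.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class PreviousHourWindow {

    /** Returns the half-open window [startMs, endMs) of the previous full hour, in epoch milliseconds. */
    public static long[] previousHour(ZonedDateTime now) {
        // Truncate to the top of the current hour, e.g. 20:12:34 -> 20:00:00.000
        ZonedDateTime currentHour = now.truncatedTo(ChronoUnit.HOURS);
        // Step back exactly one hour on the instant timeline, e.g. 19:00:00.000
        ZonedDateTime previousHour = currentHour.minusHours(1);
        return new long[] {
            previousHour.toInstant().toEpochMilli(), // inclusive start
            currentHour.toInstant().toEpochMilli()   // exclusive end
        };
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.of(2017, 6, 1, 20, 12, 0, 0, ZoneId.of("UTC"));
        long[] window = previousHour(now);
        System.out.println("window [" + window[0] + ", " + window[1] + ")");
    }
}
```

The two values can then be used as the target timestamps for the start and end offset lookups.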
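The Kafka consumer API method offsetsForTimes() can be used for this (internally it delegates to the fetcher's getOffsetsByTimes()). It is available from Kafka 0.10.1.0 onwards and only works for partitions whose messages use the 0.10.0+ message format, which stores timestamps. See the JavaDoc https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.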
```java
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
```
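Demonstrating offsetsForTimes() itself needs a running broker, so the sketch below instead models its documented contract ("earliest offset whose timestamp is greater than or equal to the given timestamp", or null if there is none) on an in-memory list standing in for one partition. Message, offsetForTime and offsetRange are hypothetical names, not Kafka APIs. The start offset is the lookup for the window start, and the exclusive end offset is the lookup for the window end. With the real client you would pass a Map<TopicPartition, Long> to consumer.offsetsForTimes(), read offset() from each returned OffsetAndTimestamp, and could use consumer.endOffsets() for the null fallback. The sketch assumes timestamps increase with offset (for example, with LogAppendTime). With out-of-order producer timestamps, some messages could land in a neighbouring window.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetWindowSketch {

    /** Hypothetical stand-in for one stored message: its offset and timestamp (epoch ms). */
    static final class Message {
        final long offset;
        final long timestampMs;

        Message(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Mimics the documented offsetsForTimes contract for a single partition:
     * the earliest offset whose timestamp is >= targetMs, or null if no such message exists.
     */
    static Long offsetForTime(List<Message> partition, long targetMs) {
        if (targetMs < 0) {
            throw new IllegalArgumentException("The target time cannot be negative.");
        }
        for (Message m : partition) { // messages are in offset order
            if (m.timestampMs >= targetMs) {
                return m.offset;
            }
        }
        return null;
    }

    /** Half-open offset range [start, end) for messages with timestamps in [startMs, endMs). */
    static long[] offsetRange(List<Message> partition, long logEndOffset, long startMs, long endMs) {
        Long start = offsetForTime(partition, startMs);
        Long end = offsetForTime(partition, endMs);
        // null means no message at or after that time yet, so fall back to the log end offset
        long startOffset = (start != null) ? start : logEndOffset;
        long endOffset = (end != null) ? end : logEndOffset;
        return new long[] {startOffset, endOffset};
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long t19 = 19 * hour; // toy timestamps: milliseconds since midnight
        long t20 = 20 * hour;
        List<Message> partition = Arrays.asList(
                new Message(0, t19 - 1),            // 18:59:59.999
                new Message(1, t19),                // 19:00:00.000
                new Message(2, t19 + 30 * 60_000L), // 19:30:00.000
                new Message(3, t20 - 500),          // 19:59:59.500
                new Message(4, t20 + 1));           // 20:00:00.001
        long[] range = offsetRange(partition, 5, t19, t20);
        System.out.println("consume offsets [" + range[0] + ", " + range[1] + ")");
    }
}
```

Running main prints `consume offsets [1, 4)`. Offset 3 (19:59:59.500) is included in this hour's job, and offset 4 is left for the next one.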