业务的需要,需要得到最新的一条消息从kafka中,但是发现ConsumerRecords 这个对象并没有 get(index) 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录。
但是有一个count() 可以得到这次拉取的条数因此可以这么写
package com.test.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.utils.ZkUtils;
import scala.collection.JavaConversions;
public class TestOffset {
private static ZkUtils zkUtils = null;
@SuppressWarnings("resource")
public static void main(String[] args) {
// final String topicName = "ROOM-FFFFFF-999";
final String topicName = "test111";
Properties props = new Properties();
// 构建client name及groupId
// String topic = topicPartitionInfo.topic;
// int partitionID = topicPartitionInfo.partitionID;
// String clientName = this.createClientName(topic, partitionID);
// String groupId = clientName;
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "front_ROOM-FFFFFF-999_0");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
// 要发送自定义对象,需要指定对象的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
long earliestTime = OffsetRequest.EarliestTime();
long lastTime = OffsetRequest.LatestTime();
long defaultClientId = OffsetRequest.LatestTime();
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0));
consumer.subscribe(Arrays.asList(topicName)); // consumer 订阅主题
// assign方法由用户 订阅哪些具体分区 按分区来订阅
// consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
// 指定从第一条开始读 此topic
// consumer.seekToBeginning(Arrays.asList(new
// TopicPartition(topicName,0)));
// 存在的groupid获取指定的topic任意的offset
System.out.println("话题 topic " + topicName + " eraliessTime " + defaultClientId + " lastTime :"
+ earliestTime + " " + lastTime);
// consumer.seek(new TopicPartition(topicName,0),lastTime);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
int i = 1;
for (ConsumerRecord<String, String> record : records) {
if (records.count() > 0) {
if (records.count() == 1) {
// 提交偏移量 这个是如果自动提交偏移量改为false的时候 手动提交需要
System.out.println(
"==1==" + records.count() + "----->" + record.value() + " " + record.offset());
} else {
if (i == records.count()) {
System.out.println(
"==>2==" + records.count() + "----->" + record.value() + " " + record.offset());
}
i++;
}
}
}
}
}
}
这个是订阅的的简单的实现消费者这种,用了subscribe() 这个方法,就是这种就是订阅topic 消息会实时拉取显示,如果消费者停止,再启动,那么这段时间未读取的消息会一股脑拉取到,如果你这个groupID 新加入的那么之前的消息跟你就没关系了。 我这个就是 topic 和groupID 都是原来的旧的,有时候就会一下显示太多条,然后我就用 int i=1 来控制这个。