我正在尝试从 __consumer_offsets 主题进行消费,因为这似乎可能是检索有关消费者的 kafka 指标(例如消息延迟等)的最简单方法。理想的方法是从 jmx 访问它,但想先尝试一下,返回的消息似乎被加密或以不可读的形式。也尝试添加 stringDeserializer 属性。有人对如何纠正这个问题有任何建议吗?再次提到这是重复的
重复的consumer_offset
没有帮助,因为它没有引用我的问题,即将消息作为 java 中的字符串读取。还更新了代码以尝试使用 kafka.client Consumer 进行 ConsumerRecord。
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext()) {
try {
String mesg = new String(it.next().message());
System.out.println( mesg);
代码更改:
try {
// errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , "test");
consumerProps.put("bootstrap.servers", servers);
consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
//ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
// consumerConfig);
//Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//topicCountMap.put(topic, new Integer(1));
//Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
kconsumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = kconsumer.poll(10);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
kconsumer.close();
}
下面是该消息的快照;在图像的底部: