消费者阅读 __consumer_offsets 传递不可读的消息

2023-12-12

我正在尝试从 __consumer_offsets 主题进行消费,因为这似乎可能是检索有关消费者的 kafka 指标(例如消息延迟等)的最简单方法。理想的方法是从 jmx 访问它,但想先尝试一下,返回的消息似乎被加密或以不可读的形式。也尝试添加 stringDeserializer 属性。有人对如何纠正这个问题有任何建议吗?再次提到这是重复的

重复的consumer_offset

没有帮助,因为它没有引用我的问题,即将消息作为 java 中的字符串读取。还更新了代码以尝试使用 kafka.client Consumer 进行 ConsumerRecord。

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);


consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");  
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(
       consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
   consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream stream : streams) {

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

    while (it.hasNext()) {
         try {

             String mesg = new String(it.next().message());
             System.out.println( mesg);

代码更改:

try {       
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");   
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 

    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records)

                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
          kconsumer.close();
    }    

下面是该消息的快照;在图像的底部:

consumer offset


虽然可以直接读取__consumer_offsets主题,这不是推荐的或最简单的方法。

如果您可以使用 Kafka 2.0,最好是使用 AdminClient API 来描述组:

  • listConsumerGroupOffsets():查找特定组的所有偏移量
  • 描述消费者组():查找有关群组成员的详细信息

如果您绝对想直接阅读表格__consumer_offset,您需要对记录进行解码以使它们易于阅读。这可以使用以下方法完成GroupMetadataManager class:

  • GroupMetadataManager.readMessageKey()可用于解码消息密钥并检索该条目引用的主题分区。这可以返回2种类型的对象,对于消费者位置,你只感兴趣OffsetKey对象。

  • GroupMetadataManager.readOffsetMessageValue()可用于解码消息值(对于之前的键)OffsetKey)并找到偏移量信息。

This answer从您链接的问题中包含执行所有这些操作的骨架代码。

另请注意,您不应将记录反序列化为字符串,而应将它们保留为原始字节,以便这些方法能够正确解码它们。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

消费者阅读 __consumer_offsets 传递不可读的消息 的相关文章

  • 检查发送到网页的请求数

    我正在编写一个 Java 多线程应用程序 它可以访问不同 Web 服务器的数百万个 有时甚至数十亿个 URL 这个想法是检查这些 URL 是否给出有效的 200OK 响应或 404 其他代码 我如何知道我的程序是否不会在他们的服务器上造成高
  • RxJava + Retrofit 2 的正确使用方法

    我有这样的 JSON success true data id 29 name u0420 u0435 u0441 u0442 u043e u0440 u0430 u0446 u0456 u044f u0411 u0430 u0447 u0
  • Java中的文字赋值[重复]

    这个问题在这里已经有答案了 定义上有什么区别 double example 23 1d or double example 23 1 为什么long float double可以以l f d结尾 之间没有区别double example 2
  • 帮助我避免 JPA、Hibernate 和 MySQL 的连接超时

    我正在使用 JPA Hibernate 作为提供者 Glassfish 和 MySQL 开发中一切都运行良好 但是当我将应用程序部署到测试服务器并让它运行 大部分空闲 过夜时 我通常会在早上遇到这样的情况 2011 03 09T15 06
  • 如何将抽象工厂与单例模式结合起来? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在用 java 编码 并且对这些模式很陌生 谁能给我一个也使用单例的工厂抽象的例子 这是一个实现类的示例单例模式 这个实现也是线程安全
  • 从控制台生成具有空值(墓碑)的 Kafka 消息

    有没有什么方法可以在 kafka console Producer 中生成一条具有空值的消息 即 将其标记为压缩器以使用逻辑删除来删除它 我尝试过生成 mykey 和 mykey 前者产生错误 后者使该值成为空字符串 像这样运行生产者 KA
  • spring mvc 跟踪引用页面

    在基于注释的弹簧控制器中 如果用户正在url com first page并点击一个链接或提交一份表格指出url com second page 如何制作second page知道url of first page所以这样second pa
  • SwingWorker 在另一个 SwingWorker 的 did 方法中

    首先 我需要通知您 我正在尽最大努力学习如何用 Java 编写代码 虽然有点困难 但我相信我能做到 我过去提交了几个有关 SwingWorkers 等的问题 每一个我都以为我已经做到了 但后来发现我仍然需要学习 希望这一次不是那样的一次 话
  • org.apache.commons.codec.digest.Md5Crypt.md5Crypt 函数。 linux下出现异常,windows下正常

    我们正在使用commons codec加密密码 使用org apache commons codec digest Md5Crypt md5Crypt功能 在Windows环境下工作正常 但在CentOS上却抛出异常 我们有3台centOS
  • Servlet 调度程序当前不可用

  • 使用 JNI 从 Java 代码中检索 String 值的内存泄漏

    我使用 GetStringUTFChars 从使用 JNI 的 java 代码中检索字符串的值 并使用 ReleaseStringUTFChars 释放该字符串 当代码在 JRE 1 4 上运行时 不会出现内存泄漏 但如果相同的代码在 JR
  • JFrame Glasspane 也优于 JDialog,但不应该

    我有一个带有 Glasspane 的 JFrame 未装饰 该框架打开一个 JDialog 也未装饰 也有一个 glassPane 并隐藏自身 setVisible false Glasspanes 通过 setGlassPane 设置 对
  • Java字符串查找和替换的最佳方法?

    我正在寻找 Java 中字符串查找和替换的最佳方法 这是一句话 我的名字叫米兰 人们都知道我叫米兰瓦西奇 我想用 Milan Vasic 替换 Milan 弦 但在我已经有 Milan Vasic 的地方 情况不应该是这样 搜索 替换后的结
  • 如何在Java中模拟引用传递?

    我是一个十足的 Java 菜鸟 我知道 Java 将所有参数视为按值传递 并且还有其他几个线程人们对此进行了解释 例如 在 C 中我可以这样做 void makeAThree int n n 3 int main int myInt 4 m
  • Java 中更高级的泛型

    假设我有以下课程 public class FixExpr Expr
  • Java 验证日期为 yyyyMMddHHmmss

    我想在java中验证给定的日期格式为yyyyMMddHHmmss 状况 应符合格式 yyyyMMddHHmmss 它应该验证当前日期 它应该验证与当前小时有 3 小时或 3 小时差异的小时数 如果满足所有三个条件 Java 方法应返回 tr
  • while 之后无法访问的语句[重复]

    这个问题在这里已经有答案了 我只是修改代码 在以下代码中出现错误 int x 1 System out println x x while true x System out println x x 错误在最后一行 我可以知道错误 错误 无
  • 如何使 JScrollPane 与嵌套 JPanel 一起正常工作?

    我正在使用 NetBeans 在 Java 中构建 Swing 应用程序 但我遇到布局问题 我的主框架包含一个JScrollPane其中包含一个JPanel called contentPanel其中又包含一个JPanel called l
  • Retrofit 2.0:预期为 BEGIN_OBJECT,但在第 1 行第 1 列路径 $ [重复] 处为 STRING

    这个问题在这里已经有答案了 我在邮递员上传递了更新用户请求并获得了成功的响应 参见图片 现在当我尝试使用 Retrofit 2 在我的应用程序中执行相同操作时 出现错误 com google gson JsonSyntaxException
  • 使用 PDFBox 在 Android 中创建 PDF

    我正在尝试通过我的 Android 应用程序创建 PDFPDFBoxapi 但出现以下错误 java lang NoClassDefFoundError org apache pdfbox pdmodel PDDocument 我已经将以下

随机推荐

  • 使用 Twilio SMS API,我可以在一篇文章中指定多个目标电话吗?

    Twilio 将长代码 SMS 限制为 1 次 秒 为了提高吞吐量 我将批次分成 5 个电话号码 我发现 Twilio API 的每个 HTTP POST 大约需要 0 5 秒 人们可能会认为使用 5 个 twilio 电话号码向 1000
  • 用于嵌套 Div 标签的 PHP 正则表达式

    我需要一个可以与 PHP 的 preg match all 一起使用的正则表达式来匹配 div 标签内的内容 div 看起来像这样 div Content div 到目前为止 我已经想出了这个正则表达式 它匹配所有 id t number
  • 如何计算firebase实时数据库中的键数

    如何在 firebase 函数的帮助下计算键的数量 在上面的情况下有 3 个 我正在使用 firebase 实时数据库 Firebase 实时数据库没有单独的计数操作 您必须下载父节点的整个快照 geoTag 然后计算应用程序代码中的子级数
  • C# - Json POST 请求已发送,但 PHP 服务器未收到

    我正在从 C Windows 表单应用程序向 OpenShift Red Hat 上托管的 PHP 服务器发送 HTTP 请求 我正在使用 POST 方法和 Json 数据 问题是 数据似乎已正确发送 我在wireshark中看到数据包 p
  • 如何将jax-ws服务部署到eclipse或tomcat?

    作为一名 Web 服务初学者 我已经尝试了 2 周来获得一个与 Maven Eclipse 和 Tomcat 一起使用的 hello World Web 服务 我放弃了让任何代码 wsdl 生成器工作的尝试 我遵循了本教程http myar
  • 猫鼬鉴别器在数据库中有何帮助? [关闭]

    Closed 这个问题需要多问focused 目前不接受答案 你好 我正在学习 mongodb 我了解了 mongoose 中的鉴别器 我试图从文档方面理解它 但不太理解 任何人都可以用更好的方式解释吗 谢谢 举个例子 您的项目有两个角色
  • 图像和其他 div 彼此相邻的 Div 容器 [关闭]

    很难说出这里问的是什么 这个问题模棱两可 含糊不清 不完整 过于宽泛或言辞激烈 无法以目前的形式合理回答 如需帮助澄清此问题以便重新打开 访问帮助中心 我正在尝试绘制一个容器 其中包含左侧的图像 缩略图 和图像旁边的几个 div 垂直 在容
  • 检查多维数组中是否存在特定的数组键 - PHP

    我有一个多维数组 例如 这可能有很多层次 array Array 21 gt Array 24 gt Array 22 gt Array 25 gt Array 26 gt Array 我试图循环它以查看某个键是否存在 keySearch
  • NVD3 中的 ScatterChart – 从 csv 文件读取数据

    我正在尝试从 csv 文件读取数据 并希望使用 NVD3 中的 scatterChart 来可视化该数据 我会链接到 JSfiddle 或类似的东西 但我不知道如何在这些在线 JavaScript IDE 中包含 csv 文件 那可能吗 c
  • 分割字符串,然后显示没有最后一个的所有项目

    我有 例如 字符串let abc Jonny Name 所以如果我想检查 这是不是名字 我会检查 let isName abc split 1 isName Name your name is abc split 0 not name 但我
  • 同一服务器上的多个 Mongodb 实例

    我正在使用 Mongo DB 但我是它的新手 我准备将它安装在专门用于 Mongo 的服务器上 我想创建它的 2 个实例 1 个用于支持 QA 环境 另一个用于支持临时环境 我更熟悉 SQL Server 我可以在其中创建多个实例 是否可以
  • 在 CLI 中生成随机 BMP

    我需要一个真正随机的 BMP 来测试各种有损图像压缩算法 理想情况下 这不会依赖于任何库并在 Linux CLI 中运行 它应该生成一个随机的 BMP 给定一定的width and height 更新答案 2021 年 4 月 以下是关于随
  • 如何使用 LWP::Simple 处理代理服务器?

    如何向该脚本添加代理支持 use LWP Simple url http stackoverflow com word how to ask content get url if content m word print Found wor
  • 如何从电子表格公式调用库函数作为自定义函数?

    这可能是与以下相同的根本原因如何从同一库生成的电子表格下拉菜单中调用库函数 但我仍然想把它扔掉 以防出现新的情况或情况有所不同 问题是我想将所有自定义函数保留在一个库中 然后将该库添加到任何给定的电子表格中 并能够从单元格公式中引用它们 在
  • iOS UICollectionView:具有交替网格对齐方式的圆形视图的单元格

    我正在尝试实施UICollectionView用于圆形的自定义单元格 现在 默认情况下 圆的对齐方式与普通方形单元格相同 顶部圆和底部圆位于同一垂直线上 如何将对齐更改为 顶部圆和其下面的两个圆形成等边三角形 顶部圆和底部圆的位置按半径长度
  • 使用 React Route 部署到 S3 后看到空白页面

    我使用 React 和 React Router 构建了 SPA 我也在用https github com facebookincubator create react app因为它是一个非常简单的应用程序 当我使用 webpack 进行开
  • Java - 使用 ImageIO 进行多线程处理

    我有一个程序加载缓慢 我猜这是由于我一开始必须加载的图像资源量所致 我认为多线程会有所帮助 但现在我不太确定 这是我的自动多线程方法 private static Thread t private static int currentThr
  • 为什么 printf 在终端中显示额外的 % 字符?

    我开始学习用 C 语言编写代码 我写了以下程序 include
  • 替换字符组合

    我有一根绳子27AAGCB5913L2ZF 如果有任何一个A or J or K出现在字符串中 那么我需要将它们更改为这三个字母的所有可能组合 如果我将上面的字符串输入传递给程序 那么输出应该是这样的 27AAGCB5913L2ZF 27A
  • 消费者阅读 __consumer_offsets 传递不可读的消息

    我正在尝试从 consumer offsets 主题进行消费 因为这似乎可能是检索有关消费者的 kafka 指标 例如消息延迟等 的最简单方法 理想的方法是从 jmx 访问它 但想先尝试一下 返回的消息似乎被加密或以不可读的形式 也尝试添加