Kafka 消费者通过 JMX 滞后

2024-04-25

我正在尝试监控 Kafka 0.10 中消费者组的滞后情况。

我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量。这意味着我可以使用以下方式获取数据:

bin/kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group-name>

这很好用。但是,我的经纪人已经使用了普罗米修斯 JMX 导出器 https://github.com/prometheus/jmx_exporter用于收集一些统计数据。我已将 JConsole 连接到代理,但看不到 JMX 中报告的相同数据kafka-consumer-groups.sh.

有没有办法使用 JMX 从 Kafka 获取这些信息without需要任何额外的工具吗?


您可以检索属性{topic}-{partition}.records-lag公制的kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}对于所有分区。这应该相当于输出consumer-groups.sh

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 消费者通过 JMX 滞后 的相关文章

  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • Kafka-python 检索主题列表

    我在用着卡夫卡蟒蛇 http kafka python readthedocs org en 1 0 2 我想知道是否有办法显示所有主题 像这样的事情 bin kafka topics sh list zookeeper localhost
  • 如何在KafkaStream应用程序中获取partitionId和TopicName

    我们如何从 KafkaStream 获取主题名称和分区 id 对于任何其他 Kafka 消费者 我们可以获得主题名称和分区 ID 如下所示 ConsumerRecords
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • 卡夫卡高级消费者 error_code=15

    当尝试使用高级消费者 使用全新的消费者组 从 Kafka 进行消费时 消费者永远不会开始运行 当我将日志记录级别切换为调试时 我可以看到以下两行一遍又一遍地重复 DEBUG AbstractCoordinator 09 43 51 192
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 使用kafka lib反序列化PRIMITIVE AVRO KEY

    我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
  • Kafka服务器未远程连接zookeeper服务器

    我正在尝试将 kafka 服务器 在 Windows 系统上 连接到 Zookeeper 服务器 我面临着 Opening socket connection to server 10 160 10 25 10 160 10 25 2181
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • Java的JConsole可以用来自动配置内存吗?

    我正在学习JavaJMX https docs oracle com javase tutorial jmx and JConsole http docs oracle com javase 7 docs technotes guides
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con

随机推荐

  • Spring MVC:@ResponseBody,415 不支持的媒体类型

    我在将 JSON Post 映射到特定 Java 对象以通过以下方式保存时遇到问题休眠 Ajax 调用的标头设置正确 Accept application json Content Type application json charset
  • 在 shell 脚本中禁止输出到屏幕

    你好 我写了一个小脚本 usr bin ksh for i in DAT do awk BEGIN OFS FS 3 353 3 353861958962 print i gt gt i changed awk 3 353 i change
  • 选项卡视觉选择

    In many GUIs when I select a section of text and then hit the Tab or Shift Tab button the selected section will indent i
  • XML-RPC Odoo - C# 多个搜索条件

    当使用 CookComputing XML RPC net 尝试仅使用一个条件搜索 mail notification 模型时 这相当简单 因为您只需调用 object args new object 1 object subargs ne
  • 从 SQLDataReader 读取结果时出现无效转换异常

    我的存储过程 UserName nvarchar 64 AS BEGIN SELECT MPU UserName SUM TS Monday as Monday TS Monday contains float value FROM dbo
  • 如何更改 vuetify v2 中 scss 中的断点?

    我正在使用 scss 文件 我想更改 vuetify v2 中 css 端的断点 我在 vuetify 升级指南中找不到任何参考 在 1 5 版本中我做了 style x styl grid breakpoints xs 0 sm 476p
  • Laravel - 获取客户端 IP 地址 - 始终获取 127.0.0.1 结果

    我无法获取客户端的 IP 地址 我需要该地址来确定他的当前位置 我使用了 request gt ip SERVER REMOTE ADDR 并且总是得到 127 0 0 1 结果 这不是我想要的 我究竟做错了什么 有时您的客户端通过代理使用
  • pyQt4 - 如何选择表格行并禁用编辑单元格

    我使用以下命令创建 QTableWidget self table QtGui QTableWidget self table setObjectName table self table setSelectionBehavior QtGu
  • 如何使用实体框架和成员资格表初始化数据库

    我有一个使用 Entity Framework 5 0 Code First 的 MVC4 Web 应用程序 在 Global asax cs 中 我有一个引导程序 用于初始化 Entity Database 强制初始化数据库并初始化成员资
  • 如何将谓词与包含的属性一起包含在内?

    目前 我有一个函数 允许我查询数据库 同时在结果中包含一些其他相关表 以防止返回数据库 否则如果我不这样做 我知道会发生这种情况 public class EntityRepository
  • 如何拦截来自 iframe 的 http 请求?

    我以编程方式在网页中设置 iframe 的 URL 我必须知道此 URL 更改会触发哪些 http 请求 CSS 脚本 图像等加载的 URL 我拦截了 XMLHttpRequest 但这个对象从未被调用过 如何拦截这些http请求 是否有另
  • Node.js (Express) 带有路由器的错误处理中间件

    这是我的应用程序结构 app js routes index js The ExpressJS应用程序创建错误处理程序development and production环境 这是来自的代码片段app js app use routes r
  • 物化滑动选项卡在模态中不起作用

    我遇到了一个奇怪的问题 我在物化选项卡上使用滑动 当我在没有模式的情况下滑动时 它工作正常 但是当我将它们包含在模式中时 滑动功能不再工作 document ready function modal modal tabs tabs swip
  • 如何获取可公开访问的 crashlytics 报告 url?

    我想与一些第三方分享我的 crashlytics 崩溃报告 如何获得如下所示的可公开访问的网址 http crashes to s 419b5b28766 http crashes to s 419b5b28766 我是新来的 这是一个旧的
  • SchedPolicy:set_timerslack_ns 写入失败:不允许操作

    当我运行 Android 应用程序时 我在 Logcat 中遇到了这个问题 有谁知道这个问题以及如何解决它 依赖项是 implementation com android support appcompat v7 25 3 0 implem
  • 32x8 寄存器文件 VHDL 测试台

    我已经用 vhdl 编写了该电路的汇编代码 我想用测试台来模拟它 RegWrite 1 位输入 时钟 写寄存器个数 3位输入 写地址 写入数据 32 位输入 数据输入 读取 寄存器编号 A 3 位输入 读取地址 读取寄存器编号 B 3 位输
  • Spring Cloud 配置服务器无法使用本地属性文件

    我一直在玩弄位于此处的 github 上的 Spring Cloud 项目 https github com spring cloud spring cloud config https github com spring cloud sp
  • redis-cli 重定向到 127.0.0.1

    我在PC1上启动Redis集群 然后在PC2上连接它 当需要重定向到另一个集群节点时 它会显示Redirected to slot 7785 located at 127 0 0 1 但应该显示Redirected to slot 7785
  • 壁纸更换事件

    我可以在广播接收器中获取壁纸更改事件吗 我需要检测用户是否更改了壁纸 我怎样才能做到呢 我正在做的是这样的 我有一个可以自动更改壁纸的应用程序 如果用户使用不同的应用程序手动更改它 我想注意到它并询问用户是否要将新壁纸添加到我的应用程序的列
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst