Kafka - 如何捕获kafka客户端后台线程生成的消息

2024-04-06

使用以下配置来模拟消费者关闭/会话超时。我们如何捕获客户端记录到控制台的消息 - SESSTMOUT|rdkafka#consumer-1| [第三:主要]

consumed message None: msg1: 0: first_topic: 0: None
consumed message None: msg2: 1: first_topic: 0: None
no message received by consumer
no message received by consumer
%4|1603348021.170|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10005 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
no message received by consumer
no message received by consumer
no message received by consumer
%4|1603276138.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (30000ms) exceeded by 7ms (adjust max.poll.interval.ms for long-running message processing): leaving group
error from consumer KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (30000ms) exceeded by 7ms"}
from confluent_kafka import Consumer
def consume():
    c = Consumer({"bootstrap.servers": "localhost:9092", 
                  "group.id": "group1",
                  "enable.auto.commit": False,
                  "auto.offset.reset": "earliest",
                  "max.poll.interval.ms": 30000,
                  "session.timeout.ms": 10000,
                  "heartbeat.interval.ms": 15000
                  })
    c.subscribe(["first_topic"])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value().decode('utf-8')}: {message.offset()}: {message.topic()}: {message.partition()}: {message.headers()}")
        time.sleep(10)

心跳间隔.ms必须低于会话超时毫秒 https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat.interval.ms

session.timeout.ms * 1/3

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka - 如何捕获kafka客户端后台线程生成的消息 的相关文章

随机推荐

  • 如何正确更新 OpenGL Es 2.0 中的顶点数组?

    当我在 OpenGL 2 0 中更新 iOS 上的顶点数组时 原始顶点数据保留在屏幕上 即第一个刷新是持久的 我发送到 GPU 的初始点集每帧都会渲染 但第二个刷新是持久的 我发送到 GPU 的初始点集每帧都会渲染 第 3 次 第 4 次
  • 守护进程在 JVM 垃圾收集器抖动和 JVM 内存耗尽后立即停止

    当我使用 gradle 6 0 构建多模块 java 项目时 当我添加 gt 30 个模块时 抛出此错误 Gradle Daemon started in 2 s 267 ms gt Configure project soa misc o
  • Android - Facebook 集成:无法导入 com.facebook.Session

    我是 Android Facebook 集成的新手 我正在尝试将我的应用程序与 Facebook 集成 因此我按照脸书教程 https developers facebook com docs android 并且一切正常 至少我能够执行登
  • RequireJS 模块/包的相对路径

    我对 RequireJS 还很陌生 并且遇到了一些问题 我使用 RequireJS 编写了一个基于 Backbone 构建的小框架 我希望它可以在不同的项目中重复使用 因此 通过一些搜索 我了解到 require 允许包 这似乎就是我正在寻
  • 带有变量的 jQuery 选择器

    如何将变量与选择器混合使用 我有 ID 变量 我想从 div one 中选择具有此 id 的图像 jQuery one img id 是选择器 我试过了 one img id 但不起作用 编辑 根据您下面的评论 您将使用此 one img
  • 调试时 Visual Studio 不会因未处理的异常而中断

    突然我的visual studio不会因未处理的异常而中断 有时甚至不会在断点处停止 我读过许多其他相关的 SO 帖子 如下所示 如何使 Visual Studio 仅在未处理的异常上中断 https stackoverflow com q
  • 数据类:如何使用 asdict() 忽略 None 值?

    dataclass class Car brand str color str 如何获得忽略 None 值的字典 就像是 gt gt gt car Car brand Audi color None gt gt gt asdict car
  • 在 R 中绘制 x 轴上包含日期的图表

    我正在尝试在 x 轴上绘制日期 间隔为 1 个月 并旋转日期值以确保清晰 r runif 100 d lt as Date 2001 1 1 70 sort r plot d r type l xaxt n axis Date 1 at s
  • 从 Julia 程序执行 >> shell 运算符

    我试图使用反引号从 Julia 内部附加一个文件 run cat file2 gt gt file1 但这行不通 似乎 gt gt 运算符无法正确解释 有没有办法通过管道或其他技巧来做到这一点 如果您尝试以编程方式执行此操作 则主要问题正文
  • Numpy:给定索引,如何以有效的方式消除沿 axis=1 的最小值?

    给定一个形状为 A 的矩阵 1000000 6 我已经弄清楚如何获取每行的最小最右边值并在此函数中实现它 def calculate row minima indices h h is the given matrix Returns th
  • Spring Boot - 非 Web 应用程序的长时间运行应用程序

    我有一个简单的 Spring Boot 应用程序 仅使用 AMQP 依赖项 仅 org springframework boot spring boot starter amqp 例如没有 Web 依赖项 因此 JAR 中不包含应用程序服务
  • Django 过滤器调用返回的列表的默认顺序是什么?

    简短的问题连接到 PostgreSQL 数据库时 Django 过滤器调用返回的列表的默认顺序是什么 背景我自己承认 我had在应用程序层做了一个糟糕的假设 即返回列表的顺序将是恒定的 即不使用 order by 我查询的项目列表不按字母顺
  • 自动化时的 PowerShell 和 Excel 问题

    我面临着一个奇怪的问题 当我运行这段代码时 Excel New Object Com Excel Application book Excel Workbooks Add threading thread CurrentThread Cur
  • SpringBoot Undertow:如何分派到工作线程

    我目前正在查看 springboot undertow 对我来说 不太清楚如何将传入的 http 请求分派到工作线程以阻止操作处理 看着班级Undertow 嵌入式 Servlet Container class 看起来没有办法实现这种行为
  • 如何从PDO PHP 中的prepare() 获取查询错误?

    st db gt prepare SELECT FROM c6ode 在上述情况下 如何检查查询的故意 mysql 错误 需要设置错误模式属性PDO ATTR ERRMODE to PDO ERRMODE EXCEPTION 因为您期望异常
  • C 中 scanf 函数的格式说明符中 %c 规范之前的空格

    当我之间不包含空格时 d and c格式字符串中的规范scanf 在以下程序中运行函数 并在运行时输入 4 h 则输出为 Integer 4 and Character 究竟如何可变 c 在这种情况下接受输入 如果我在之间包含空格 会有什么
  • 在Google搜索时如何从第一页获取图像?

    通常使用Google搜索城市后 右侧会出现维基百科页面的一部分 其中包含图像和地图 谁能告诉我如何访问该图像 我应该知道怎么下载 实际上 主图像 与右侧地图图像一起 很少来自维基百科 因此您无法使用维基百科 API 来获取它 如果您想访问实
  • 在 GridView 或 ListView 底部添加额外空间

    是否可以在 GridView 底部添加额外的空间 有点像空行 我希望当你向下滚动到 GridView 底部时 会有额外的 50dp 的空白空间 我尝试设置paddingBottom到50dp 但似乎没有改变任何东西 如果我理解正确的话 它应
  • Grails - SpringSecurityPlugin 不生成控制器

    我是 Grails 新手 我按照说明安装 SpringSecurityPlugin 版本 2 0 RC2 并执行命令 grails s2 quickstart 用户角色 应该在其他文件中生成 登录控制器 and 注销控制器 但这些控制器不会
  • Kafka - 如何捕获kafka客户端后台线程生成的消息

    使用以下配置来模拟消费者关闭 会话超时 我们如何捕获客户端记录到控制台的消息 SESSTMOUT rdkafka consumer 1 第三 主要 consumed message None msg1 0 first topic 0 Non