Python Kafka 消费者缺少轮询一些消息

2023-12-07

我的 Kafka 消费者的代码如下所示

def read_messages_from_kafka():
    topic = 'my-topic'
    consumer = KafkaConsumer(
        bootstrap_servers=['my-host1', 'my-host2'],
        client_id='my-client',
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        api_version=(0, 8, 2)
    )
    consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])

    messages = consumer.poll(timeout_ms=kafka_config.poll_timeout_ms, max_records=kafka_config.poll_max_records)

    for partition in messages.values():
        for message in partition:
            log.info("read {}".format(message))

    if messages:
        consumer.commit()

    next_offset0, next_offset1 = consumer.position(TopicPartition(topic, 0)), consumer.position(TopicPartition(topic, 1))
    log.info("next offset0={} and offset1={}".format(next_offset0, next_offset1))

while True:
    read_messages_from_kafka()
    sleep(kafka_config.poll_sleep_ms / 1000.0)

我意识到消费者的这种设置无法读取所有消息。我无法重现这个问题,因为它是间歇性的问题。

当我使用以下命令比较最后 100 条消息时kafka-cat对于这个消费者,我发现我的消费者间歇性地随机错过一些消息。我的消费者出了什么问题?

kafkacat -C -b my-host1 -X broker.version.fallback=0.8.2.1 -t my-topic -o -100

仅有python 中消费消息的方式太多。应该有一种,最好只有一种明显的方法来做到这一点。


您的 Kafka 客户端存在消息丢失的问题。 我找到了解决方案here:

while True:
    raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
    for topic_partition, messages in raw_messages.items():
        application_message = json.loads(message.value.decode())

另外还有另一个 Kafka 客户端:confluence_kafka。它不存在这样的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python Kafka 消费者缺少轮询一些消息 的相关文章

随机推荐

  • Javascript:从cookie中读取会话ID

    对于 websocket 我必须从 cookie 中公开我的 sessionid 我搜索了一下 发现我应该能够通过以下方式访问 cookie console log document cookie 不幸的是 这不起作用 或者更好的是 doc
  • LINQ 忽略重音和大小写

    使用 LINQ 通过以下方式过滤元素的最简单方法是什么Where方法忽略重音和大小写 到目前为止 我已经能够通过调用属性上的方法来忽略大小写 我认为这不是一个好主意 因为它为每个元素调用相同的方法 对吗 这是我到目前为止得到的 var re
  • 需要有关简单 MySQL 数据库设计的技巧

    我正在尝试使用 MySQL 为游戏制作一个简单的项目数据库 这是我的 3 张桌子的样子 items itemId itemName 0001 chest piece 0002 sword 0003 helmet attributes att
  • 如何生成常规 Excel 公式作为查询结果?

    我有一个大型且复杂的 Excel 查询 它可以按预期工作 但是 我正在实现一些实时数据验证功能 即不需要数据刷新 并且我需要在查询结果的一列中包含常规 Excel 公式 该公式将使用工作簿中的其他工作表执行实时数据比较 有意独立于查询本身
  • 在 AngularJS 服务之间共享数据

    有没有办法在 AngularJS 的服务之间共享数据 用例 来自不同服务的数据聚合例如 我想要一个从 REST 服务加载一些数据的 service1 然后 另一个 service2 将来自另一个 REST API 的附加数据添加到 serv
  • 如何在 apache htaccess 中为 angularjs 应用程序重写 url

    我使用的htaccess如下 RewriteBase RewriteEngine on RewriteCond REQUEST FILENAME s OR RewriteCond REQUEST FILENAME l OR RewriteC
  • iOS 中是否支持本机 JSON?

    iOS SDK 中是否有一个类可以从服务器解析 JSON 类似于 XML 的 NSXML 和扩展的 RSS 从 iOS5 开始 原生支持 JSON 无需第三方框架 这是由NSJSONSerialization 类
  • 如何在uwp平台中将图像转换为字节数组

    我需要将图像转换为字节数组以将其存储在数据库中 我还需要将该数组转换回图像 我做了谷歌研究 但找不到解决方案 因为UWP平台有些api不可用 我从这些文章中找到了解决方案外乡人 says 要将图像转换为 byte 我将使用存储文件的 Ope
  • 使用Python抓取动态内容

    我想使用 Python 来抓取网页上 您在寻找这些作者吗 框的内容 如下所示 http academic research microsoft com Search query lander 不幸的是 盒子的内容是由 JavaScript
  • Chrome打包应用程序-从Webview下载文件

    我正在努力让现有的 Ajax 风格的 Web 应用程序作为 Chrome 打包应用程序运行 Ajax 应用程序在打包应用程序内的 Web 视图中运行 并且大部分运行良好 Ajax 应用程序允许用户使用标准 HTML 5 上传和拖 放来上传文
  • 在 PHP5 中创建 Singleton 设计模式

    如何使用 PHP5 类创建 Singleton 类 Singleton class final class UserFactory private static inst null Prevent cloning and de serial
  • 固定div的CSS水平居中?

    menu position fixed width 800px background rgb 255 255 255 The Fallback background rgba 255 255 255 0 8 margin top 30px
  • 安卓中的动画?

    我是 android 新手 我需要了解 Android 中视图和视图组的基本动画 任何人都可以提供一些指导线来学习它 提前致谢 您可以先阅读官方指南到动画
  • JAIN API 和 JAIN SLEE API 之间的区别

    我正在阅读有关 SIP 的内容 发现有一个 java API JAIN SIP 可以开发基于 SIP 的应用程序 然后我还发现有JAIN SLEE和SIP servlet 我相信 JSLEE 和 SIP Servlet 是分别部署基于 JA
  • SQL Server 一轮又一轮的划分

    在存储过程中我有一个像这样的表达式 select some val in percents total val 100 some val 如果我使用Round像这样的函数 select some val in percents Round
  • 新手:在函数调用上挂起浏览器

    我刚刚开始学习 JavaScript 想知道为什么当我单击 调用函数 按钮时这个简单的代码片段会挂起 我缺少什么
  • 从 MATLAB 运行多进程应用程序

    我用 VC 编写了一个多进程应用程序 并尝试使用命令行参数执行它system来自 MATLAB 的命令 它可以运行 但只能在一个核心上运行 有什么建议吗 Update 事实上 它甚至没有看到第二个核心 我使用了 OpenMP 并使用了omp
  • 在windows和pycharm中设置SPARK-HOME路径变量

    我是 SPARK 的新手 并尝试在 Windows 中使用它 我能够使用 hadoop 的预构建版本成功下载并安装 Spark 1 4 1 在以下目录中 my spark directory bin 我可以运行 Spark shell 和
  • Google Android应用内购买“内容交付” 如何正确交付内容?

    我目前正在尝试对应用内购买进行编码 我一直在寻找有关 Google 无法处理的一些内容的最佳实践的文档 信息和教程 到目前为止我所做的 我正在运行一个计费服务来处理与 Google Play 的对话 该服务可以完成 示例 交易 并且我的应用
  • Python Kafka 消费者缺少轮询一些消息

    我的 Kafka 消费者的代码如下所示 def read messages from kafka topic my topic consumer KafkaConsumer bootstrap servers my host1 my hos