Kafka 发送重复消息

2024-02-20

我们使用 kafka(0.9.0.0) 来编排不同微服务之间的命令消息。我们发现一个间歇性问题,即重复消息被传递到特定主题。下面给出了发生此问题时出现的日志。有人可以帮助理解这个问题吗

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:07 - ERROR LeaveGroup request failed: UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:10 - INFO Joined group 'kafka-python-default-group' (generation 5) with member_id kafka-python-1.0.2-8585f310-cb4f-493a-a98d-12ec9810419b
Wed, 21-Sep-2016 09:19:10 - INFO Updated partition assignment: [TopicPartition(topic=u'ilinaTestPlatformReq', partition=0)]

From 关于消费者配置的 Kafka 文档 http://kafka.apache.org/documentation.html#newconsumerconfigs:

session.timeout.ms(默认 30000) - 用于检测的超时 使用 Kafka 的组管理工具时出现故障。当一个 在会话超时时间内没有收到消费者的心跳, 经纪人会将消费者标记为失败并重新平衡该组。自从 仅当以下情况时才发送心跳poll()被调用时,更高的会话 超时允许消费者有更多时间在轮询中处理消息 循环的代价是需要更长的时间来检测硬故障。也可以看看max.poll.records另一个控制处理时间的选项 轮询循环。请注意,该值必须在允许范围内 在代理配置中配置group.min.session.timeout.ms and group.max.session.timeout.ms.

看来如果消息处理时间大于30000毫秒,就会触发消费者重新平衡,这可能会导致消息重复传递。

你可以尝试的是增加session.timeout.ms.

另一种选择是在使用时异步处理消息pause() http://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.pause在处理消息之前和resume() http://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.resume处理消息后。在这种情况下,消费者会调用poll()(并发送心跳)即使处理时间长于session.timeout.ms。因此,经纪人不会将您的消费者标记为失败,也不会启动重新平衡。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 发送重复消息 的相关文章

随机推荐

  • Visual Studio 2012 asp net mvc 3 项目剃刀视图

    我在新的 Visual Studio 2012 RC 中的 asp net mvc 3 项目中遇到智能感知问题 如果我在VS2012中打开VS2010 SP1 asp net mvc项目 cshtml视图如下所示 model Intrane
  • Windows 10 上的 Visual Studio 2012 中缺少“添加域服务类”

    我有一个 Silverlight 5 应用程序 需要对其进行维护工作 我想避免使用虚拟机 随着 Net Framework 4 6 2 的发布 目前处于预览版 this bug https connect microsoft com Vis
  • C/C++ 编译器是否将按二次方值进行常量除法优化为移位?

    问题说明了一切 有谁知道以下是否 size t div size t value const size t x 64 return value x 优化为 size t div size t value return value gt gt
  • 在 Android 的自定义适配器中设置 Textview 文本时出现问题

    我无法在其中设置 textview 的 setText 属性getView 自定义适配器的方法 我已经尝试过以下解决方案 但它对我不起作用 解决方案一 https stackoverflow com questions 5612844 ho
  • 在 ubuntu 16.04 中安装 Caffe 时遇到困难

    操作系统 ubuntu 16 04 CUDA 7 5 库德恩 5 我正在关注this https github com BVLC caffe wiki Ubuntu 16 04 or 15 10 OpenCV 3 1 Installatio
  • CSS,粘性页脚

    我知道 我知道 我以前问过这个问题 不幸的是 我失去了工作 我重新开始 回到原来的岗位 min height 100 在我的容器上不起作用 https stackoverflow com questions 9860888 min heig
  • std::remove 和 std::remove_if 设计的稳定性是否失败?

    最近 从一条评论中 我了解到std remove and std remove if是稳定的 我是否错误地认为这是一个糟糕的设计选择 因为它阻止了某些优化 想象一下删除 1M 的第一个和第五个元素std vector 由于稳定性原因 我们无
  • ie8 中未调用图像上的 onload 回调

    我正在尝试预加载图像并将高度和宽度设置为容器 问题似乎出在 ie8 中的缓存 因为它无法在后续刷新时加载 我查找并尝试了多种解决方案 但似乎没有任何效果 至少不一致 当前的 JavaScript img new Image img src
  • Pandas:将掩码应用于多索引数据帧

    我有一个带有 MultiIndex 列的 pandas 数据框 有 3 个级别 import itertools import numpy as np def mklbl prefix n return s s prefix i for i
  • UISegmentedControl 行为

    关于 iPhone 上的 UISegmentedControl 类的一个简单问题 希望有些人可能已经注意到 在具有 2 个分段的默认状态下 即使用户点击当前选定的分段 分段控件仍然会切换 我在应用程序中看到 UISegmentedContr
  • 如何安排对 MS Access 宏的调用?

    我正在寻找安排对 MS Access 宏的调用 该宏使用 csv 文件 我每天都会收到 来更新 SharePoint 日历 有人可以解释一下如何安排每日调用来在我的 Windows 7 计算机上运行此宏 我使用的是 Access 2003
  • 如何在 CodeIgniter 中获取会话超时?

    我试图在会话超时前 5 分钟运行一个函数 我的配置文件中的会话超时设置为 7 200 可以用 CodeIgniter 做到这一点吗 我想你正在寻找这样的东西 lastActivity this gt session gt userdata
  • 将 SimpleXMLElement 对象转换为数组

    我必须将 SimpleXMLElement 对象转换为数组 数组如下 Array 0 gt SimpleXMLElement Object Cell gt Array 0 gt SimpleXMLElement Object Data gt
  • 如何在 Spring 5 WebFlux WebClient 中设置超时

    我正在尝试在 WebClient 上设置超时 这是当前的代码 SslContext sslContext SslContextBuilder forClient trustManager InsecureTrustManagerFactor
  • 如何在 redmine wiki 中添加一个空行?

    我只是想知道如何在 redmine 中添加一些空行 我尝试的是输入以下文本 some sample txt with a few empty lines 我在redmine中得到的是 some sample txt with a few e
  • 当窗口滚动到特定位置时触发事件

    我想在浏览器窗口超出某个点时调用函数 例如 用户将窗口从顶部向下滚动超过 200px 是否有一个我可以绑定的事件 然后我如何检查从浏览器顶部到页面顶部的偏移量是多少 您可以使用onscroll https developer mozilla
  • 24小时内SQL选择?

    People Born 是日期时间类型 我只想删除过去 24 小时内出生的人 我将其作为 Visual Studio 2010 内的存储过程运行 这是我的尝试 DELETE FROM People WHERE People Born gt
  • 为什么一个汉字需要1个字符(2个字节)而不是3个字节?

    我有以下程序来测试Java如何处理中文字符 String s3 世界您好 char chs s3 toCharArray byte bs s3 getBytes StandardCharsets UTF 8 byte bs2 new Str
  • 将 QStackedWidget 的大小调整为打开的页面[重复]

    这个问题在这里已经有答案了 我想要我的QStackedWidget调整到打开页面的大小 我在第一页附加了很多小部件 但其余页面只有一个按钮 所以它们保持这么大 第一页还可以 我怎样才能让我的QStackedWidget获得正在查看的页面的大
  • Kafka 发送重复消息

    我们使用 kafka 0 9 0 0 来编排不同微服务之间的命令消息 我们发现一个间歇性问题 即重复消息被传递到特定主题 下面给出了发生此问题时出现的日志 有人可以帮助理解这个问题吗 Wed 21 Sep 2016 09 19 07 WAR