如果一个代理关闭,流应用程序中的 KafkaStream EXACTLY_ONCE 会导致重新平衡失败

2024-01-07

我有一个 Kafka 流应用程序,其中 kafka-streams 和 kafka-clients 均为 2.4.0 具有以下配置

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
  • 经纪人= ip1:端口1, ip2:端口2,ip3:端口3,
  • 主题分区:3
  • 主题复制:3

场景1:我只启动 2 个代理(流应用程序在代理 ip 设置中仍然包含代理的三个 ip),当我启动我的流应用程序时,会发生以下错误。

2020-02-13 13:28:19.711  WARN 18756 --- [-1-0_0-producer] org.apache.kafka.clients.NetworkClient   : [Producer clientId=my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1-0_0-producer, transactionalId=my-app1-0_0] Connection to node -2 (/ip2:port2) could not be established. Broker may not be available.

1分钟后

org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1] task [0_0] Failed to initialize task 0_0 due to timeout.
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:966)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:254)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
    at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
    at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
    ... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

我正在测试高可用性测试场景。我认为卡夫卡仍然应该作为复制正确地存在于两个代理中(我已经使用kafka GUI工具进行了检查)。

场景2:今天我注意到,当我只启动 2 个经纪人并给出这两个经纪人的 ip 时(即流应用程序只有两个工作经纪人的 ip)

2020-02-16 16:18:24.818  INFO 5741 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1-consumer, groupId=my-app] Group coordinator ip2:port2 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-02-16 16:18:24.818 ERROR 5741 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] task [0_0] Failed to initialize task 0_0 due to timeout.
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:966)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:254)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
    at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
    at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
    ... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

Note:如果我不设置,情况就不是这样EXACTLY_ONCE在属性中。他们按预期工作。 尝试增加重连并减少最大毫秒数,但没有帮助。 谁能解释我缺少什么?

当broker 1宕机时broker2的日志:

[2020-02-17 02:29:00,302] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Retrying leaderEpoch request for partition __consumer_offsets-36 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)

Kafka 日志中充满了上面的行。

现在一项重大观察: 当我关闭broker2(即broker 1和broker 3正在运行)时,我的流应用程序运行良好。仅当代理 1 关闭时,我的应用程序才会关闭。我猜测一些应该在所有代理之间分发的关键信息仅保存在代理 1 中。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果一个代理关闭,流应用程序中的 KafkaStream EXACTLY_ONCE 会导致重新平衡失败 的相关文章

  • 声纳违规:安全性 - 使用 byte[] 时直接存储数组

    我确实遇到过以下两篇关于类似问题的帖子声纳违规 https stackoverflow com questions 11580948 sonar violation security array is stored directly and
  • 将 CSV 文件读入 Java 作为数据库表

    我发现了很多关于使用 Java 读取 CSV 的帖子 并且他们所指向的 API 在读取 CSV 文件时都采用了面向行的方法 就像 当你得到一行时 获取每一列的值 我希望有一个更高级别的 API 比如在 Perl 中 DBI 允许您在 CSV
  • 使用比较器对对象进行排序给出空指针

    我正在尝试对包含 3 张卡的 ArrayList 进行排序 我正在用比较器来做这件事 这是否太过分了 Card getRank 返回 2 到 14 之间的整数 我完全不知道哪里出了问题 我之前已经成功完成了这个 并与我的其他代码进行了比较
  • Antlr 处理异常

    我使用 Antlr 3 和 AST 树开发了一个复杂的语法 ANTLR 生成词法分析器和解析器 问题是 例如 当用户输入无效的语法时 该语法需要 用户没有输入此内容 然后在我的 Eclipse IDE 中出现以下异常 line 1 24 m
  • eclipse juno 打开时出错

    在安装 Eclipse 并正常工作一年多后 我今天打开 Eclipse Juno 并在打开工作区时收到一条错误消息 我使用的是 Windows 8 64 位 Java 64 位和 Eclipse 64 位 此后我尝试重新安装 Java 和
  • 如何在流中收集到TreeMap中?

    我有两个Collectors groupingBy在流中 我需要收集所有信息TreeMap 我的代码 Map
  • 无法解析配置“:app:debugRuntimeClasspath”的所有文件。问题

    我的 android studio 遇到了下一个问题 导致 org gradle api internal artifacts ivyservice DefaultLenientConfiguration ArtifactResolveEx
  • RSA 加密-解密:BadPaddingException:数据必须以零开头

    对于一个被问了很多次的问题 我很抱歉向您询问您的技能 我有一个关于 RSA 加密的问题 我已经检查过有关此问题的其他主题 但没有找到任何有用的答案 我希望你能帮助我 我想读取一个文件 加密其内容 然后解密它并将这些解密的字节放入一个新文件中
  • 将二进制数据的 byte[] 转换为 String

    我有二进制格式的数据 hex 80 3b c8 87 0a 89 我需要将其转换为字符串 以便通过 Jackcess 将二进制数据保存在 MS Access 数据库中 我知道 我不打算在 Java 中使用 String 来存储二进制数据 但
  • 如何将 wsdl 内部架构设置为 Jaxb2Marshaller 以验证我所做的每篇文章?

    我正在使用 SOAP Web 服务 在调用它之前我必须验证每个 xml 帖子 所以我正在使用 The CXF codegen 插件生成POJO树结构 第三部分 wsdl xxxx soap service wsdl 一个类实现Web服务网关
  • 比较和删除列表和数组java中不存在的元素

    我有一个String数组和一List
  • 如何使用 UUID 生成唯一的正 Long

    我需要为我的数据库主键列生成唯一的长 ID 我以为我可以用UUID randomUUID getMostSignificantBits 但有时它也会产生一些负多头 这对我来说是个问题 是否可以从 UUID 中仅生成正长 将会有数十亿个条目
  • 将传入字符串的 unicode 表示形式转换为 UTF-8?

    我正在读取一些已经转换为 html 样式 代码的数据 我现在需要将其转换回 UTF 8 字符以供查看 不幸的是我无法使用浏览器查看该字符串 我读过有关 java 中的转换的内容 似乎如果你有一个 uxxxx 字符串 那么编译器会为你转换 然
  • 使用antlr4获取预处理器行并解析C代码

    我正在使用 Antlr4 来解析 C 代码 并使用以下语法来解析 链接到 C g4 https github com antlr grammars v4 blob master c C g4 上面的语法默认不提供任何解析规则来获取预处理器语
  • 如何减少 JSF 中的 javax.faces.ViewState

    减少 JSF 中视图状态隐藏字段大小的最佳方法是什么 我注意到我的视图状态约为 40k 这会在每次请求和响应时下降到客户端并返回到服务器 特别是到达服务器时 这对用户来说会显着减慢 我的环境 JSF 1 2 MyFaces Tomcat T
  • 如何获取队列中的第 n 个项目?

    我的应用程序中有许多队列和优先级队列 我想轻松访问这些队列中的第 n 个项目 但没有看到使用 API 实现此目的的简单方法 我想我可以创建一个Iterator并迭代到第 n 个元素或使用toArray index 但似乎应该有一个更简单的方
  • 如何在Webview中保存用户名和密码

    目前 我还在学习Android开发的过程中 所以如果我的这个问题对你来说不太容易理解 请原谅 我创建了一个 Android 应用程序 它使用 RecyclerView 显示一组列表 当用户单击列表中的每个名称时 它会将它们重定向到一组不同的
  • Android - 保持用户登录状态

    我正在尝试使用 PHP 和 MySQLi for Android 进行登录 我不明白的是如何保持用户登录状态 我看到一个简单的教程 其中有人使用 SQLite 来保护信息 但我不知道这是否真的安全 如何保存用户信息以保持用户登录状态 谢谢
  • HashSet 与 LinkedHashSet

    它们之间有什么区别 我知道 LinkedHashSet 是 HashSet 的有序版本 维护一个跨所有元素的双向链接列表 使用此类代替 HashSet 当您关心迭代顺序时 当你迭代 HashSet 时 顺序是不可预测的 而 LinkedHa
  • 如何正确使用Google Calendar API Events.Insert命令?

    所以我一直使用REST方法来调用Google的API 我需要将事件插入到我拥有 ID 的特定日历中 这是我发送的 POST 请求 地址 https www googleapis com calendar v3 calendars https

随机推荐

  • 使用unicode字符u201c

    我是 python 新手 在理解 unicode 时遇到问题 我在用着 Python 3 4 我花了一整天的时间试图通过阅读有关 unicode 的内容来解决这个问题 包括http www fileformat info info unic
  • 具有新 Firebase 的 Nodejs 应用程序不会检索数据库项目

    我是 Nodejs 新手 但已经有一个工作的 js 客户端程序 Firebase 版本 3 0 2 事实证明 我需要一个服务器来完成一些在 js 客户端中不可能完成的简单事情 当我在 Nodejs 中尝试这个基本的事情时 没有任何反应 数据
  • 在 MATLAB 中查找变量的小数位数

    给定变量 x 12 3442 我想知道变量的小数位数 在这种情况下 结果将是 4 如何在不反复试验的情况下做到这一点 这是一个紧凑的方法 y x 10 1 20 find y round y 1 假设x是您的数字 20 是小数点后的最大位数
  • 在java中将数组的字符串表示形式转换回int数组

    刚刚开始使用 Java 编程 如果我有一个存储在 txt 文件中的数组 如下所示 10 22 30 55 10 20 19 如何将其转换回正常的 int 数组以在代码中使用 我需要能够将其简单地存储在这样的 txt 文件中 以便我可以手动对
  • 如何在 Windows 上安装 python-levenshtein?

    经过几天的搜索 我准备放弃寻找 Python 2 7 Windows 64 位 的预编译二进制文件Python Levenshtein 库 http pypi python org pypi python Levenshtein 所以不是我
  • Java 中的 getter/setter

    我是 Java 新手 但有一些使用 ActionScript 3 的 OOP 经验 因此我尝试依靠我所知道的内容进行迁移 在 ActionScript 3 中 您可以使用 get 和 set 关键字创建 getter 和 setter 这意
  • 相机控件在 iOS 7 上不可见

    我使用图像选择器控制器来调用设备相机 下面列出的代码在 iOS 7 下工作正常 但是当我在 iOS 7 上使用相同的代码启动相机时 我看不到 使用 和 取消 按钮 void getCameraPicture UIImagePickerCon
  • R - 使用“rep”创建重复序列

    我想知道是否有更简单的方法来制作列表 例如 10 4 20 6 和 30 3 然后手写 example lt c 4 4 4 4 与函数 rep 我知道我可以重复某个序列 n 次 每次重复 n 次 但我不知道如何用每个数字的不同数量来制作一
  • O(n) 算法的计算时间可以超过 O(n^2) 吗?

    假设我有两种算法 for int i 0 i lt n i for int j 0 j lt n j do something in constant time 这自然是O n 2 假设我也有 for int i 0 i lt 100 i
  • 渐进式 Web 应用程序中的重定向

    我试图在通知单击时重定向到渐进式网络应用程序中的特定网址 但它不会重定向 情况 1 如果 Web 应用程序未添加到主屏幕 则在收到通知后单击浏览器窗口将打开并重定向到所需的 URL 情况 2 如果 Web 应用程序添加到主屏幕 则登陆页面是
  • PostgreSQL 9.1 时区

    我正在使用 postgresql 在数据库中存储一些日期 在我的应用程序中 它完全了解时区是至关重要的 我正在客户端 服务器和数据库之间进行一些基本测试 我从 GWT 中执行的浏览器应用程序发送日期 并读取 postgresql 上的日期
  • iTunes Connect - 无法邀请预发行应用程序的“内部测试人员”

    我的应用程序已获准通过新的 Apple TestFlight 应用程序进行分发 我试图通过邀请 内部测试员 iTunes Connect gt Prerelease gt Internal Testers但我看到的只是两个信息框 要开始测试
  • Pycharm 不接受“list[Example]”作为项目列表的类型提示[重复]

    这个问题在这里已经有答案了 我在 PyCharm 中发现了一个奇怪的类型 Example是我自己的班级 但我想这并不那么重要 因为 IDE 正在抱怨list类型没有定义 getitem 这是不正确的方法 我想知道这是一个错误还是我以错误的方
  • Magento - 对自定义报价总计字段应用税

    我为 Magento 创建了一个附加费模块 它在报价中添加了一个自定义总计字段 附加费含税输入到 Magento 中 我已成功获取将附加费添加到报价中的模块 并且结帐页面上的总计是正确的 当我尝试对附加费征税 以便将其包含并显示在结账页面的
  • jQuery Ajax Post 与数据

    当使用某些参数单击按钮时 我尝试调用 PHP 文件 它一直执行到 jsfile js 中的警报语句为止 之后ajax部分没有被执行 帮助我 主要 html
  • 当应用程序在后台运行时获取 GPS 位置更新

    我有一个 Android 应用程序 可以跟踪客户位置并每 10 秒发送一次位置 但是 在 android O 中 位置更新每小时会获得几次 正如有关 android O 中 GPS 位置更新限制的文档中所述 无论如何 为了克服这个问题 我使
  • C/C++ MPI 加速未达到预期

    我正在尝试编写一个 MPI 应用程序来通过计算机集群加速数学算法 但在此之前我正在做某种基准测试 但最初的结果并不像预期的那么好 测试应用程序在 4 核时具有线性加速 但 5 6 核并未加速应用程序 我正在使用 Odroid N2 平台进行
  • 在 Sympy.mpmath.plot 中更改图形大小

    我希望这个问题不是太初级 我已经广泛搜索了解决方案 但尚未找到 我最近开始使用 Jupyter Notebook 和 Sympy 在微积分 II 课上做笔记和做作业 这真是一个巨大的好处 然而 我唯一的问题是我无法弄清楚如何配置绘图的大小
  • 什么是堆栈溢出?

    什么是堆栈溢出错误 它可能出现在什么类型的程序 编程语言中 它不太可能出现在 Web 应用程序代码中吗 From 维基百科 http en wikipedia org wiki Stack overflow 在软件中 会发生堆栈溢出 当内存
  • 如果一个代理关闭,流应用程序中的 KafkaStream EXACTLY_ONCE 会导致重新平衡失败

    我有一个 Kafka 流应用程序 其中 kafka streams 和 kafka clients 均为 2 4 0 具有以下配置 properties put StreamsConfig BOOTSTRAP SERVERS CONFIG