Kafka Streams 应用程序无尽的重新平衡

2024-03-08

我们正在运行一个卡夫卡流应用程序并遇到一个奇怪的问题。我们正在使用全局状态存储和多个其他状态存储。

我们的应用程序已加载所有数据,状态存储中现在有大量信息。现在,当我们尝试关闭应用程序并再次将其恢复(一些配置更改)时,它会进入无休止的重新平衡。为了验证我们恢复了配置更改,但它仍然停留在该阶段。没有错误等

INFO  o.apache.kafka.streams.KafkaStreams - stream-client [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb] Started Streams client
INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED
INFO  o.apache.kafka.streams.KafkaStreams - stream-client [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb] State transition from RUNNING to REBALANCING
INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] partition revocation took 1 ms.
    suspended active tasks: []
    suspended standby tasks: []
INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] partition revocation took 0 ms.
    suspended active tasks: []
    suspended standby tasks: []
04:02:13.682 6985 [main] INFO  com..... - Started Application in 6.647 seconds (JVM running for 7.484)
04:02:23.300 16603 [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
04:02:23.300 16603 [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
04:02:23.328 16631 [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1] partition assignment took 28 ms.
    current active tasks: [0_0, 1_0, 2_0, 3_0, 4_0, 5_0, 6_0, 7_5, 8_5, 9_5, 10_5, 12_4, 13_4, 14_4, 15_4, 16_4, 17_4, 19_3, 20_3, 21_3, 22_3, 23_3, 24_3, 25_3, 29_0]
    current standby tasks: [0_2]
    previous active tasks: []

04:02:23.328 16631 [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2] partition assignment took 28 ms.
    current active tasks: [0_3, 1_3, 2_3, 3_3, 4_3, 5_3, 7_2, 8_2, 9_2, 10_2, 12_1, 13_1, 14_1, 15_1, 16_1, 17_1, 19_0, 20_0, 21_0, 22_0, 23_0, 24_0, 25_0, 26_0]
    current standby tasks: [0_5]
    previous active tasks: []
04:03:47.602 100905 [http-nio-8080-exec-10] INFO  c.j.d.r.b.p.base.StreamsRestService - State of Kafka Streams Application: REBALANCING
04:03:49.356 102659 [http-nio-8080-exec-2] INFO  c.j.d.r.b.p.base.StreamsRestService - State of Kafka Streams Application: REBALANCING
04:03:51.600 104903 [http-nio-8080-exec-3] INFO  c.j.d.r.b.p.base.StreamsRestService - State of Kafka Streams Application: REBALANCING
04:03:53.356 106659 [http-nio-8080-exec-4] INFO  c.j.d.r.b.p.base.StreamsRestService - State of Kafka Streams Application: REBALANCING

Number of topics - 100
Partitions per topic - 6.  (7 topics with 1 partition only)
kubernetes env - 3 pods ( 2 stream threads )

当我们尝试使用以下命令列出消费者组时

root@bastion-0:/app/confluent-5.2.2/bin# ./kafka-consumer-groups --describe --group app  --bootstrap-server kafka-0..local:9094 --command-config /app/client-sasl-ssl.properties --members

CONSUMER-ID                                                                                               HOST                    CLIENT-ID                                                            #PARTITIONS     
app-b8c729c9-dc1c-457b-8120-457035e84e58-StreamThread-1-consumer-3b370697-e737-411c-af28-fb04cfbae1dd 1.1.1.1/1.1.1.1 app-b8c729c9-dc1c-457b-8120-457035e84e58-StreamThread-1-consumer 45              
app-aaef2f83-d51c-4b6f-bbd8-616db988bd33-StreamThread-2-consumer-3edb3e5f-9f1a-499f-8732-6cd2c8b96c96 2.2.2.2/2.2.2.2 app-aaef2f83-d51c-4b6f-bbd8-616db988bd33-StreamThread-2-consumer 45              
app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1-consumer-00e24df4-5669-4e2c-a775-8f6c4f689714 3.3.3.3/3.3.3.3 app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-1-consumer 46              
app-b8c729c9-dc1c-457b-8120-457035e84e58-StreamThread-2-consumer-1b6b2955-5dfd-4be7-8ad9-9f1b54fe6310 1.1.1.1/1.1.1.1 app-b8c729c9-dc1c-457b-8120-457035e84e58-StreamThread-2-consumer 45              
app-aaef2f83-d51c-4b6f-bbd8-616db988bd33-StreamThread-1-consumer-72cd0319-8ca7-493c-891d-3022b235ea01 2.2.2.2/2.2.2.2 app-aaef2f83-d51c-4b6f-bbd8-616db988bd33-StreamThread-1-consumer 45              
app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2-consumer-c1a16d64-8d49-4758-ab64-2af3cd9aef0f 3.3.3.3/3.3.3.3 app-1f6b14fc-685c-49fb-83c0-54e15bca15cb-StreamThread-2-consumer 45   

上述命令的输出不断变化 - 从 0 到某个变量。理想情况下,一段时间后它应该变得稳定。

是否有用于 kafka 流平衡(重新平衡)的可调参数/配置

问题:

  1. 是什么导致应用程序在启动时不断地重新平衡(即使没有错误/异常等)。

  2. 是否有任何可调参数可以帮助我们避免重新平衡?


查看您添加的日志,消费者 Pod 正在启动,所以我猜想其他 2 个 Pod 可能会滚动重新启动,因此每次一个停止和一个启动时都会重新平衡。

虽然 Kafka 在运行重新平衡时速度不快,但由于在此过程中组之间存在聊天,所以虽然 Kafka 很快 - 虽然分区可能会分配给一个消费者,但组只有在所有消费者都完成分配后才开始消费,并且分配的发现只会发生在 poll 方法中(参见https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html).

因此,加快进程的方法是更频繁地轮询,以便您更快地了解更改,但需要权衡 - 如果在正常运行中主题不忙,那么将会有很多旋转什么都不做。

但是,您不太清楚无限是什么意思。如果您的意思是该应用程序实际上只是重新平衡,那么请参阅我上面的评论。可能是 pod 不断上升和下降(心跳消失),或者轮询需要很长时间 - 您是否为每条记录执行大量 I/O?从日志和 Pod 名称中可以明显看出重新启动的情况。过多的轮询也会导致警告消息,建议您增加max.poll.interval.ms或减少max.poll.records

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams 应用程序无尽的重新平衡 的相关文章

  • 如何将 javax.persistence.Column 定义为 Unsigned TINYINT?

    我正在基于 MySQL 数据库中的现有表创建 Java 持久性实体 Bean 使用 NetBeans IDE 8 0 1 我在这个表中遇到了一个字段 其类型为 无符号 TINYINT 3 我发现可以执行以下操作将列的类型定义为 unsign
  • Java:无法从同一包中的不同类访问静态变量

    这很奇怪 因为我有一个可以访问 Frame dimension getWidth 的 Character 类 及其伙伴 getHeight 但是当我想在 Map 类中使用它时 Eclipse 强调了它并且无法给我反馈 运行该程序最终会出现
  • 是否有任何简单(且最新)的 Java 框架可用于在 Swing 应用程序中嵌入电影?

    我正在构建一个小型 Swing 应用程序 我想在其中嵌入一部电影 重要的是 这个应用程序是一个 WebStart 应用程序 并且该库应该能够打包在我启动的 jnlp 中 即 不依赖于本机库 我知道并尝试过 JMF 但我认为与其他框架相比 其
  • Spring Data JPA 选择不同

    我有一个情况 我需要建立一个select distinct a address from Person a 其中地址是 Person 内的地址实体 类型的查询 我正在使用规范动态构建我的 where 子句并使用findAll Specifi
  • 在 Wildfly 中与 war 部署共享 util jar 文件

    假设我有一个名为 util jar 的 jar 文件 该 jar 文件主要包含 JPA 实体和一些 util 类 无 EJB 如何使这个 jar 可用于 Wildfly 中部署的所有 war 无需将 jar 放置在 war 的 WEB IN
  • Integer.parseInt("0x1F60A") 以 NumberformatException 结束

    我尝试从数据库中获取长字符串内的表情符号代码 格式如下 0x1F60A 所以我可以访问代码 但它将是String 起初 我尝试通过执行以下操作来转换变量tv setText beforeEmo getEmijoByUnicode int e
  • 是否可以使用 Flying Saucer (XHTML-Renderer) 将 css 解析为类路径资源?

    我正在尝试将资源打包到 jar 中 但我无法让 Flying Saucer 在类路径上找到 css 我无法轻松构建 URL 来无缝解决此问题 https stackoverflow com questions 861500 url to l
  • 如何根据运行的 jar 的结果让我的 ant 任务通过或失败?

    我正在运行 CrossCheck 无浏览器 js 单元测试 作为 ant 脚本的一部分 如果 CrossCheck 测试失败 我希望 ant 报告失败 这是 build xml 中的相关部分
  • ConcurrentHashMap 内部是如何工作的?

    我正在阅读有关 Java 并发性的 Oracle 官方文档 我想知道Collection由返回 public static
  • 需要使用 joda 进行灵活的日期时间转换

    我想使用 joda 解析电子邮件中的日期时间字符串 不幸的是我得到了各种不同的格式 例如 Wed 19 Jan 2011 12 52 31 0600 Wed 19 Jan 2011 10 15 34 0800 PST Wed 19 Jan
  • 内部存储的安全性如何?

    我需要的 对于 Android 我需要永久保存数据 但也能够编辑 并且显然是读取 它 用户不应访问此数据 它可以包含诸如高分之类的内容 用户不得对其进行编辑 我的问题 我会 并且已经 使用过Internal Storage 但我不确定它实际
  • 读取电子邮件的文本文件转换为 Javamail MimeMessage

    我有一个电子邮件原始来源的文本文件 直接从 gmail 复制 如果您单击 查看原始文件 您就会看到它 我想读入该文件并将其转换为 MimeMessage 如果您好奇为什么 我设置了 JavaMaildir 并且需要用电子邮件填充它的收件箱以
  • GWT 2.3 开发模式 - 托管模式 JSP 编译似乎不使用 java 1.5 兼容性

    无法编译 JSP 类 生成的 servlet 错误 DefaultMessage 上次更新 0 日期 中 0 时间 HH mm ss z 语法 错误 注释仅在源级别为 1 5 时可用 在尝试以开发模式在 Web 浏览器中打开我的 gwt 模
  • 流中的非终结符 forEach() ?

    有时 在处理 Java Stream 时 我发现自己需要一个非终端 forEach 来触发副作用但不终止处理 我怀疑我可以用 map item gt f item 之类的方法来做到这一点 其中方法 f 执行副作用并将项目返回到流中 但这似乎
  • 在 SWT/JFace RCP 应用程序中填充巨大的表

    您将如何在 SWT 表中显示大量行 巨大是指超过 20K 行 20 列的东西 不要问我为什么需要展示那么多数据 这不是重点 关键是如何让它尽可能快地工作 这样最终用户就不会厌倦等待 每行显示某个对象的实例 列是其属性 一些 我想使用 JFa
  • java库维护数据库结构

    我的应用程序一直在开发 所以偶尔 当版本升级时 需要创建 更改 删除一些表 修改一些数据等 通常需要执行一些sql代码 是否有一个 Java 库可用于使我的数据库结构保持最新 通过分析类似 db structure version 信息并执
  • 是否可以使用 Java Guava 将函数应用于集合?

    我想使用 Guava 将函数应用于集合 地图等 基本上 我需要调整 a 的行和列的大小Table分别使所有行和列的大小相同 执行如下操作 Table
  • 将 Apache Camel 执行器指标发送到 Prometheus

    我正在尝试转发 添加 Actuator Camel 指标 actuator camelroutes 将交换 交易数量等指标 发送到 Prometheus Actuator 端点 有没有办法让我配置 Camel 将这些指标添加到 Promet
  • 在浏览器刷新中刷新检票面板

    我正在开发一个付费角色系统 一旦用户刷新浏览器 我就需要刷新该页面中可用的统计信息 统计信息应该从数据库中获取并显示 但现在它不能正常工作 因为在页面刷新中 java代码不会被调用 而是使用以前的数据加载缓存的页面 我尝试添加以下代码来修复
  • 洪水填充优化:尝试使用队列

    我正在尝试创建一种填充方法 该方法采用用户指定的初始坐标 检查字符 然后根据需要更改它 这样做之后 它会检查相邻的方块并重复该过程 经过一番研究 我遇到了洪水填充算法并尝试了该算法 它可以工作 但无法满足我对 250 x 250 个字符的数

随机推荐

  • 将文件夹的多个 csv 文件加载到一个数据框中

    我有多个 csv 文件保存在一个具有相同列布局的文件夹中 并希望将其作为 pandas 中的数据框加载到 python 中 这个问题确实与此类似thread https stackoverflow com questions 3833052
  • dequeueReusableCellWithIdentifier:forIndexPath 中断言失败:

    所以我正在为我的学校制作一个RSS阅读器并完成代码 我运行了测试 它给了我这个错误 这是它所引用的代码 UITableViewCell tableView UITableView tableView cellForRowAtIndexPat
  • Magento FPC Cache Warm 与用户组、wget、Lesti FPC

    我在 Magento 网站上使用 Lesti FPC 该网站有 10 个客户组和很多类别 产品 我创建了一个 shell 脚本 它会在一夜之间读取 sitemap xml 和 wget 的每个 url 以构建站点的缓存 这对于访客来说非常有
  • 不带 SQL 的 ContentProvider

    我有两条数据需要从外部应用程序访问并存储 根据文档 ContentProviders 是唯一可能的方式 但它也提到了外部存储 ContentProviders 实现了一个类似数据库的 接口 对于两条数据来说使用数据库是非常不必要的 我宁愿将
  • 如何将 XMLGregorianCalendar 与仅日期部分(日、月、年)进行比较?

    我正在开发一个与 spring struts Web 应用程序集成的 Web 服务 在 XSD 中有一个 XMLGregorianCalendar 类型属性 假设属性名称是trxDate 在 SOAPUI 测试应用程序中 如果我使用以下命令
  • Java:使用 Bouncy Castle 进行 PGP 加密

    我正在尝试使用 PGP 实现加密 并且我的加密方法成功加密了输入字符串 但是当我尝试解密它以验证加密是否正确完成时 该字符串不会被解密 我尝试了两种方法 第一种方法使用文件输出流编写加密字符串和第二种方法使用字节数组输出流 文件输出流创建一
  • Angular 2 + RxJS:带有 .share() 运算符的异步管道

    当使用async管道上的可观察对象正在使用 share 运算符 由于后端计算成本高昂 我偶然发现了这种行为 data new Observable observer gt let counter 0 observer next counte
  • Angular UI Bootstrap Popover 添加关闭按钮

    我有以下弹出窗口 并尝试添加一个关闭按钮以将其关闭 directive popoverHtmlUnsafePopup function use strict return restrict EA replace true scope tit
  • libvirt 和 VirtualBox / 入门

    我正在尝试使用 VirtualBox 作为虚拟化解决方案来开始使用 libvirt 我安装了所有内容 并且 VirtualBox 本身在使用 VBoxHeadless 命令时正在运行 但是 libvirt 无法连接到 VirtualBox
  • 运行“docker build”时不会反映对我的 dockerfile 的更改

    Docker 初学者在这里 我正在尝试通过调用来构建 docker 映像docker build t my image 并对失败的行进行 dockerfile 更改 我目前在这一行遇到问题 RUN apt get install qy lo
  • Spring MVC:不反序列化 JSON 请求正文

    我正在开发一个 Spring MVC 项目 我需要做的任务之一要求我拥有用户在 POST 请求中发送的一串 JSON 数据 我知道 Spring 会使用 Jackson 将 JSON 反序列化为对象 但是如果我尝试如下操作 RequestM
  • 了解 SQL Server 中的锁定行为

    我尝试重现问题 1 的情况 在桌子上 获取并填充了来自 wiki 的 隔离 数据库系统 2 的数据 在 SQL Server 2008 R2 SSMS 中 我执行了 1 首先在SSMS的第一个选项卡 窗口 中 transaction iso
  • .ico 的理想大小

    位于窗口顶角的 ico 文件的理想大小是多少 简短回答 16 x 16 像素 长答案 ico 文件实际上可以包含多种颜色深度的多个图像 您可以在单个文件中提供 16x16 32x32 48x48 和 64x64 操作系统将选择最好的一个进行
  • 使用sql查询将字符串转换为int

    如何在 SQL Server 2005 上使用 SQL 查询将字符串转换为整数 你可以使用投射或转换 http msdn microsoft com en us library ms187928 SQL 90 aspx SELECT CAS
  • 将数据属性传递给模态引导程序

    a class my link href modal link a 我有这个链接来打开引导模式 但我需要传递数据属性 data val 我尝试使用 javascript 但没有得到它 你能帮我么 您可以收听show bs modal模式上的
  • 如何在 Asp.Net MVC 中使用属性路由为操作生成 URL

    public class HomeController Controller Route Users about Route Users WhoareWe Route Users OurTeam Route Users aboutCompa
  • Java Fx 将场景大小调整为舞台

    我进行了搜索 但找不到任何与我在 Fx 中遇到的问题相近的内容 我正在使用 Java Fx JDK 8 并且在调整场景大小时遇到 问题 下面的代码在场景图中一次仅加载一个屏幕 并在屏幕之间切换 问题是当我调整舞台大小时 场景没有随舞台调整大
  • Nexus S 上的 OpenGL ES 黑色纹理

    在 Nexus One 上运行的 OpenGL 代码在 Nexus S 上无法正常运行 纹理似乎没有渲染 纹理应该在的地方只剩下黑色 有人有什么想法吗 The 此处给出的已接受答案 https stackoverflow com quest
  • 第二次绑定源时Datagridview完全隐藏

    private void populateListingGrid try Dictionary
  • Kafka Streams 应用程序无尽的重新平衡

    我们正在运行一个卡夫卡流应用程序并遇到一个奇怪的问题 我们正在使用全局状态存储和多个其他状态存储 我们的应用程序已加载所有数据 状态存储中现在有大量信息 现在 当我们尝试关闭应用程序并再次将其恢复 一些配置更改 时 它会进入无休止的重新平衡