Kafka Connect 进入重新平衡循环

2024-04-18

我刚刚部署了 Kafka Connect(我只使用连接源 MQTT)应用程序位于两个实例的集群上(2 个容器上 机器),现在它似乎进入了一种重新平衡循环,我一开始有一点数据,但没有新数据出现。这就是我在日志中得到的内容。

[2017-08-11 07:27:35,810] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-592bcc91-9d99-4c54-b707-3f52d0f8af50', leaderUrl='http:// 10.120.233.78:9040/', offset=2, connectorIds=[SourceConnector1], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2017-08-11 07:27:35,810] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:679)
[2017-08-11 07:27:35,810] INFO Current config state offset 1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:723)
[2017-08-11 07:27:36,310] INFO Finished reading to end of log and updated config snapshot, new config log offset: 1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:727)
[2017-08-11 07:27:36,310] INFO Current config state offset 1 does not match group assignment 2. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:703)
[2017-08-11 07:27:36,311] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1030)
[2017-08-11 07:27:36,311] INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1056)
[2017-08-11 07:27:36,311] INFO (Re-)joining group source-connector11234 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:381)
[2017-08-11 07:27:36,315] INFO Successfully joined group source-connector11234 with generation 28 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:349)
[2017-08-11 07:27:36,317] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-592bcc91-9d99-4c54-b707-3f52d0f8af50', leaderUrl='http:// 10.120.233.78:9040/', offset=2, connectorIds=[SourceConnector1], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2017-08-11 07:27:36,317] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:679)
[2017-08-11 07:27:36,317] INFO Current config state offset 1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:723

我也遇到了类似的问题,在 mesos 集群上运行两个单独的容器 - 最终的解决方案是一个烦人的解决方案,没有在任何地方记录:

使用奇数个容器!

一些分布式系统依靠其工作人员来选举领导者。如果有两个,他们就会各自投票给对方并陷入循环。这似乎也是这里发生的事情。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Connect 进入重新平衡循环 的相关文章

  • Kafka 中的“__consumer_offsets”主题是什么

    当我运行此命令时 我得到 2 个主题 我知道我创建了测试主题 但我看到了一个名为 consumer offsets 的附加主题 从名称上看 它与消费者抵消有关 但它是如何使用的呢 bin kafka topics sh list zooke
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 无法找到任何实现 Connector 且名称与 io.debezium.connector.mysql.MySqlConnector 匹配的类,可用的连接器有

    使用 Kafka MySQL 和 Debezium 设置数据流管道 我是这个版本的 Kafka 3 4 0 MySQL 8 Debezium 2 2 1 Java 11 目标 我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kaf
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • KafkaConsumer.commitAsync() 行为的偏移量比以前更低

    kafka 将如何处理调用 KafkaConsumer commitAsync Map

随机推荐