如果第一个经纪人宕机,Kafka 消费者将无法消费

2023-12-04

我正在使用最新版本的kafka(kafka_2.12-1.0.0.tgz)。我已经设置了带有 3 个代理的简单集群(只是在每个实例的属性文件中更改了broker.id=1 和listeners=PLAINTEXT://:9092)。集群启动后,我使用以下命令创建了主题

./kafka-topics.sh --create    --zookeeper localhost:2181  --replication-factor 3     --partitions 13    --topic demo

然后使用以下命令启动kafka消费者和生产者

./kafka-console-producer.sh --topic  demo  --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092  --topic demo

当所有经纪人都起来时,一切都好。但是,如果我先杀死(按启动顺序)代理消息将发送到代理,但消费者无法接收任何消息。消息不会丢失。启动该代理消费者后立即收到消息。

关闭broker实例后consumer的日志:

[2018-01-09 13:39:31,130] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,132] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,344] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,451] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,848] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,950] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:32,363] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:33,092] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,216] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,218] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=2, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:协调器不可用。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:39:34,219] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient)

再次启动缺少broker后的consumer日志:

[2018-01-09 13:41:21,739] 错误 [消费者 clientId=consumer-1, groupId=test] 分区 demo-0 上偏移量 3 处的偏移量提交失败: 这不是正确的协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:21,739] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=2, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:这不是正确的 协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,353] 错误 [消费者 clientId=consumer-1, groupId=test] 分区 demo-0 上偏移量 3 处的偏移量提交失败: 这不是正确的协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,354] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=3, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:这不是正确的 协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Thanks


尝试检查 server-*.properties 文件中的“offsets.topic.replication.factor”

例如:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/#brokerconfigs

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果第一个经纪人宕机,Kafka 消费者将无法消费 的相关文章

  • 当我按键对数据进行分区,然后向 Kafka 中的主题添加新分区时,会发生什么?

    当我按键对数据进行分区 然后向 Kafka 中的主题添加新分区时 会发生什么 现有记录是否会发生变化 未来的数据将如何分区 当新分区添加到特定主题时 现有数据的分区不会改变 Kafka 不会尝试重新分发现有记录 此修改只会对新记录产生影响
  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • 谁在为kafka集群设置授权

    我有一个 3 节点 Kafka 集群和 2 个用于生产者和消费者的 kafka 客户端 我已启用 SSL 身份验证 我想为集群启用授权 我已在代理节点的 server properties 中添加了以下属性 authorizer class
  • TCP 代理:在后端不可用时保持连接

    在 Docker 设置的上下文中 我想使用类似大使的模式来允许某些容器 例如数据库服务器 正常重新启动 而不必重新启动所有依赖的容器 例如 Web 服务器 并且没有错误消息 因为 数据库服务器不可用 因此 我想知道 是否有一个 TCP 代理
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • Kafka中如何实现强一致性?

    尝试了解 Kafka 中的一致性维护 请找出场景并帮助理解 Number of partition 2 Replication factor 3 Number of broker in the cluster 4 那么 为了实现强一致性 需
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • Kafka服务器未远程连接zookeeper服务器

    我正在尝试将 kafka 服务器 在 Windows 系统上 连接到 Zookeeper 服务器 我面临着 Opening socket connection to server 10 160 10 25 10 160 10 25 2181
  • 通过 Kafka 消费者重试维持订单保证

    我正在为基于 Kafka 的数据处理管道中的消费者重试设计一个架构 我们正在使用 Kafka 生产者和消费者 并且正在考虑重试主题 如果消费出错 将在这些主题上发送消息 将会有消费者以一定的节奏运行这些重试主题 我读了很多参考架构 但没有一
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个

随机推荐