如果leader没有死但是无法接收Kafka中的消息会发生什么?单点故障?

2024-03-23

我有 3 个经纪人,3 个分区。每个代理都是一个分区的领导者和所有分区的 ISR。 假设我已经在端口上运行了代理19092,29092,39092分别。

19092 - partition 0
29092 - partition 1
39092 - partition 2

半经纪人测试:

我想这样命名!因为它只允许 OUTPUT 而不允许 INPUT

现在,我添加了以下 iptables 规则:

iptables -A INPUT -p tcp --dport 29092 -j DROP

并在生产者中:

bin/kafka-console-producer --broker-list 10.54.8.172:19092 --topic ftest

上述 iptables 规则会阻止 INPUT 访问,但不会限制代理通过 Zookeeper 更新其活动状态。 所以zookeeper不会认为它已经死了,因此不会为分区1进行领导者选举。

但是,由于规则的原因,生产者无法连接到它,因此会抛出错误。

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ftest-1: 1778 ms has passed since batch creation plus linger time

我已手动完成此操作,但可能还有其他原因导致 INPUT 访问被阻止(某些恶意软件、DDoS 或其他任何原因)。

在 iptables 规则之前:

Metadata for ftest (from broker 1: 10.54.8.172:19092/1):

 3 brokers:

  broker 2 at 10.54.8.172:29092

  broker 1 at 10.54.8.172:19092

  broker 3 at 10.54.8.172:39092

 1 topics:

  topic "ftest" with 3 partitions:

    partition 2, leader 3, replicas: 3,1,2, isrs: 3,1,2

    partition 1, leader 2, replicas: 2,3,1, isrs: 2,3,1

    partition 0, leader 1, replicas: 1,2,3, isrs: 1,2,3

iptables 规则之后:

Metadata for ftest (from broker 1: 10.54.8.172:19092/1):

 3 brokers:

  broker 2 at 10.54.8.172:29092

  broker 1 at 10.54.8.172:19092

  broker 3 at 10.54.8.172:39092

 1 topics:

  topic "ftest" with 3 partitions:

    partition 2, leader 3, replicas: 3,1,2, isrs: 3,1,2

    partition 1, leader 2, replicas: 2,3,1, isrs: 2

    partition 0, leader 1, replicas: 1,2,3, isrs: 1,2,3

因为,只有一位领导者,而且是dead(从某种意义上说,它无法接收任何消息),不是单点故障?

我认为,理想情况下 Zookeeper 之间必须有 2 种方式的通信 和卡夫卡经纪人。不是吗?卡夫卡允许吗?如果是这样,怎么办?

此外,当 29092 被阻止输入访问时,其 ISR 会缩小 至 1。

可能是因为无法接收任何消息 (心跳)来自其他 2 个经纪人。

如果它可以连接(输出已启用),那么它可以写入它们并且 为了使复制得到确认,它需要 INPUT 访问权限。

所以 INPUT 和 OUTPUT 也应该在这里。

经纪人29092在这里就没什么用了。让系统处于不可恢复的状态!


您的问题可能最好通过了解 Kafka 如何利用 Zookeeper 原语来维护和组织集群状态来回答。

在 Kafka 中,领导选举是由充当控制器的经纪人之一精心策划的。只有一个控制器,它是使用zookeeper从代理中选举出来的。

现在,每个代理将自己注册为 Zookeeper 中的“临时节点”。因此,发起 zK 会话的代理通过使用定期心跳(zK 术语中的刻度)来维护成员资格。如果代理未能在超时间隔内勾选,zookeeper 会删除该节点和已注册以接收该事件通知的 Kafka 控制器(通过zK手表 https://zookeeper.apache.org/doc/r3.4.0/zookeeperProgrammers.html#ch_zkWatches) 收到通知。如果失败的代理是分区的领导者,这将触发新的领导者选举。控制器处理领导者选举并通知所有经纪人。

所以,是的,Kafka 和 zK 之间存在双向通信 - 但就分区领导者选举而言,这并不是每个代理和 zK 之间的直接双向通信。有一个中间人作为控制者。

在您的测试中,由于控制器永远不会收到代理 2 发生故障的通知,因此该代理仍然是分区 1 的领导者。

从现在开始,我推测

输入被阻止的代理 2 无法接收元数据更新,因此它通过将 ISR 缩小到自身来保护自己。这有可能help https://cwiki.apache.org/confluence/display/KAFKA/KIP-343%3A+Add+a+Controller+Heartbeat+Mechanism#KIP-343以及。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果leader没有死但是无法接收Kafka中的消息会发生什么?单点故障? 的相关文章

  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

    当我使用 kafka 运行以下命令时0 9 0 1 我收到这些警告 1 你能告诉我我的主题有什么问题吗 我正在与在 ec2 中运行的 kafka 经纪人交谈 kafka console consumer sh new consumer bo
  • Kafka服务器未远程连接zookeeper服务器

    我正在尝试将 kafka 服务器 在 Windows 系统上 连接到 Zookeeper 服务器 我面临着 Opening socket connection to server 10 160 10 25 10 160 10 25 2181
  • 通过 Kafka 消费者重试维持订单保证

    我正在为基于 Kafka 的数据处理管道中的消费者重试设计一个架构 我们正在使用 Kafka 生产者和消费者 并且正在考虑重试主题 如果消费出错 将在这些主题上发送消息 将会有消费者以一定的节奏运行这些重试主题 我读了很多参考架构 但没有一
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • 如何在 Spring Kafka 中以编程方式设置 Jsonserializer Type Value 方法

    所以我无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法 还不确定原因 但与此同时 我如何以编程方式设置它 我在文档中看到了它的代码 但是该代码到底需要在哪里运行 Spring Kafka JsonDese
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的

随机推荐