Kafka 架构及原理分析

2023-11-19

Kafka 架构及原理分析

Kafka适合什么样的场景?

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。

定位

  • 消息中间件
  • 消息引擎
  • 分布式实时流处理平台

简介

使用场景

  • 大数据领域
    • 网站行为分析
    • 日志聚合
    • 应用监控
    • 流式数据处理
    • 在线和离线数据分析
  • 数据集成
    • 消息导入 MaxCompute、OOS、RDS、Hadoop、HBase 等离线数据仓库
  • 流计算集成
    • 集成流计算引擎
      • StreamCompute
      • E-MapReduce
      • Spark
      • Storm

架构

依赖 Zookeeper 实现配置和节点管理

Kafka架构

如上图所示,一个 Kafka 集群架构中:

  • 3 台 Broker 。
  • 两个 Topic : Topic0 和 Topic1 。
    • Topic0 有 2 个分区: partition0 和 partition1 ,每个分区一共 3 个副本。
    • Topic1 只有 1 个分区: partition0 ,每个分区一共 3 个副本。
  • 图中红色字体的副本代表是 Ieader ,黑色字体的副本代表是 follower 。
  • 绿色的线代表是数据同步。蓝色的线是写消息,橙色的线是读消息,都是针对 leader 节点。
  • 有两个消费者组的两个分区。
    • 第一个消费者组,消费了 topic0 的两个分区。
    • 第二个消费者组,既消费 topic0 ,又消费 topic1 :
      • 第 1 个消费者,消费 topic0 的 partition0 ,还消费 topic1 的 partition0 。
      • 第 2 个消费者,消费 topic0 的 partition1。
      • 第 3 个消费者,没有 partition 可以消费。

Broker

message
message
Producer
Broker
Consumer
Broker
  • 只支持一种消费模式poll

  • 支持一次性获取多条(参数传入数目)

Topic

  • 使用Topic的订阅和发送,来实现生产者和消费者的关联,多对多
message
message
message
Producer
Broker
Topic
Topic
Consumer
Topic
  • Topic 分区扩展,增强并发访问能力
message
message
message
Producer
Broker
Topic
Topic
Partition1
Partition2
Partition3
Partition4
Consumer
Topic

副本机制

message
message
message
message
message
message
message
message
Producer
Broker
Broker
Partition0
Partition1
Partition0
Partition1
Consumer
副本数目一定小于等于节点数目
  • 副本同步主节点数据,但是不允许读 follow 节点,避免读写不一致的问题,降低延迟。

存储

存储文件

  • *.index 索引文件
  • *.log 数据文件
  • *.timeindex 时间戳索引文件

数据分段(针对文件过大,超出 1G)

  • segment

消费分组

消费分组

  • 消费组数目小于等于 Topic 数目
  • 消费者可以消费多个分区

消费编号

消费编号

  • 连续消费
  • 切换消费者
  • consumer_offset-[0~49] 保存消费者消费的偏移量

数据多写支持

业务场景:数据同步存储到 mysql、ES

基于 binlog 实现主从复制

主从复制

canal

伪装为 slave 节点,进行数据同步,解析 binlog,可以对接 kafka,实现数据的多写。

数据多写

MYSQL的数据修改,通过 kafka 完成数据变更的自动推送,实现多写操作。

Kafka 的进阶功能

消息幂等性

enable.idempotence=true

  • PID Producer ID 生产者编号

  • sequence number 消费者编号

  • 只能保证单分区、单一会话

事务

操作 API

  • 初始化producer.initTransactions()
  • 开启producer.beginTransaction()
  • 提交producer.commitTransaction()

场景:

  • 发送多条消息

  • 发送消息到多个 topic 或多个 partition

  • 消费以后发出消息 consume-process-produce

实现原理:

  • 2PC
  • Transaction Coordinator 协调者
  • 事务日志:topic记录 __transaction_state
  • 生产者事务 ID:transaction.id 发送前进行参数设置,可以使用 UUID

事务原理

1.initTansactions API 注册事务 ID
3.生产者把消息写入
4.事务完成后更新消息状态为已提交
4'.事务回滚
Producer
Coordinator
2.记录事务日志
目标分区
事务状态
删除分区中冗余数据
消费者消费消息

特性

  • 高吞吐、低延迟
  • 高伸缩性(Partition 分区)
  • 持久性、可靠性(副本机制,持久化方案)
  • 容错性(副本机制、节点选举)
  • 高并发

原理分析

生产者原理

KafkaProducer

生产者

  • ProducerInterceptor 拦截器
    • onSend
    • onAcknowledgement
    • close
    • configure
  • Serializer 序列化器
    • key、value
    • Protobufer
  • Partition 分区器

分区路由

  • 指定分区:使用指定的分区
  • 没有指定分区
    • 定义了分区器:按自定义的规则
    • 未定义分区器:
      • key 非空:hash 以后取余
      • key 为空: 整数自增取模

累加器

累加器

  • 先放入累加器
  • 满了或新建状态,唤醒发送线程

服务端响应

ack

  • 奇数副本节点,确保投票、同步成功(半数工作正常的节点确定 )

    • Leader 选举
    • 数据同步是否完成

    哪些节点等待同步完成?

    • 服务端 ISR: in-sync replica set
      • 动态节点,保留所有工作正常的节点信息
      • 移除规则:
        • 和 Leader 节点保持同步的最大时间间隔 replica.lag.time.max.ms
        • 大于间隔,移除;反之,加入。

    如何判断消息发送成功?

    => 见下一节 服务端 ACK

服务端 ACK

不等待 ACK

=> props.put(“acks”,“0”)

效率最高,可靠性低

不等待 ack

默认:Leader 落盘成功则返回 ACK

=> props.put(“acks”,“1”)

可靠性较低,还是有丢失消息风险

落盘 ack

Leader 和全部 Follpwers 落盘返回 ACK

=> props.put(“acks”,"-1") 或 props.put(“acks”,“all”)

性能最差,可靠性高,但是还是有可能会带来问题,最后环节响应 ACK 失败,发送端如果设置了 retries 重发参数,会发生消息重复的问题。

可靠 ack

分区存储

存储形式

特点

  • 分区存储对应一个目录 xxx-[分区下标]

分区

副本的存储

如图:topic 对应 3 个分区,每个分区一共 3 个副本。

副本

  • 为什么没有像 MySQL 那样设置读写分离?
    • 单调的读写一致性
    • 不用考虑复杂的读写一致性问题

查看副本情况

查看副本

  • 3个分区

  • 3个副本

  • ISR: 当前和 Leader 节点保持同步的节点集合

如图:topic 对应 4 个分区,每个分区 2 个副本

副本2

查看副本情况

副本2

  • 4个分区
  • 2个副本

副本分配规则

AdminUtils.scala -> assignReplicasToBrokers

  • 副本因子不能大于 Broker 的个数
  • 第一个分区(编号为0)的第一个副本放置位置式随机从 brokerList 选择的
  • 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移 nextReplicaShift

分配规则

日志存储格式

日志格式

segment 分段

分割方式

  • 大小分割:log.segment.bytes 单个日志段的最大大小,默认 1073741824 -> 1G

  • 时间分割:

    • log.roll.hours 新日志段轮转时间间隔(小时为单位),次要配置为log.roll.ms
    • log.roll.ms 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hours
  • 索引写满:log.index.size.max.bytes offset 索引的最大字节数,默认10485760 -> 10M

索引

offset index 偏移量索引

索引

如上图,通过kafka-dump-log.sh脚本查看索引文件。

索引特点:

  • Hash 索引结构
  • 稀疏索引:
    • 并非为每一条数据建立索引
    • 建立条件:log.index.interval.bytes 添加 offset 索引字段大小间隔,默认 4096, 4KB
      • 设置的越大,代表扫描的速度越快,索引越稀疏,也更加耗内存(查找数据和维护索引的开销增大)
      • 设置的越小,代表扫描的速度越慢,索引越密集,也更加省内存
      • 时间复杂度:O(log2n)+O(m) n 表示文件个数,m 表示稀疏程度

索引结构

time index 时间戳索引

  • 定义消息的时间戳类型:log.message.timestamp.type=CreateTime/logAppendTime
    • 消息创建时间
    • 日志追加时间

索引检索过程

  • 根据 offset 匹配 segment

  • 根据 index 索引文件中的 offset 找到消息的 postion

  • 根据 position 从 log 文件中比较,最终找到消息

为什么不用 B+tree 做索引结构?

  • 业务场景决定,写消息远大于读消息频次
  • 数据量过大,大量写消息对 B+tree 压力过大

消息清理策略

  • 开关:log.cleaner.enable=true 默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据 log.cleaner.dedupe.buffer.size,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看 log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。(0.9.0.1中的显著变化

  • 策略:log.cleanup.policy=delete/compact 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:deletecompact

    • 删除
    • 压紧,删除重复的 key:Log Cleaner默认启用。这会启动清理的线程池。

    压紧

  • 周期:log.retention.check.interval.ms=300 000 日志清理器检查是否有日志符合删除的频率(以毫秒为单位)

  • 过期定义:

    • log.retention.hours 日志删除的时间阈值(小时为单位) 默认 168 小时,即 1 个星期
    • log.retention.minutes 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值。
    • log.retention.ms 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值。
  • 文件限制:

    • log.retention.bytes 日志删除的大小阈值。
    • log.segment.bytes 单个日志段文件最大大小。

Leader 选举

  • 谁主持选举?Broker Controller
  • 谁参与选举?AR = ISR + OSR
    • AR :所有副本
    • ISR:保持同步的副本
    • OSR:没有保持同步的副本
    • 参数配置:unclean.leader.election.enable 指定副本是否能够不在 ISR 中选举为 Leader,会导致数据丢失,默认为 false。
  • 主从如何同步?
    • PacificA (Microsoft)

PacificA

  • 优先算法:默认设置 ISR 的第一个副本为 Leader

主从同步

主从

  • LEO:Log End Offset,下一条等待写入的消息的 offset(最新的 offset + 1)
  • HW:High Watermark,ISR 中最小的 LEO => 限制消费者最后可以消费的消息,小于 HW 的消息才可以被消费,确保一致性

LEO1

主从同步

LEO2
LEO3

  • Follower 节点会向 Leader 发送一个 fetch 请求,leader 向 follower 发送数据后,需要更新 follower 的 LEO
  • Follower 接收到数据响应后,依次写入消息并更新 LEO
  • Leader 更新 HW (ISR 最小的 LEO)

故障处理

​ Follower 故障

  • 之前记录的 HW,删除 高于 HW 标识的数据,恢复后重新从 HW 之前的 offset 开始同步数据

Leader 故障

  • 之前记录的 HW,删除 高于 HW 标识的数据,重新从新 Leader 同步数据

  • 问题:

    • 保证了副本间的数据一致性
    • 会发生消息丢失和重复

消费者原理

根据 offset 和时间戳进行消费

消费者

offset 的存储

__consumer_offsets => topic 的存储结构

  • GroupMetadata:保存了消费组中各个消费者的信息(每个消费者有编号)
  • OffsetAndMetadata:保存了消费组和各个 partition 的 offset 位移信息元数据

存储结构

  • group.id 取 hash 后 ,对50取模,获取消费组对应绑定分区的下标

  • 消费策略:auto.offset.reset,默认值为 lastest

    • none: 当前没有找到之前的 offset 时抛出异常
    • earliest: 自动从最早的消息开始消费
    • lastest:最近的 offset 开始消费
  • 提交偏移量,commit 后更新消费组的 offset

    • 自动:enable.auto.commit=true

    • 手动:enable.auto.commit=false

      • API:consumer.commitSync();

消费者分配

消费者分配在这里插入图片描述

  • RangeAssignor:默认分配原则,范围固定分配

在这里插入图片描述

  • RoundRobinAssignor:轮询方式分配


  • StickyAssignor :粘滞策略(相对均匀策略,每次基本都不一样)

分区重分配

rebalance 针对分区少,消费者多的情况

性能分析

  • 无需第三方进行消息存储,采用磁盘文件直接存储
  • 磁盘 I/O 本身速度很慢,Kafka 如何优化实现低延迟、高吞吐的目标?
    • 顺序读写 I/O
    • 索引
    • 批量读写和文件压缩
    • 零拷贝

磁盘寻址

  • 盘面旋转
  • 磁头
  • 磁道
  • 扇区
  • 扇面

Kafka 日志文件顺序存放 -> 磁盘顺序读写。

Sequential disk (磁盘顺序读写) 比 Random SSD (固态的随机读写)要更快。

零拷贝

  • 内核空间、用户空间

  • DMA (Direct Memory Access) 直接内存访问

  • 传统 I/O

    • 四次拷贝
    • 四次用户态和内核台的切换
    • 系统方法的2次调用 read、write
    • 2次 CPU 数据拷贝
  • 零拷贝:Linux sendfile 方法,省去 CPU 拷贝的过程,提升至少一倍的性能。

零拷贝实现代码

传输层封装代码:PlaintextTransportLayer

技术总结

  • 主从选举:会选择一个 broker 作为 “controller”节点。controller 节点负责检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。如果 controller 节点挂了,其他 存活的 broker 都可能成为新的 controller 节点。

  • 分布式:

    • 日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性。
    • 每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
  • 消费者:

    • 消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
    • 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
    • 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
  • Kafka 建立在 JVM 之上:

    • 对象的内存开销非常高,通常是所存储的数据的两倍(甚至更多)。
    • 随着堆中数据的增加,Java 的垃圾回收变得越来越复杂和缓慢。
  • Kafka 对消息的存储和缓存严重依赖于文件系统:

  • 日志存储:

    • 当旧的数据保留时间超过指定时间、日志大达到规定大小后就丢弃
    • 至少保证日志包含每一个key的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。
  • 高速读写:

    • 网络层相当于一个 NIO 服务, sendfile(零拷贝) 的实现是通过 MessageSet 接口的 writeTo 方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo 实现,而不在使用进程内的写缓存。线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接。这种设计方式经过大量的测试,发现它是实现简单而且快速的。协议保持简单以允许未来实现其他语言的客户端.

REFERENCES


image-20200927235342666

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 架构及原理分析 的相关文章

  • 内存不一致与线程交错有何不同?

    我正在编写一个多线程程序 正在研究是否应该使用volatile对于我的布尔标志 关于并发性的文档 oracle Trail 没有解释任何关于memory consistency errors以外 当不同的线程有内存一致性错误时 就会发生内存
  • Java 7 watchservice获取文件更改偏移量

    我刚刚尝试使用 Java 7 WatchService 来监视文件的更改 这是我敲出的一些代码 WatchService watcher FileSystems getDefault newWatchService Path path Pa
  • 为什么这不会绘制图像?

    我想做的是 当我运行应用程序时 它会启动线程并且图像显示 3 秒 3000 毫秒 然后线程停止运行 图片路径正确 图片文件存在 线程本身运行 但是 图像似乎没有显示 可能出什么问题了 这是我的代码 package org main impo
  • Spring - 两种不同的 POST 方法,具有相同的 URL,但生成的内容类型不同

    我有以下控制器 RequiredArgsConstructor RestController public class OwnerViewController implements ApiOwnerViewController privat
  • 我如何通过代码在 Anylogic 中创建路径空间标记元素

    我在anyloigic方面完全是菜鸟 现在我正在尝试通过代码创建简单的网络 具有两个点节点的网络 以及链接这些节点的路径 遇到一些问题 当我运行模型时 控制台显示 使用初始化 方法 但我已经知道 初始化方法在较低版本中已被弃用 我使用的是8
  • 正则表达式或用单个空格替换多个空格的方法

    你能告诉我有没有办法在java或spring中用单个空格替换多个空格 有相同的 stringUtils 函数吗 like 1 test test test test 2 test test test test 3 test test tes
  • 清理 IntelliJ 中构建的 Play 框架

    我有一个拼写错误conf routes文件导致 Play Framework 生成错误命名的类 重建项目并运行Invalidate Caches并没有解决 IntelliJ 中的问题 当我手动运行时重新生成了不正确的类文件play clea
  • Spring的@PreDestroy导致随机记录而不记录

    我正在使用 Spring 并且在终止时我让 PreDestroy 清理 bean 我不明白为什么日志记录有时会成功 而有时会失败 Using Log4j2 Logger log LogManager getLogger MyClass cl
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • 如何查找给定字符串中仅出现一次的第一个字符[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 改造添加带有令牌和 ID 的标头

    我在获取经过身份验证的用户时遇到问题 在此之前我得到了令牌和用户 ID 现在我需要使用访问令牌和 ID 从服务器获取用户 我有标题格式 https i stack imgur com OQ87Y png 现在我尝试使用拦截器添加带有用户令牌
  • Android 改造参数化@Headers

    我正在使用 OAuth 每次发出请求时都需要将 OAuth 令牌放入标头中 我看到 Header注释 但是有没有办法让它参数化 以便我可以在运行时传入 这是概念 Header Authorization OAuth var api vers
  • 将 try catch finally 块放入另一个 finally 块中

    try catch finally try catch finally 上面的代码好不好 是的 你可以这样做 实际上 在处理想要正确关闭的流时 您甚至需要这样做 InputStream in try catch finally try in
  • Android:如何停止监听电话监听器? [复制]

    这个问题在这里已经有答案了 可能的重复 Android 为什么 PhoneCallListener 在活动完成后仍然存在 https stackoverflow com questions 11666853 android why phon
  • 如何在 Google 地图中创建自定义地图?

    我正在尝试创建一个包含我家地图的 Google 地图应用程序 卧室 浴室 厨房等 使用 GPS 我会找到我现在在家里的位置 并尝试获取到我卧室的方向 步行距离 您可以使用Google的API来获取方向 我需要知道的是 如何添加我家的自定义地
  • 为什么我们在同一台服务器上使用多个应用程序服务器实例

    我想这是有充分理由的 但我不明白为什么有时我们会在同一物理服务器上放置例如 5 个具有相同 Web 应用程序的实例 这与多处理器架构的优化有关吗 JVM 或其他允许的最大内存限制 嗯 过了很长一段时间我又看到这个问题了 一台机器上的多个 J
  • 如何将模型从 ML Pipeline 保存到 S3 或 HDFS?

    我正在尝试保存 ML Pipeline 生成的数千个模型 正如答案中所示here https stackoverflow com questions 32121046 run 3000 random forest models by gro
  • 使用从 java 程序调用的 Windows 命令提示符将具有多个连续空格的字符串作为参数传递给 jar 文件

    我想使用在另一个java程序中调用的Windows命令提示符将带有多个连续空格的字符串作为参数传递给jar文件 java 文件是这样的 它打印它的所有参数 package src public class myClass public st
  • 如何在非Spring的构造型类中使用@Autowired

    我想在此类中使用该存储库 但是当我放置像 Component 这样的构造型时 我从 IDE 收到错误 无法自动装配 未找到 身份验证 类型的 bean public class CustomMethodSecurityExpressionR
  • JAAS keytab 配置的相对路径

    我有一个系统 其中 NET 客户端使用 Kerberos 针对 Java 服务器进行身份验证 一切正常 但我正在尝试改进服务器配置 目前一个keytab根目录中需要文件C 因为我的jaas配置文件看起来像这样 Server com sun

随机推荐