Kafka原理

2023-10-27

生产者原理解析

生产者工作流程图:
在这里插入图片描述
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 。
在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka 中;
RecordAccumulator主要用来缓存消息以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B ,即32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即60秒。
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,
RecordAccumulator内部为每个分区都维护了一个双端队列,即Deque。
消息写入缓存时,追加到双端队列的尾部;
Sender读取消息时,从双端队列的头部读取。注意:ProducerBatch 是指一个消息批次;
与此同时,会将较小的 ProducerBatch 凑成一个较大 ProducerBatch ,也可以减少网络请求的次数以提升整体的吞吐量。
ProducerBatch 大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord ) 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看 ProducerBatch中是否还可以写入这个ProducerRecord,如果可以写入就直接写入,如果不可以则需要创建一个新的Producer Batch。在新建 ProducerBatch时评估这条消息的大小是否超过 batch.size 参数大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch。
Sender从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群broker节点。对于网络连接来说,生产者客户端是与具体broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node, List>的形式之后, Sender会进一步封装成<Node,Request> 的形式,这样就可以将 Request 请求发往各个Node了,这里的Request是Kafka各种协议请求;
请求在从sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<Nodeld, Deque>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per. connection ,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。通过比较 Deque 的size与这个参数的大小来判断对应的 Node中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续发送请求会增大请求超时的可能。

Producer往Broker发送消息应答机制

kafka 在 producer 里面提供了消息确认机制。我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在构造producer 时通过acks参数指定(在 0.8.2.X 前是通过 request.required.acks 参数设置的)。这个参数支持以下三种值:

  • acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 kafka 。在这种情况下还是有可能发生错误,比如发送的对象不能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,大概率会丢失一些消息。
  • acks = 1:意味着leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 leader,但在消息被复制到 follower 副本之前 leader发生崩溃。
  • acks = all(这个和 request.required.acks = -1 含义一样):意味着 leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
acks 含义
0 Producer往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。
1 Producer往集群发送数据只要 leader成功写入消息就可以发送下一条,只确保Leader 接收成功。
-1或all Producer往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

生产者将acks设置为all,是否就一定不会丢数据呢?
否!如果在某个时刻ISR列表只剩leader自己了,那么就算acks=all,收到这条数据还是只有一个点;
可以配合另外一个参数缓解此情况: 最小同步副本数>=2

其他的生产者参数

  • acks
    acks是控制kafka服务端向生产者应答消息写入成功的条件;生产者根据得到的确认信息,来判断消息发送是否成功;

  • max.request.size
    这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 1MB
    一般情况下,这个默认值就可以满足大多数的应用场景了。
    这个参数还涉及一些其它参数的联动,比如 broker 端(topic级别参数)的 message.max.bytes参数(默认1000012),如果配置错误可能会引起一些不必要的异常;比如将 broker 端的 message.max.bytes 参数配置为10B ,而 max.request.size参数配置为20B,那么当发送一条大小为 15B 的消息时,生产者客户端就会报出异常;

  • retries和retry.backoff.ms ==> 间隔时间 避免无效的重试
    retries参数用来配置生产者重试的次数,默认值为2147483647,即在发生异常的时候进行任何重试动作。
    消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、 leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试 。如果将 retries参数配置为非零值,并且 max .in.flight.requests.per.connection 参数配置为大于1的值,那可能会出现错序的现象:如果批次1消息写入失败,而批次2消息写入成功,那么生产者会重试发送批次1的消息,此时如果批次1的消息写入成功,那么这两个批次的消息就出现了错序。
    对于某些应用来说,顺序性非常重要 ,比如MySQL binlog的传输,如果出现错误就会造成非常严重的后果;一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection 配置为1 ,而不是把retries配置为0,不过这样也会影响整体的吞吐。

  • compression.type
    这个参数用来指定消息的压缩方式,默认值为“none",即默认情况下,消息不会被压缩。该参数还可以配置为 “gzip”,“snappy” 和 “lz4”。对消息进行压缩可以极大地减少网络传输、降低网络I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;

  • batch.size
    每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在recordAccumulator里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。

  • linger.ms
    这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
    ProducerBatch 时间,默认值为0。生产者客户端会在ProducerBatch填满或等待时间超过linger.ms 值时发送出去。

  • enable.idempotence
    是否开启幂等性功能,详见后续原理加强;
    幂等性,就是一个操作重复做,也不会影响最终的结果!
    int a = 1;
    a++; // 非幂等操作
    val map = new HashMap()
    map.put(“a”,1); // 幂等操作
    在kafka中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是具备幂等性;否则,就不具备幂等性!

  • partitioner.class
    用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner

自定义partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口

消费者组再均衡分区分配策略

会触发rebalance(消费者)的事件可能是如下任意一种:

  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest 请求):比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
  • 消费组所对应的 GroupCoorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。
    将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何rebalance也涉及到分区分配策略。
    kafka有两种的分区分配策略:range(默认) 和 roundrobin(新版本中又新增了另外2种)
我们可以通过partition.assignment.strategy参数选择 range 或 roundrobin。
partition.assignment.strategy参数默认的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

Range Strategy

  • 先将消费者按照client.id字典排序,然后按topic逐个处理;
  • 针对一个topic,将其partition总数/消费者数得到商n和 余数m,则每个consumer至少分到n个分区,且前m个consumer每人多分一个分区;

Round-Robin Strategy

  • 将所有主题分区组成TopicAndPartition列表,并对TopicAndPartition列表按照其hashCode 排序
  • 然后,以轮询的方式分配给各消费者

Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:

  • 要去达成最大化的均衡
  • 尽可能保留各消费者原来分配的分区
    再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)

Cooperative Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:

  • 逻辑与sticky策略一致
  • 支持cooperative再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配)

消费者组再均衡流程

消费组在消费数据的时候,有两个角色进行组内的各事务的协调;
角色1: Group Coordinator (组协调器) 位于服务端(就是某个broker)
组协调器的定位:

coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上
查找Group Coordinator的方式:
先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets 中的分区编号;   分区数
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数offset.topic.num.partitions来配置,默认值是50;
找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;

角色2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)

再均衡流程

eager协议的再均衡过程整体流程如下图:
在这里插入图片描述

Cooperative协议的再均衡过程整体流程如下图:
在这里插入图片描述

再均衡监听器

代码示例:

package com.doitedu;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;


/**
 * 消费组再均衡观察
 */

public class ConsumerDemo2 {
    public static void main(String[] args) {
        //1.创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.订阅主题(确定需要消费哪一个或者多个主题)
        //我现在想看看如果我的消费者组里面,多了一个消费者或者少了一个消费者,他有没有给我做再均衡
        consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
            /**
             * 这个方法是将原来的分配情况全部取消,或者说把所有的分区全部回收了
             * 这个全部取消很恶心,原来的消费者消费的好好的,他一下子就给他全部停掉了
             * @param collection
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                System.out.println("我原来的均衡情况是:"+collection + "我已经被回收了!!");
            }
            /**
             * 这个方法是当上面的分配情况全部取消以后,调用这个方法,来再次分配,这是在均衡分配后的情况
             * @param collection
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                System.out.println("我是重新分配后的结果:"+collection);
            }
        });

        while (true){
            consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
        }


    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka原理 的相关文章

随机推荐

  • 【C语言】强符号和弱符号

    1 强符号 弱符号定义 编译器在编译源程序时 无论你是变量名 函数名 在它眼里 都是一个符号而已 用来表征一个地址 编译器会将这些符号集中 存放到一个叫符号表的 section 中 那么对于两个 c文件中存在的同名的变量 编译器该怎么选择呢
  • 切换零感知 H3C H5家庭智慧无线套装牛在哪?

    我头上有犄角 我身后有尾巴 大家看到这句歌词首先想到的是 小青龙 而小编认为这是对传统无线路由器最为真实的写照 犄角 就是无线路由器的外置天线 尾巴 也就是无线路由器的电源线以及连接各个端口的网络线路 随着大家审美的变化 无线路由器这种最为
  • c++网络编程

    网络编程模型 c s 模型 客户端服务器模型 b s 模型 浏览器服务器模型 1 tcp网络流程 服务器流程 1 创建套接字 2 完善服务器网络信息结构体 3 绑定服务器网络信息结构体 4 让服务器处于监听状态 5 accept阻塞等待客户
  • uniapp如何创建项目详细介绍,多环境配置、路由配置、代理配置、请求封装等。完整的搭建一个项目环境。

    1 前期准备及前言 一般可以使用HBuilderX创建项目 为了本次博客的完整性 先讲解一下HBuilderX创建uniapp项目 下载开发工具地址 https www dcloud io hbuilderx html 由于现在项目开发都是
  • 组件间的传值和钩子函数

    组件间的传值和生命周期钩子函数 所有的生命周期钩子自动绑定 this 上下文到实例中 因此你可以访问数据 对属性和方法进行运算 这意味着 你不能使用箭头函数来定义一个生命周期方法 例如 created gt this fetchTodos
  • redis性能测试工具redis-benchmark

    redis自带性能测试工具redis benchmark 在bin目录下 redis benchmark h h ip p 端口 a密码认证 c客户端的连接数 n请求数 d 指定数据大小 q只显示每秒的查询值 redis benchmark
  • gin框架16--如何记录日志

    gin框架16 如何记录日志 介绍 案例 说明 介绍 本文主要介绍如何将日志写入文件中 取消终端输出 案例 源码 package main import github com gin gonic gin io os func main gi
  • mysql 域名访问和ip访问的区别_域名与IP地址的联系与区别

    我们也知道每一台机都有一个唯一ip地址 特别难记 所以出现了今天的DNS 域名 当我们的计算机想要和一个远程机器连接时 我们可以申请连接该机器ip地址下的DNS 例如 www baidu com 连接的时候 DNS会提供一个ip地址 供服务
  • Ubuntu应用拓展(9)——精简Ubuntu系统工具、库缺失问题

    工具缺失问题 1 usr bin time 问题 usr bin time No such file or directory 解决方法 sudo apt install time 2 usr bin mandb 问题 usr bin ma
  • 牛客小白月赛75 D矩阵

    这题的边权有1 2所以不能用0 1bfs 虽然我也不是很会用 这题是可以说是个分层图 我们要利用小根堆进行排序 让边权小的排在前面 实现小根堆有两种方式 第一种是比较巧妙的 因为优先队列默认实现的是大根堆 所以我们可以把元素取反放进去 因为
  • LCD1602液晶显示屏的工作原理图是什么呢?

    本文重点是由深圳市兴宇合电子技术人员为大家介绍LCD1602液晶显示屏的工作原理以及原理图 希望对大家有所帮助 1 LCD1602液晶显示屏工作原理如下 LCD1602液晶显示屏通过电压来改变填充在两块平行板之间的液晶材料内部分子的排列状况
  • SIFT特征提取算法总结

    转自 http www jellon cn index php archives 374 一 综述 Scale invariant feature transform 简称SIFT 是一种图像特征提取与匹配算法 SIFT算法由David G
  • Java教程【01.01】 对象和类

    Java技术栈 对象和类 什么是对象和类 在Java中 对象是具有属性和行为的实体 而类是一组定义操作和属性的规范或蓝图 类包含数据成员 变量 和方法 函数 对象是类的实例化 如何创建一个对象 要创建一个对象 必须先定义一个类 下面是一个简
  • 模块化开发的时候,sqlsession如何配置多个typeAliasesPackage,mapperLocations

    如图 我们进行模块化开发的时候 往往我们每个人的bean和mapper都不在同一个路径包内 如果我们按照以下方式配置的话 就会报异常 大致上是因为不支持
  • VPP代码阅读中文注解--crc32.h

    static always inline u32 clib crc32c u8 s int len u32 v 0 if x86 64 for len gt 8 len 8 s 8 v mm crc32 u64 v u64 s else w
  • 8月热门论文丨AI Agent会是大模型的未来发展方向吗?

    点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入 以下内容来源于AMiner科技 过去的8月 如果让我用一个词来总结 那就是 Agent 大模型的下半场已经拉开序幕 大厂们都纷纷表态入局 Agent OpenAI创始成员Andr
  • 头条面试官问: 100TB文件上传该怎么优化性能?

    目录 一 写在前面 二 原始的文件上传方案 三 HDFS对大文件上传的性能优化 1 Chunk缓冲机制 2 Packet数据包机制 3 内存队列异步发送机制 四 总结 一 写在前面 上一篇文章 我们聊了一下Hadoop中的NameNode里
  • tp5Internal Server Error报错解决办法

    重写apache配置文件
  • ubuntu下java8卸载

    要删除 OpenJDK 如果已安装的话 首先 检查是安装的哪个 OpenJDK包 dpkg list grep i jdk 移除 openjdk包 apt get purge openjdk 卸载 OpenJDK 相关包 apt get p
  • Kafka原理

    生产者原理解析 生产者工作流程图 一个生产者客户端由两个线程协调运行 这两个线程分别为主线程和 Sender 线程 在主线程中由kafkaProducer创建消息 然后通过可能的拦截器 序列化器和分区器的作用之后缓存到消息累加器 Recor