Kafka消息分区&producer拦截器&无消息丢失(八)

2023-11-08

上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。

producer参数---Kafka从入门到精通(七)https://blog.csdn.net/ke1ying/article/details/126089250

  • 消息分区机制

producer发送过程有个很重要的步骤,就是确定发送的消息在哪个topic分区中。Producer提供了分区策略和对应的分区器(partitioner)供用户使用。新版本的会把相同key的消息发送到partition上,如果没有指定key,则会通过轮询分配均匀在topic所在分区,而对于旧版本的无法分配均匀。

自定义分区机制:

对于有key的消息,java版本的producer会通过自己的算法计算key的哈希值,然后在总分区取模分配到目标分区。但有的时候用户想实现自己的分区策略,而这又是默认partitioner无法实现的,那么此刻就可以用producer提供的自定义分区策略。


/**
 * @author keying
 */
public class AuditPartitioner implements Partitioner {

    private Random random;

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int auditPartition = partitionInfoList.size() - 1;
        return key == null || key.isEmpty() ||
                !key.contains("audit") ? random.nextInt(partitionInfoList.size() - 1) : auditPartition;
        //return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }
}

若自定义分区机制,则需要做两件事:

  1. 先定义一个类,实现org.apache.kafka.clients.producer.Partitioner接口,主要重写partition方法。
  2. 在构造kafkaProducer的时候propertites设置partitioner参数。

Partition方法里主要接受参数有topic,key和value,还有集群元数据信息,一起来确定目标分区,而close方法则是用于关闭partitioner的,主要是为了关闭那些创建partitioner时初始化的系统资源等。

举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,而对于相同topic下的其他消息则采用随机发送的策略发送到其他分区上。

所以,用户可以根据key来指定一些策略,还可以根据value信息做一些定制化分区策略。

  • 消息序列化

网络中发送数据都是以字节的方式,kafka也不例外,它可以是字符串,一个整数,一个数组或者其他任意对象类型。序列化器(serializer)负责在producer发送将消息转换成字节数组,而与之相反,解序列化器(deserializer)则用于将consumer接受到的字节数组转换成相应的对象。

Kafka1.0.0默认提供十几种序列化器,常见的serializer用的是StringSerializer,然后其他的还有LongSerializer,IntegerSerializer等。如果是复杂的类型,比如Avro则需要自定义序列化。

  • Producer拦截器

Producer拦截器相当于一个新的功能,他可以在producer发送消息之后以及回调之前有机会对消息做些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor

按序作用于同一条消息从而形成一个拦截器,intercetpor的实现接口是producerInterceptor,其定义方法如下:

onSend(producerRecord):该方法封装进kafkaProducer.send方法中,即他运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。

onAcknowledgement(recordMetadata,Exception):该消息会在被应答之前或者消息发送失败时候调用,并且通常在producer回调触发之前调用。OnAcknoewledgement运行在producer的I/O线程中,因此不要在该方法放入很重的逻辑,否则会拖慢producer的消息发送效率。

Close:关闭interceptor,主要做一些资源清理工作。

如前所述,interceptor可能运行在多个线程中,因此具体实现时候需要用户自行确认保护线程安全。若指定多个interceptor,则producer将按照指定顺序调用他们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。


/**
 * @author keying
 * @date 2022-08-07 17:24:21
 */
public class OneInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(), record.partition(), 
                record.timestamp(), System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}




/**
 * @author keying
 * @date 2022-08-07 17:27:40
 */
public class TwoInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("成功:"+successCounter);
        System.out.println("失败:"+errorCounter);
    }
}

上面例子是实现一个简单的双inteceptor组成的拦截器,第一个拦截器会在消息发送前将时间戳加入到value,第二个拦截器则会统计成功和失败的次数。

  • 无消息丢失配置

Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。显然,整个过程存在数据丢失的窗口,若I/O线程在发送之前崩溃,则数据会丢失。

另一个问题则是消息会乱序,比如客户端依次发送两条消息到不同的分区:

Producer.send(records1);和producer.send(records2);

若此刻某些原因,网络出现瞬时抖动,导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费。

所以这两个问题,kafka该如何规避呢?首先消息丢失很容易想到kafka的同步发送,但这样性能会很差,并不在实际场景中推荐使用。如何配置保证消息不会丢失呢?

Block.on.buffer.full = true

Acks=all 或者 -1

Retries=Integer.MAX_VALUE

Max.in.flight.request.per.connection=1

使用回调机制的send发送消息

CallBack逻辑中显式立即关闭producer,使用close(0)

Unclean.leader.election.enable=false

Replication.factor=3

Min.insync.replicas = 2

Replication.factor>min.insync.replicas

Enable.auto.commit=false

Producer端配置:

Block.on.buffer.full = true,实际上这个参数在kafka0.9.0版本已经被标记为deprecated的,并且使用max.block.ms替代,但还是推荐用户显示的设置它为true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。否则producer生产速度过快会耗尽缓冲区,新版本0.10.0.0不用管这个参数,直接设置max.block.ms参数。

Acks = all很好理解,就是所有leader broker和副本replict里的follower都收到消息,才回复producer消息成功发送。

Retries=Integer.MAX_VALUE:这里设置无限大有点极端,想表达的是无线重试,但放心这里不会重试那些无法恢复的错误,只会重试那些可恢复的异常,所以可以放心的设置比较大的值,保证消息不会丢失。

max.in.flight.request.per.connection=1:设置为1防止消息在topic下乱序,这个设置的效果限制了producer在单个broker上连续发送的未响应请求数量。因此如果设置成1,则producer在某个broker发送响应之前将无法再给broker发送producer请求。

使用带回调的send,普通的send官方解释是fire and forget,只管把消息发出去,不管后续,如果发送失败,不会收到任何通知,这里肯定要带回调的send发送。

CallbackBack逻辑中显式处理立刻关闭producer:在calllback失败处逻辑立即使用kafkaProcuer.close(0),这样做的目的就是为了防止消息乱序问题。若不使用close关闭,默认情况下producer会被允许将未完成的消息发送出去,这样可能造成消息乱序。

Broker端配置:

Unclean.leader.election.eable = false:关闭unclean leader选举,即不允许非isr中的副本被选举成leader,从而避免broker端因为日志水位截断造成数据丢失。

Replication.factor>=3 :设置成3主要参考业界的三备份原则,强调多个副本才好。

Min.insync.replias>1:用于控制某条消息至少被写入ISR中多个副本才算成功,大于1代表提升持久性,只有在acks设置成-1或者all的时候才生效。

确保 replication.factors>min.insync.replicas :若两者相等,则只要有一个副本挂掉,则分区无法正常使用,虽然持久性很高,但可用性被降低,建议 replication.factory = min.insync.replicas + 1。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka消息分区&producer拦截器&无消息丢失(八) 的相关文章

随机推荐

  • Android Context 上下文 你必须知道的一切

    转载请标明出处 http blog csdn net lmj623565791 article details 40481055 本文出自 张鸿洋的博客 本文大多数内容翻译自 http www doubleencore com 2013 0
  • maven(四):一个基本maven项目的pom.xml配置

    继续之前创建的test项目 一个基本项目的pom xml文件 通常至少有三个部分 第一部分 项目坐标 信息描述等
  • Python爬虫从入门到精通:(27)scrapy框架04_scrapy数据解析_Python涛哥

    scrapy数据解析 这节课 我们来看下scrapy中的数据解析 创建工程 爬取内容 爬取段子网中的段子 https duanzixing com 段子 回顾上节课所学的知识 创建scrapy 我们先来创建工程和爬虫文件 1 scrapy
  • asp.net 根据url获取参数值

    以下是 根据url获取参数值 Uri myUri new Uri http 10 0 0 75 7003 SitePages t1 aspx CurrentFolder http 10 0 0 75 7003 test docs 人力资源部
  • 实现基于SSL的主从复制

    准备 两台主机 1 在主服务器上生成证书 mkdir etc my cnf d ssl cd etc my cnf d ssl openssl genrsa 2048 gt cakey pem openssl req new x509 ke
  • Web安全之中间件安全

    中间件 中间件 英语 Middleware 又译中间件 中介层 是一类提供系统软件和应用软件之间连接 便于软件各部件之间的沟通的软件 应用软件可以借助中间件在不同的技术架构之间共享信息与资源 中间件位于客户机服务器的操作系统之上 管理着计算
  • Conda安装及使用手册

    Conda 是一个开源的软件包管理系统和环境管理系统 用于安装和管理多个软件包及其依赖项 它可以帮助用户创建虚拟环境 使得不同的项目可以使用不同的软件包和版本 从而避免版本冲突和不兼容性问题 下面是 Conda 的安装和使用步骤 安装 Co
  • 鳄鱼笔记(一) --> C#_入门<--[认识C#]

    c 语言和特点 c 语言及其特点 一 特性 二 认识 net Framework net core 三 个人理解 End c 语言及其特点 一 特性 c 是微软发布的 运行于 net Framework 和 net core 之上的高级语言
  • 面向对象设计的SOLID原则

    S O L I D是面向对象设计和编程 OOD OOP 中几个重要编码原则 Programming Priciple 的首字母缩写 SRP The Single Responsibility Principle 单一责任原则 OCP The
  • matlab练习程序(图像滤波时的边界处理)

    我们在写滤波程序时一般会用矩阵模板与原图像做卷积 这时候在做图像边界的处理是一般都选择忽略边缘 不过要是模板比较大 那么处理的效果就不好了 图像四周就会是原图像 中间才是滤波后的结果 虽然用Matlab的imfilter就能解决 不过还是自
  • 人脸识别对齐,向量搜索

    人脸对齐的概念 1 查找人脸 我们可以使用dlib来查找人脸 也就是所谓的侦测人脸 可以从下面github的地址去拿到models 人脸查找的models dnnFaceDetector dlib cnn face detection mo
  • #cmakedefine真实含义

    cmakedefine 用于configure file 中用于生成头文件的文件中 只有当CMakeLists txt中的同名变量为真时才会在生成的头文件中定义 区别于 define无论何时都会定义
  • 中介者模式-C++实现

    跟我在公司搭的框架好像 MediatorPattern cpp 定义控制台应用程序的入口点 include stdafx h include
  • buck变换器设计matlab_开关电源控制环路设计,非常实用!

    欢迎加入技术交流QQ群 2000人 电力电子技术与新能源 1105621549 高可靠新能源行业顶尖自媒体 在这里有电力电子 新能源干货 行业发展趋势分析 最新产品介绍 众多技术达人与您分享经验 欢迎关注微信公众号 电力电子技术与新能源 M
  • RichErp - vue 使用总结 - data 和 props

    data仅代表自己的内部的状态数据 所以如果一个Component仅仅是自身改变状态 然后把状态反馈给外界的话 理论上说只用data就可以了 显然组件通常不会这样 而是需要一种可进可出的状态 也就是允许外界对组件内部的数据进行修改 同时组件
  • R语言的pairs函数和ggpairs函数在数据可视化中扮演着重要的角色,能够实现散点图矩阵图的可视化

    R语言的pairs函数和ggpairs函数在数据可视化中扮演着重要的角色 能够实现散点图矩阵图的可视化 本文将介绍这两个函数的用法 并通过源代码演示如何使用它们进行数据可视化 1 R语言的pairs函数 pairs函数是R语言中一个强大的数
  • React 进阶: useSyncExternalStore API 外部状态管理

    React 进阶 useSyncExternalStore API 外部状态管理 文章目录 React 进阶 useSyncExternalStore API 外部状态管理 完整代码示例 动机 关于状态的思考 方案一 自行接入外部状态 外部
  • 分类器概念篇

    分类器是数据挖掘中对样本进行分类的方法的统称 包含决策树 逻辑回归 朴素贝叶斯 神经网络等 分类器的构造和实施大体会经过以下几个步骤 选定样本 包含正样本和负样本 将所有样本分成训练样本和测试样本两部分 在训练样本上执行分类器算法 生成分类
  • 以违停检测为示例的利用微软云AIOT技术加速项目落地

    AIoT即融合了AI 人工智能 和IoT 物联网 的技术 图形图像处理是人工智能领域中重要的一个分支 在日常生活中也存在大量基于图形图像的处理的场景 比如交通违章抓拍 基于视觉的司机防疲劳监测 家用摄像机的老人摔倒报警等功能 对于物联网则在
  • Kafka消息分区&producer拦截器&无消息丢失(八)

    上篇文章说了 acks 1代表什么都不管 即使配置了回调也不会起作用 0代表不会等待replic副本里的不会持久化 只要broker leader持久化成功则返回给producer 1代表all 则表示全部持久化成功才返回成功给produc