Kafka学习笔记(高级篇)

2023-11-09

目录

高级功能

高效读写

涉及技术

ZooKeeper

自定义拦截器

监控

延迟消费

一些改进手段


高级功能

高效读写

涉及技术

  • 高吞吐量:Kafka 每秒可以处理数百万消息。这是因为 Kafka 消息的处理是以批处理(Batching)的方式来完成的,生产者可以将多个消息一起发送到 Kafka 集群,以减少网络开销以及加速处理速度。
  • 低延迟:Kafka 利用磁盘存储加缓存,可以在微秒级别内完成消息处理。Kafka 具有高效的消息传递能力,也可以在微秒级别内完成消息处理。这是由于 Kafka 的消息存储设计是基于磁盘的,但同时消息缓存也是放在内存里的。也就是说,在处理消息时,Kafka 集群会先将消息写入到磁盘中进行持久化存储,并且在内存中缓存一份消息以便进行更快的消息传递和读取。
  • 分布式架构:Kafka 采用分布式的架构设计,可以通过水平扩展增加集群规模和负载容量。集群中的每个节点都可以独立完成消息处理和写入,可以有效地提高整个系统的吞吐量。
  • 高可靠性:Kafka 在存储消息时,使用了多副本机制,可以保证消息的高可靠性。当消息发送失败或者其中一个节点失效时,可以通过复制副本来实现自动故障转移,以确保消息的可靠性、可用性与一致性。
  • 顺序写:Kafka 内部的消息存储结构是一个连续的、顺序写入的日志文件(Log File)集合,也称“分区”(Partition)。分区中的每一条消息都被分配一个唯一的偏移量(Offset),并且保留在磁盘上直到被消费。通过这种消息存储方式,Kafka 可以实现高效的顺序写入操作。因为 Kafka 可以将流式的消息按顺序追加到 Log 文件的末尾,这避免了随机写入所产生的磁盘寻址和寻道时间,从而大大提高了写入性能,并降低了延迟。此外,由于只有新的消息会追加到 Log 文件中,而没有数据被修改或删除,因此,读取数据时,Kafka 也可以通过顺序扫描磁盘获取最新的消息,这样也大大提高了读取数据的效率。
  • 数据压缩:Kafka 提供了数据压缩功能,可以将传输的消息进行压缩和解压,减少了磁盘和网络带宽的使用。
  • 零拷贝:Kafka 零拷贝技术可以避免在传输数据时进行数据缓冲和复制,从而减少了 CPU 和内存的使用,提高了性能。

ZooKeeper

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。

Partition的Leader的选举过程

Partition的Leader选举流程

自定义拦截器

拦截器原理

Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:

  • configure(Map<String, ?> configs):获取配置信息和初始化数据时调用
  • onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。
  • onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close():关闭inteceptor,主要用于执行资源清理工作。

Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。

自定义加入时间戳拦截器

/**
 * @author caoduanxi
 * @Date 2021/1/13 14:15
 * @Motto Keep thinking, keep coding!
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                "TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
    }
    // 其余方法省略
}

自定义消息发送统计拦截器

/**
 * @author caoduanxi
 * @Date 2021/1/13 14:18
 * @Motto Keep thinking, keep coding!
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 输出结果,结束输出
        System.out.println("Sent successful:" + successCounter);
        System.out.println("Sent failed:" + errorCounter);
    }
}

在CustomProducer中加入拦截器

// 加入拦截器
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

注意:拦截器的close()方法是收尾的,一定要调用Producer.close()方法,否则拦截器的close()方法不会被调用。

监控

Eagle

Eagle是开源的额可视化和管理软件,允许查询、可视化、提醒和探索存储在任何地方的指标,简而言之,Eagle为您提供了将Kafka集群数据转换为漂亮的图形和可视化的工具。

实质: 一个运行在tomcat上的web应用

延迟消费

kafka目前默认可支持1h以内的延迟消费。

使用方式:consumer启动参数增加 --delay-time-seconds n 设置消费延迟时间,单位秒,默认不延迟消费。仅能拉取到消费延迟时间之前的消息。

注意:此参数默认限制最大值为3600s,超过限制可能导致consumer启动失败。如有调整最大延迟时间的需求,请联系李锦涛(KIM:lijintao)

注意:消息拉取可能有分钟级别的误差。

注意:由于目前每4kb数据构建一次时间索引,如果最后一批数据的size不够4kb,可能导致这些数据不能被延迟消费到。

一些改进手段

  • Rebalance优化
  • Federation架构应用
  • 存算分离等等

相关推荐文章:

Kafka学习笔记(基础篇)_Cat凯94的博客-CSDN博客

看完这篇Kafka,你也许就会了Kafka_心的步伐的博客-CSDN博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka学习笔记(高级篇) 的相关文章

随机推荐

  • linux如何查看软件权限,Linux下使用RPM查看安装的软件和权限操作

    RPM 的介绍和应用 1 查看是否安装了gcc 命令 rpm ql gcc rpm qa grep gcc 参数 q 询问 a 查询全部 l 显示列表 2 权限 安装和删除只有root和有安装权限的用户才可以进行 查询是每个用户都可以进行操
  • zkServer.cmd点击无反应,报错:JAVA_HOME is not set

    window版本中的zk安装过程中遇到一些问题 zkServer cmd点击无反应 现象 window本地准备zk启动的时候 点击zkServer cmd 不弹启动日志的窗口 无反应 以下是排查过程 在zk的安装目录下 我的是D apach
  • Faster-RCNN遇到的问题

    编译过程挺顺利的 完全参照之前编译的caffe的makefile进行就好 运行 experiments scripts faster rcnn alt opt sh 0 VGG16 pascal voc 遇到的问题做个小汇总 1 pytho
  • MyBatis学习(一):一个简单的演示

    MyBatis是支持普通SQL查询 存储过程和高级映射的优秀持久层框架 MyBatis消除了几乎所有的JDBC代码和参数的手工设置以及结果集的检索 他是使用简单的XML或是注解用于配置和原始映射 将接口和java的对象映射为数据库中的记录
  • 数据结构-线性表

    线性表 线性表是典型线性数据结构 线性表是由n n 0 个数据元素组成的一个有限序列 线性表中数据元素的个数n称为线性表的长度 当n 0时 称为空表 非空线性表满足线性结构的三个特性 线性表的数据元素可以是由一个数据项组成的简单数据元素 也
  • pmc是什么职位?

    答案一 PMC代表Product Material Control的缩写形式 意思为生产及物料控制 通常它分为两个部分 PC 生产控制或生产管制 台 日资公司俗称生管 主要职能是生产的计划与生产的进度控制 MC 物料控制 俗称物控 主要职能
  • windows下的gcc使用

    文章目录 缘起 正文 安装gcc 使用gcc来编译c语言程序 直接编译生成 exe文件 在cmd里面使用gcc编译器编译c文件流程 利用gcc编译多个c语言源文件 第一步建立文件 编译文件 链接 运行 gcc基本参数 参考文献 缘起 在wi
  • Python bs4库 爬取小说

    学习目标 利用bs4库爬取小说 笔趣阁 学习内容 bs4库 from bs4 import BeautifulSoup 可以将网页源码转化为对象 soup BeautifulSoup Html lxml 对对象进行解析 利用网页中的标签 s
  • face++人脸识别初探

    依然是实训第一周 由于选定了face 作为api 我摸索了一下face 的人脸识别api的情况 下面我来为大家分析一下face api的情况 face 人脸识别有 1 人脸检测 2 人脸别对 3 人脸搜索 4 人脸关键点 5 人脸属性 6
  • MYSQL服务无法启动:InnoDB: .\ibdata1 can't be opened in read-write mode

    今天在那做实验倒腾mysql数据库 后来发现服务无法启动 查看日志报错如下 2015 01 07 17 48 54 9136 ERROR InnoDB ibdata1 can t be opened in read write mode 2
  • File_operations结构----将驱动里的功能函数与系统调用关联起来

    File operations结构体 结构体file operations在头文件 linux fs h中定义 在驱动程序中 用来存储对设备进行各种操作的函数的指针 可以看做是这些函数与系统调用的对应关系表 系统调用发生时 系统会读取fil
  • 电子企业MES管理系统架构分析

    随着电子制造行业的快速发展 MES生产管理系统的应用越来越普遍 许多制造企业购买或自主研发了适合自己工厂的MES 旨在实现智能工厂 车间 的目标 作为智能制造的核心 MES管理系统解决方案在企业智能化转型升级中发挥着越来越重要的作用 然而
  • ES搜索框架--低配置服务器部署ES导致崩溃的解决

    省流 修改jvm options 降低堆大小 一 服务器情况 最近es会突然stop 查看日志后发现经常是因为报错 Native controller process has stopped no new native processes
  • ubuntu18.04安装mysql5.7

    sudo apt install mysql server sudo apt update Server version 5 7 36 0ubuntu0 18 04 1 Ubuntu sudo mysql show databases us
  • centOS 7 无法启动网络原因(service network start)+ifconfig找不到IP地址

    linux 中 安装 centos 配置静态地址却发现网络服务启动不了 试了好久终于好了 出个攻略记录一下修改历程 出现问题 Job for network service failed because the control proces
  • XILINX XDMA pcie 使用

    前段时间在公司项目中调试了PCIE 正好做一个总结 那些介绍XDMA PCIE之类的多余的东西网上能搜到很多 我这里就不多说 我写的只是自己的一些想法 以及自己的设计思路 同每一个刚开始调试PCIE的人一样 作为初学者大家都是先去网上搜集大
  • 服务器建文件夹,服务器建立文件夹

    服务器建立文件夹 内容精选 换一换 本指导适用于用户做网页301重定向时参考使用 装有IIS的服务器做301重定向在IIS里把网站正常发布 例如域名为www aaa com 在硬盘上建一个空文件夹 在IIS里建一个网站 例如域名为aaa c
  • Apicloud之如何实现一次上传多张图片

    前提 1 APIcloud做前端开发工具 2 加入了UIMediaScanner模块 代码使用安卓系统 ios系统的可以参考一下 代码讲解 1 UIMediaScanner模块选择多张图片以后 返回的是一个list集合 2 而路径是在lis
  • STM32在Keil5中硬件仿真问题记录汇总

    STM32在Keil5中硬件仿真问题记录汇总 问题描述 使用平台 处理方式 硬件上接线问题 问题描述 0x1FFFF3AE 0549 LSLS r1 r1 21 0x1FFFF3B0 D5FB BPL 0x1FFFF3AA 0x1FFFF3
  • Kafka学习笔记(高级篇)

    目录 高级功能 高效读写 涉及技术 ZooKeeper 自定义拦截器 监控 延迟消费 一些改进手段 高级功能 高效读写 涉及技术 高吞吐量 Kafka 每秒可以处理数百万消息 这是因为 Kafka 消息的处理是以批处理 Batching 的