日志存储

2023-10-27

文件目录布局

一个分区副本在节点上对应一个log文件夹,同时kafka为了防止文件过大引入了logSegment概念。将log切分成了多个logSegment,相当于将一个大文件切分成多个小文件。logSegment又对应磁盘上多个文件,一般有一个日志文件和两个索引文件,以及可能的其他文件。
关系图
向log中追加消息时是顺序写入,只有最后一个logSegment才能执行昔日如操作,此前的所有logSegment度不能写入数据。
为了便于消息的检索,每个logSegment中的日志文件(以.log结尾)对有对应的两个索引文件:偏移量索引文件(以.index结尾)和时间戳索引文件(以.timeindex结尾)。每个logSegment都有一个基准偏移量baseOffset,用来表示当前logSegment中第一条小消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量命名的,名称固定为20位数字,没有达到的位数则用0填充。
除了以上文件外在一个kafka服务第一次启动的时候,默认的根目录下就会创建一下5个文件:
cleaner-offset-checkpoint,log-start-offset-checkpoint,meta-properties,recovery-point-offset-checkpoint,replication-offset-checkpoint

日志格式的演变

0.8-0.10版本之间为v0版本的日志文件格式
在这里插入图片描述

  • crc32(4B) 校验值。校验范围为magic至value之间
  • magic(1B) 消息格式版本号,此版本的magic值为0
  • attributes(1B) 消息的属性。占1个字节,低3位表示压缩类型:0表示NONE,1表示GZIP,2表示SNAPPY,3表示LZ4,其余保留
  • key length(4B) 表示消息的key的长度。如果为-1则表示没有设置key
  • key 可选
  • value length(4B) 世纪消息体的长度,如果没有则为-1
  • value 消息体
    查看日志可以使用如下脚本
kafka-run-class.sh kafka.tool.DumpLogSegments --file $KAFKA_LOG/topic-create-0/000000000000000.log

0.10-0.11版本之间为v1版本的日志格式
这个版本比v0版本就多了一个timestamp字段,表示消息的时间戳
在这里插入图片描述
这个版本的attributes字段中的低3位和v0版本的一样,还是压缩类型,而第4个位也被利用起来了:1表示timestamp类型为createTime,而1表示timestamp类型为logAppendTime,其余保留。时间戳类型可由broker端参数log.message.timestamp.type来配置,默认为CreateTime,即采用生产者创建消息是的时间戳。
如果在创建ProducerRecord是没有显式指定消息的时间戳,那么KafakProducer也会在发送这条消息前自动添加上。下面是KafkaProducer中与此对应的一句关键代码

	long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

0.11版本后使用的是v2版本的日志
v2版本二中消息集称为Record Batch,而不是先前的Message Set。
在这里插入图片描述
v2版本的消息格式去掉了crc32,另外增加了legnth,timestamp delta,offset delta和headers,并且attributes字段被弃用了

  • length 消息长度
  • attributes 弃用,但还是在消息格式中占据1B
  • timestamp delta 时间戳增加
  • offset delta 位移增量,保存与RecordBatch其实位移的差值,可以节省占用的字节数
  • headers 该这段主要用于支持应用级别的扩展。
    v1版本消息,如果用户指定的timestamp类型是logAppendtime而不是createTime,那么消息从生产者进入broker后,timestamp字段会被更新,此时消息的crc值江北重新计算,而此值在生产者中已经被计算过一次。再者,broker端在进行消息格式转换时(比如v1版本转成v0版的消息格式)也会重新计算crc值,所以v2将crc的字段从Record移到了RecordBatch中

消息压缩

Kafka日志中使用哪种压缩方式是通过参数compression.type来配置的,默认值为"producer",表示保留生产者使用的压缩方式。这个参数还可以配置为gzip,snappy和lz4,如果参数compression.type为uncompressed则表示不压缩,这个是在broker端设置的,和生产者端的compression.type不一样。
当消息压缩是是将整个消息集进行压缩作为内层消息,内层消息整体作为外层的value
在这里插入图片描述
压缩的外层消息中的key为null,value字段中保存的是多条压缩消息,其中Record表示的是从crc32到value的消息格式。当生产者创建压缩消息的时候,对内层压缩消息设置的offset从0开始为每个内层消息分配offset,可以理解为是这个消息集中的相对offset。而外层offset的值为内层消息中最后一条消息的绝对offset。
计算公式
RO = IO_of_a_message - IO_of_the_last_message
AO = AO_of_last_inner_message + RO

日志索引

日志索引

kafka的索引文件以稀疏索引的方式构建消息的索引,每当写入一定量(有broker端参数log.index.interval.bytes指定,默认值为4096)时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间索引同理,当时间索引中记录的是时间和偏移量的对应值,而偏移量索引记录的是偏移量和磁盘物理值的对应关系。
日志分段文件切分包含以下几个条件,满足一个就可以了

  • 当前日志分段文件的大小超过broker端参数log.segment.bytes配置的值,默认1GB
  • 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.ms或log.roll.hours参数配置的值。ms更优先,可以理解为在一定时间内没有消息写入后就另起一个segment日志分段
  • 偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes的配置,默认10MB
  • 追加的消息的偏移量与当前日志分段的基准偏移量之间差值大于Integer.MAX_VALUE
    kafka在创建索引文件的时候会为其预分配log.index.size.max.bytes大小的空间,只用当索引文件切分的事就,kafka才会把索引文件裁剪到实际的数据大小

偏移量索引

消息的偏移量占用8个字节,由relativeOffset和position各4B来占用,除了文件名记录了该segment的第一条消息的偏移量后,索引文件中的都是相对偏移量。所以当相对偏移量超过了Integer.MAX_VALUE时将不再是4B了。

时间戳索引

时间戳偏移量占用了12B,时间戳占8个字节,相对索引占4B。
查询流程

  • 将目标时间戳和每个日志分段中的最大时间戳逐一对比,知道找打不小于目标时间戳的最大时间戳所对应的日志分段。
  • 找到相应的日志分段之后,在时间戳索引文件中使用二分查找来查找到不大于目标时间戳的最大时间戳对应的日志的时间戳索引项,然后去找其对应的偏移量索引文件该日志的物理地址
  • 在偏移量索引文件中使用二分查找法来找
  • 确定日志分段的目标位置

日志清理

由broker端参数log.cleanup.policy确定,有两个值delete和compact,默认delete
主题级别的是cleanup.policy

日志删除

在kafka日志管理中会有一个专门的日志删除任务来周期性地检查和删除不符合保留条件的日志分段文件,这个周期由broker端参数log.retention.check.interval.ms来配置,默认300000.
日志删除有3中保留策略:时间,日志大小和日志其实偏移量

  • 基于时间
    由broker端参数log.retention.hours、log.retention.minutes和log.retention.ms。起哄ms等级最高,现在默认是用hours且默认值为168,即7天。
    删除过程,先找每个日志分段的最大时间戳(一般默认为找时间戳索引文件的最后一条),如果该日志分段是activeSegment或不是全部日志都过期就先切分日志在执行删除。
    删除日志分段时,先从Log对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程能操作该日志分段,然后将该日志分段对应所有文件添加一个".delete"的后缀,最后交由一个一"delete-file"命名的延迟任务来删除。
  • 基于日志大小
    当日志的大小超过了设定的阈值(broker端参数log.retention.bytes),将超出的删除掉。这种可能会导致频繁删除。删除过程同上
  • 基于偏移量
    先确定一个logStartOffset,如果该日志分段的下一个日志分段的基准日志offset小于这个就删除该日志分段。

日志压缩

该做法是把每个key里面的最新的value保留,而其余的删除的意思。
在cleaner-offset-checkpoint文件中将整个日志分为clean清理过的和dirty未清理过的。
在这里插入图片描述
firstDirtyOffset表示dirty部分的起始偏移量,而firstUncleanableOffset为dirty的截止偏移量,activeSegment不会参与日志压缩过程,broker端参数log.cleaner.min.compaction.lag.ms(默认为0)来配置消息在被清理前的最小保留时间,默认情况下firstUncleanableOffset等于activeSegment的baseOffset。
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes) 这个是最小污浊率,为了防止日志补益药的频繁清理操作,kafka还是使用了参数log.cleaner.min.cleanable.ratio来限定可以进行清理操作的最小污浊率。kafka中的__consumer_offsets使用的就是日志压缩策略。
kafka中的每个日志清理线程会使用一个名为SkimpyOffsetMap的对象来构建key和offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历吧每个可以的哈细致和最后出现的offset都保留在SkimpyOffsetMap中。第二次遍历会检查每个消息是否符合保留条件。
默认情况下,SkimpyOffsetMap使用MD5来计算key的哈稀值,占用16B空间,为了防止哈希冲突国语频繁,也可以通过broker参数log.cleaner.io.buffer.load.factor(0.9)来调整负载因子。偏移量占用8B空间,股一个映射项占用大小24B。而SkimpyOffsetMap的大小是由log.cleaner.dedupe.buffer.size参数来确定(128M),当然是所有删除线程的总和。SkimpyOffsetMap记录key的个数为128M * 0.9 / 24B = 5033164个key。
在这里插入图片描述
假设该日志没有进行过日志压缩,checkpoint中的记录为0,则将activeSegment前的日志分段经行一次压缩,这样每个非活跃的日志分段的大小都有所缩减,checkpoint的值也有所变化。执行第二次压缩是会合并一些较小的日志分段。压缩过程中将每一个需要保留的消息复制到一个以".clean"为后缀的临时文件中,此临时文件以当前日志分组中第一个日志分度的文件命名,压缩后将".clean"文件修改成".swap"文件,在删除了原来的日志文件后,在去掉后缀。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

日志存储 的相关文章

  • RocketMQ经典高频面试题大全(附答案)

    编程界的小学生 0 彩蛋 1 说说你们公司线上生产环境用的是什么消息中间件 2 多个mq如何选型 3 为什么要使用MQ 4 RocketMQ由哪些角色组成 每个角色作用和特点是什么 5 RocketMQ中的Topic和JMS的queue有什
  • 计算广告读书笔记

    计算广告 广告主 媒体 用户 用户画像 ROI 进化 合约广告 多个合约在线分配问题 gt 竞价广告 交易终端TD 广告网络ADN gt 实时竞价RTB 广告交易平台ADX 需求方平台DSP 品牌广告 效果广告 点击率CTR 点击价值 到达
  • 【CentOS7离线ansible-playbook自动化安装CDH5.16(内附离线安装包地址,及自动化脚本)】

    CentOS7 离线环境 使用ansible自动部署CDH5 16 前言 本文介绍如何使用作者开发的自动化脚本 离线部署cdh集群 只需要简单的配置下yum源和cdh集群节点IP等几个参数 就可实现一键部署cdh集群 省去配置mysql n
  • 面对kafka频发的rebalance,该如何处理?

    Kafka 是我们最常用的消息队列 它那几万 甚至几十万的处理速度让我们为之欣喜若狂 但是随着使用场景的增加 我们遇到的问题也越来越多 其中一个经常遇到的问题就是 rebalance 重平衡 问题 但是要想了解 rebalance 那就得先
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka消息阻塞

    转自 http jis117 iteye com blog 2279519 hi all 大家都很关心kafka消息阻塞的情况 感谢RoctetMQ给我们的教训 Kafka上线也有一段时间了 确实有出现过消息阻塞的情况 虽然不影响业务而且用
  • 《消息队列高手课》缓存策略:如何使用缓存来减少磁盘IO?

    现代的消息队列 都使用磁盘文件来存储消息 因为磁盘是一个持久化的存储 即使服务器掉电也不会丢失数据 绝大多数用于生产系统的服务器 都会使用多块儿磁盘组成磁盘阵列 这样不仅服务器掉电不会丢失数据 即使其中的一块儿磁盘发生故障 也可以把数据从其
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • Kafka生产者模式生成10亿条数据

    生产者生产消息 public class MyProducer2 public static void main String args throws InterruptedException 生产者 Properties properti
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • Kafka 权威指南

    Kafka 权威指南 这本书于 2021 年看完 2022 年又看了一遍 感觉书读百遍 其义自现 这本书侧重于 Kafka 的理论知识 虽然书有点老 但是其中关于 Kafka 的基础知识的章节讲得确实不错 适合学习 Kafka 的新手以及
  • 【ranger】CDP环境 更新 ranger 权限策略会发生低概率丢失权限策略的解决方法

    一 问题描述 我们的 kafka 服务在更新 添加 ranger 权限时 会有极低的概率导致 MM2 同步服务报错 报错内容 Not Authorized 但是查看 ranger 权限是赋予的 并且很早配置的权限策略也会报错 相关组件版本
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比

随机推荐

  • Python爬虫Scrapy框架IP代理的配置与调试

    在调试爬虫的时候 新手都会遇到关于ip的错误 好好的程序突然报错了 怎么解决 关于ip访问的错误其实很好解决 但是怎么知道解决好了呢 怎么确定是代理ip的问题呢 由于笔者主修语言是Java 所以有些解释可能和Python大佬们的解释不一样
  • SQL中的时间

    前言 Oracle MySQL Postgresql SqlServer中对于时间的处理是不同的 为了便于使用和区分 我在来列一下这常见的四种数据库中对于时间进行处理方式 目录 一 Oracle 1 dd mon yy转换为yyyy mm
  • 自己实现一个 atoi 函数

    atoi ASCII to integer 把字符串转换成整型数 情况一 输入的字符包含不是数字字符的字符 例如 123adc4 针对这种情况 我们只要加上判断就行了 只要遇到不是数字字符的直接返回 情况二 如果在数字字符前面有正负号又该怎
  • 二叉树中的dfs

    上周去华为面试的时候 遇到了一个自己以前积累过的dfs问题 当时觉得dfs的问题不需要搞懂每一步到哪里了 只需要大体上知道怎么弄套模板就可以 后来现场画那个dfs的图 以及每个状态的变化 虽然画出来了 但是觉得还是要好好思考下这些问题 毕竟
  • Argparse 教程

    https docs python org zh cn 3 howto argparse html
  • Qt中的一些常用类

    目录 1 QString 字符串类 2 容器类 2 1 QList 2 2 QMap 3 跨平台数据类型 4 时间和日期处理 5 QTimer 定时器类 1 QString 字符串类 QString是Qt中的字符串类 使用Unicode编码
  • LeetCode 面试题04. 二维数组中的查找(Python)

    题目 注意 本题与主站 240 题相同 https leetcode cn com problems search a 2d matrix ii 来源 力扣 LeetCode 链接 https leetcode cn com problem
  • java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double

    报错 java lang ClassCastException java math BigDecimal cannot be cast to java lang Double 原因 类型转换异常 这里要从map里面取出坐标点然后存起来 直接
  • 怎么用matlab保存音频文件,Matlab用图片和音频藏文件(续)

    上一篇文章讲到通过Matlab实现图片隐藏文件 这一篇我使用C 调用Matlab编译的 Net程序集构建了一个小的应用程序 原理很简单 使用上一篇文章中的Matlab代码 写成两个函数 分别用来隐藏和解析 代码如下 function sav
  • 2023年最新前端面试题汇总大全二(含答案超详细,Vue,TypeScript,React,微信小程序,Webpack 汇总篇)-- 持续更新

    HTML篇 CSS篇 JS篇 Vue篇 TpeScript篇 React篇 微信小程序篇 前端面试题汇总大全 含答案超详细 HTML JS CSS汇总篇 持续更新 前端面试题汇总二 逐步更新 五 Vue 篇 1 谈谈你对MVVM开发模式的理
  • conda常用的指令:创建、切换环境、第三方库的安装说明。

    conda在安装完成之后 我想查看一下我是否安装成功了conda 可以通过按下键盘上的window R 打开cmd指令 如下图所示 打开之后 在命令行输入 conda version 即可查看你当前所安装的Anconada的版本号 如下图所
  • 消息队列(message queue)

    消息队列提供了一种在两个不相关的进程之间传递数据的相当简单且有效的办法 消息队列和共享内存优于管道的原因 消息队列机制中 双方通过消息来通信 无需花费精力从字节流中解析出完整的消息 每条消息都有type字段 消息的读取进程可以通过type字
  • 模拟搭建2022网络系统管理比赛Linux模块的环境

    由于国赛通知 今年的Linux模块需要使用centos7以上或者统信UOS系统 之前还准备了很久的debian10 真是心累 准备工作 准备四台centos7 5的虚拟机 ssh工具使用SecureCRT 其中Rserver作为路由转发的服
  • java自学第三天

    vscode配置java运行环境 第一种方法 1 插件市场找debugger forjava extension pack for java 2 以管理员身份运行cmd 将路径转换到jdk安装路径 复制下面的命令运行 生成jre bin j
  • 【icon】:可在Markdown随意插入的表情符号集大全,简单复制粘贴法,2种在线实时检索法,釜底抽薪的快捷键法

    这些表情符号集 图标ICONS 可增强美观性 让读者阅读技术博客时 提升精气神 复制粘贴法 用户可直接复制 并在CSDN文章的任意处进行粘贴 在线检索法 用户进入emojipedia网站后 检索所需 复制 粘贴即可 釜底抽薪法 用户可自由选
  • pyecharts画饼形图,圆形图,环形图(含百分比显示)【python干货】

    很多做数据分析可视化的朋友总会遇到一些烦恼 用pyecharts绘制饼形图 圆形图 的时候 总会报错 废话不多说 下面跟着小编上车吧 教你用pyecharts绘制饼形图 圆形图 环形图从小白到精通 1 饼形图 圆形图 导入模块 from p
  • Oracle11g客户端的安装及配置

    笔记 安装包下载路径 http pan baidu com s 1i31gqTf 一 解压缩之后 点击setup 运行 会弹出一个黑框 耐心等待几秒之后 按照如下地址下一步配置即可 http www 2cto com database 20
  • Sublime Text无法找到Install Package

    问题描述 使用Sublime Text时 ctrl shift p输入Install Package命令失败 解决方案 需要安装Package Control包 进入Package Control安装网站 Installation Pack
  • JAVA学习-限量算法

    一 限流基础知识介绍 为啥要限流 相信就不用我多说了 比如 我周末去饭店吃饭 但是人太多了 我只能去前台拿个号 等号码到我的时候才能进饭店吃饭 如果饭店没有限流怎么办 一到饭点 人都往里冲 而饭店又处理不了这么多人流 很容易就出事故 饭店塞
  • 日志存储

    文件目录布局 一个分区副本在节点上对应一个log文件夹 同时kafka为了防止文件过大引入了logSegment概念 将log切分成了多个logSegment 相当于将一个大文件切分成多个小文件 logSegment又对应磁盘上多个文件 一