【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决

2023-11-17

背景


Flink版本:1.13.5

一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。

正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。

实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。

源码分析


我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:

public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
            boolean completed = false;
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();
            HashMap miscFiles = new HashMap();
            boolean var15 = false;

            SnapshotResult var18;
            try {
                var15 = true;
                metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
                Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
                Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
                this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
                synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
                    RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
                }

                IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());
                DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                SnapshotResult snapshotResult;
                if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
                    IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());
                    snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
                } else {
                    snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
                }

                completed = true;
                var18 = snapshotResult;
                var15 = false;
            } finally {
                if (var15) {
                    if (!completed) {
                        List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());
                        statesToDiscard.add(metaStateHandle);
                        statesToDiscard.addAll(miscFiles.values());
                        statesToDiscard.addAll(sstFiles.values());
                        this.cleanupIncompleteSnapshot(statesToDiscard);
                    }

                }
            }

重点关注uploadSstFiles()方法的实现细节:

            Preconditions.checkState(this.localBackupDirectory.exists());
            Map<StateHandleID, Path> sstFilePaths = new HashMap();
            Map<StateHandleID, Path> miscFilePaths = new HashMap();
            Path[] files = this.localBackupDirectory.listDirectory();
            if (files != null) {
                this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
                miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
            }

进入到createUploadFilePaths()方法:

        private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
            Path[] var5 = files;
            int var6 = files.length;

            for(int var7 = 0; var7 < var6; ++var7) {
                Path filePath = var5[var7];
                String fileName = filePath.getFileName().toString();
                StateHandleID stateHandleID = new StateHandleID(fileName);
                if (!fileName.endsWith(".sst")) {
                    miscFilePaths.put(stateHandleID, filePath);
                } else {
                    boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
                    if (existsAlready) {
                        sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
                    } else {
                        sstFilePaths.put(stateHandleID, filePath);
                    }
                }
            }

        }

  这里是问题的关键,我们可以归纳出主要逻辑:

1. 扫描rocksdb本地存储目录下的所有文件,获取到所有的sst文件和misc文件(除sst文件外的其他所有文件);

2. 将sst文件和历史checkpoint上传的sst文件做对比,将新增的sst文件路径记录下来;

3. 将misc文件的路径记录下来;

这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:

[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月  31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月  31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop   75420 3月  31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop   33545 3月  31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop   40177 3月  31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop   33661 3月  31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop   40494 3月  31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop   33846 3月  31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop      16 3月  30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop      37 3月  28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop       0 3月  28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop   38967 3月  28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月  31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop   10407 3月  28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop   13126 3月  28 14:56 OPTIONS-000012

1. CURRENT、IDENTIFY、LOCK、OPTIONS-*, 这些文件基本是固定大小,不会有变化;

2. LOG文件, 这个文件是rocksdb的日志文件,默认情况下,flink设置的rocksdb的日志输出级别是HEAD级别,几乎不会有日志输出,但是如果你配置了state.backend.rocksdb.log.level,比如说配置为了INFO_LEVEL,那么这个LOG文件会持续输出并且不会被清理;

3. MANIFEST-*,这是rocksdb的事务日志,在任务恢复重放过程中会用到, 这个日志也会持续增长,达到阈值以后滚动生成新的并且清楚旧文件;

原因总结


在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。

解决办法


1. 在生产环境关闭Rocksdb日志(保持state.backend.rocksdb.log.level的默认配置即可);

2. 设置manifest文件的滚动阈值,我设置的是10485760byte;

问题解决。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决 的相关文章

随机推荐

  • Maven、pom.xml

    maven库中心 Maven Central Repository Search 搜索可以用的包与版本 目录 Maven 使用方法 1 下载 配置 2 项目中使用 3 生命周期 4 构建插件 MAVEN工程 的目录结构 父子项目 创建父项目
  • dpr-2000 四usb口无线多功能打印服务器,D-Link DPR-2000 超高兼容的打印服务器

    PConline资讯 DPR 2000无线802 11 G多功能打印服务器是一个通用多端口的打印服务器 是办公 学校和商业使用的理想选择 它提供4个USB连接接口可以连接4台USB打印机 本设备给用户提供添加多个打印机 多功能打印机或扫描仪
  • vue项目使用luckyexcel插件预览excel表格

    温馨提示 需要用到luckysheet文件和luckyexcel插件 根据下面步骤一步一步操作会避免踩坑 比如我当时遇到了window luckysheet is not defined控制台报红的问题 第一步 引入luckysheet的相
  • JAVA单元测试框架-14-实现TestNG失败案例重跑

    前面是通过java代码指定重跑 本节是讲解通过实现IAnnotationTransformer接口实现失败案例重跑 创建MyRetry 实现IRetryAnalyzer 接口 package Listener import org test
  • MMsegmentation文档学习

    1 了解配置 config文件结构 config base 下有4种基本组件类型 dataset model schedule default runtime 同一文件夹下的所有配置 建议只具有一个原始配置 所有其他配置从原始配置继承 这样
  • JDK8升级JDK11最全实践干货来了

    1 前言 截至目前 2023年 Java8发布至今已有9年 2018年9月25日 Oracle发布了Java11 这是Java8之后的首个LTS版本 那么从JDK8到JDK11 到底带来了哪些特性呢 值得我们升级吗 而且升级过程会遇到哪些问
  • Ts接口的使用

    TypeScript 的核心原则之一是对值所具有的结构进行类型检查 我们使用接口 Interfaces 来定义对象的类型 接口是对象的状态 属性 和行为 方法 的抽象 描述 接口初探 需求 创建人的对象 需要对人的属性进行一定的约束 id是
  • 工作10年我面试过上百个程序员,真想对他们说…

    V xin ruyuanhadeng获得600 页原创精品文章汇总PDF 一 写在前面 最近收到不少读者反馈 说自己在应聘一些中大型互联网公司的Java工程师岗位时遇到了不少困惑 这些同学说自己也做了精心准备 网上搜集了不少Java面试题
  • Edit Distance

    Given two words word1 and word2 find the minimum number of steps required to convert word1 to word2 each operation is co
  • 【转载】探索推荐引擎内部的秘密

    原网址 https www ibm com developerworks cn web 1103 zhaoct recommstudy1 index html icomments 这是2011年ibm发布的文章 较为通俗易懂 适合想入门推荐
  • 配置msf连接postgresql数据库

    BackTrack 5 R3版本的Metasploit在每次的升级后总会出现奇奇怪怪的错误 主要是Ruby的库出错 网上找了一些解决的办法 但每次更新后又会出错 蛋碎 解决方法 BackTrack 5中默认自动开启端口7337 1 查看Po
  • Zabbix监控MariaDB服务

    文章目录 1 概述监控MariaDB服务主机 2 安装MariaDB服务和配置MariaDB 3 配置Zabbix的userparameter mysql conf 文件模板 4 在Web配置模板 5 在server进行压力测试mysql服
  • svg实现文本的垂直居中对齐样式

    项目中用到表格内画折线趋势图 本人使用的svg绘制简单折线 没有数据的单元格显示文字 为了不影响表格的宽度自适应 就想到在svg上写文字 于是就有了在svg上对文字样式进行垂直居中的需求 上代码
  • Linux教程:在虚拟机中如何配置Linux系统网络环境 ?

    对于很多初学Linux 的同学 大多选择使用虚拟机来展开学习 可以方便的做实验 修改 测试 不必害怕出问题 可以随便折腾 大不了换一个虚拟机 原来的系统不受任何影响 但由于不是实体pc机 使用难免受限 如果配置不好 后期开发必受其累 比如
  • C++Primer(4-8章)

    第四章 表达式 求值顺序 C 中没有明确规定大多数运算符的求值顺序 因此我们要避免 改变了某个运算对象的值 又在表达式其他地方使用这个运算对象 这种情况出现 赋值运算满足右结合律 在输出表达式中使用条件运算符 条件运算符的优先级非常低 因此
  • java修改AD域用户密码使用SSL连接方式

    正常情况下 JAVA修改AD域用户属性 只能修改一些普通属性 如果要修改AD域用户密码和userAccountControl属性就得使用SSL连接的方式修改 SSL连接的方式需要操作以下步骤 1 安装AD域证书服务 2 证书颁发机构中设置以
  • 【C语言】结构体中的函数指针

    目录 一 函数指针是什么 二 结构体中的函数指针 一 函数指针是什么 函数指针是指向函数的指针变量 通常我们说的指针变量是指向一个整型 字符型或数组等变量 而函数指针是指向函数 函数指针可以像一般函数一样 用于调用函数 传递参数 正确形式
  • 2.【Python】分类算法—Logistic Regression

    2 Python 分类算法 Logistic Regression 文章目录 2 Python 分类算法 Logistic Regression 前言 一 Logistic Regression模型 1 线性可分和线性不可分 2 Logis
  • 二.全局定位--开源定位框架livox-relocalization实录数据集测试

    相关博客 二十五 SLAM中Mapping和Localization区别和思考 goldqiu的博客 CSDN博客 二十五 SLAM中Mapping和Localization区别和思考 goldqiu的博客 CSDN博客 基于固态雷达的全局
  • 【Flink系列】- RocksDB增量模式checkpoint大小持续增长的问题及解决

    背景 Flink版本 1 13 5 一个使用FlinkSQL开发的生产线上任务 使用Tumble Window做聚和统计 并且配置table exec state ttl为7200000 设置checkpoint周期为5分钟 使用rocks