Apache Flink Checkpoint 应用实践

2023-11-19

Checkpoint 与 state 的关系

Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作。下图可以有一个对 Checkpoint 的直观感受,红框里面可以看到一共触发了 569K 次 Checkpoint,然后全部都成功完成,没有 fail 的。

state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,看下图的具体数据统计,其 state 也就 9kb 大小 。

什么是 state

我们接下来看什么是 state。先看一个非常经典的 word count 代码,这段代码会去监控本地的 9000 端口的数据并对网络端口输入进行词频统计,我们本地行动 netcat,然后在终端输入 hello world,执行程序会输出什么?

答案很明显,(hello, 1) 和 (word,1)

那么问题来了,如果再次在终端输入 hello world,程序会输入什么?

答案其实也很明显,(hello, 2) 和 (world, 2)。为什么 Flink 知道之前已经处理过一次 hello world,这就是 state 发挥作用了,这里是被称为 keyed state 存储了之前需要统计的数据,所以帮助 Flink 知道 hello 和 world 分别出现过一次。

回顾一下刚才这段 word count 代码。keyby 接口的调用会创建 keyed stream 对 key 进行划分,这是使用 keyed state 的前提。在此之后,sum 方法会调用内置的 StreamGroupedReduce 实现。

什么是 keyed state

对于 keyed state,有两个特点:

  • 只能应用于 KeyedStream 的函数与操作中,例如 Keyed UDF, window state

  • keyed state 是已经分区/划分好的,每一个 key 只能属于某一个 keyed state

 

对于如何理解已经分区的概念,我们需要看一下 keyby 的语义,大家可以看到下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过 keyby 会进行相应的分发。例如对于 hello word,hello 这个词通过 hash 运算永远只会到右下方并发的 task 上面去。

什么是 operator state

  • 又称为 non-keyed state,每一个 operator state 都仅与一个 operator 的实例绑定。

  • 常见的 operator state 是 source state,例如记录当前 source 的 offset

 

再看一段使用 operator state 的 word count 代码:

这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state。根据 state 类型做一个分类如下图:

除了从这种分类的角度,还有一种分类的角度是从 Flink 是否直接接管:

  • Managed State:由 Flink 管理的 state,刚才举例的所有 state 均是 managed state

  • Raw State:Flink 仅提供 stream 可以进行存储数据,对 Flink 而言 raw state 只是一些 bytes

 

在实际生产中,都只推荐使用 managed state,本文将围绕该话题进行讨论。

下图就前文 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:

下图则对 word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:

Checkpoint 的执行机制

在介绍 Checkpoint 的执行机制前,我们需要了解一下 state 的存储,因为 state 是 Checkpoint 进行持久化备份的主要角色。

Statebackend 的分类

下图阐释了目前 Flink 内置的三类 state backend,其中MemoryStateBackendFsStateBackend在运行时都是存储在 java heap 中的,只有在执行 Checkpoint 时,FsStateBackend才会将数据以文件格式持久化到远程存储上。

RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的 LSM DB)对 state 进行存储。

对于HeapKeyedStateBackend,有两种实现:

  • 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap

  • 仅支持同步 Checkpoint:存储格式 NestedStateMap

 

特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Checkpoint 序列化数据阶段默认有最大 5 MB数据的限制

 

对于RocksDBKeyedStateBackend,每个 state 都存储在一个单独的 column family 内,其中 keyGroup,Key 和 Namespace 进行序列化存储在 DB 作为 key。

Checkpoint 执行机制详解

本小节将对 Checkpoint 的执行流程逐步拆解进行讲解,下图左侧是 Checkpoint Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个 sink 组成的 Flink 作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。

a. 第一步,Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint;。

b. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。

c. 第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。

d. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。

e. 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。

f. 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

Checkpoint 的 EXACTLY_ONCE 语义

为了实现 EXACTLY ONCE 语义,Flink 通过一个 input buffer 将在对齐阶段收到的数据缓存起来,等对齐完成之后再进行处理。而对于 AT LEAST ONCE 语义,无需缓存收集到的数据,会对后续直接处理,所以导致 restore 时,数据可能会被多次处理。下图是官网文档里面就 Checkpoint align 的示意图:

需要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。

Savepoint 与 Checkpoint 的区别

作业恢复时,二者均可以使用,主要区别如下:

Savepoint Externalized Checkpoint
用户通过命令触发,由用户管理其创建与删除 Checkpoint 完成时,在用户给定的外部持久化存储保存
标准化格式存储,允许作业升级或者配置变更 当作业 FAILED(或者CANCELED)时,外部存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复作业状态的 savepoint 路径 用户在恢复时需要提供用于恢复的作业状态的 Checkpoint 路径

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink Checkpoint 应用实践 的相关文章

  • Flink之IntervalJoin介绍

    InterValJoin算子 间隔流 一条流去join另一条流去过去一段时间内的数据 该算子将keyedStream与keyedStream转化为DataStream 再给定的时间边界内 默认包含边界 相当于一个窗口 按指定的key对俩个K
  • 车联网项目学习笔记

    学习目标 了解车联网大数据行业 了解车联网项目系统架构 理解车联网数据量计算方法 掌握json数据解析 掌握复杂json解析方法 能掌握的技能 1 车联网领域大数据系统设计与开发 2 车联网业务类型与指标设计 3 实时数据ETL开发 4 实
  • 大数据技术Flink详解

    一 有状态的流式处理 Apache Flink 是一个分布式流处理器 具有直观和富有表现力的API 可实现有状态的流处理应用程序 它以容错的方式有效地大规模运行这些应用程序 Flink 于2014 年4 月加入Apache 软件基金会作为孵
  • flink源码解析

    一 启动流程解析 flink的启动从命令行提交开始 yooh hadoop101 bin pwd home yooh app flink 1 11 1 bin yooh hadoop101 bin cat flink 上边都是获取环境配置相
  • Macbook Pro 鼠标卡顿问题

    Macbook Pro 鼠标卡顿问题 目前无解 只能改善 该问题最早能追溯到 2015年 https jingyan baidu com article ff42efa93632c5c19e220208 html 原因 据说是无线频段冲突
  • Flink 多流转换 (五) 间隔联结(Interval Join)

    文章目录 间隔联结的调用 间隔联结实例 顾名思义 间隔联结的思路就是针对一条流的每个数据 开辟出其时间戳前后的一段时间间隔 看这期间是否有来自另一条流的数据匹配 间隔连接通过一个共同的key连接两个流 A B 中的数据 流 B 的数据具有时
  • flink中AggregateFunction 执行步骤以及含义全网详细解释

    package operator import org apache flink api common functions AggregateFunction import org apache flink api common funct
  • 【Flink】处理函数Process

    目录 处理函数 基本处理函数 ProcessFunction 处理函数的功能 ProcessFunction解析 处理函数的分类 按键分区处理函数 KeyedProcessFunction 定时器Timer 和定时服务 TimerServi
  • [1143]Flink的Checkpoint和Savepoint

    文章目录 Flink的Checkpoint和Savepoint介绍 第一部分 Flink的Checkpoint 1 Flink Checkpoint原理介绍 2 Checkpoint的简单设置 3 保存多个Checkpoint 4 从Che
  • Flink学习27:驱逐器

    import org apache flink api common eventtime SerializableTimestampAssigner WatermarkStrategy import org apache flink api
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • flink 第一个窗口开始时间以及offset作用

    简述窗口开始时间 1 当flink程序启动时 创建的第一个window的开始时间是由程序决定的 具体的算法如下 2 窗口开始时间 第一条记录时间戳 第一条记录时间戳 窗口时长 也就是从utc0时区的1970 01 01 00 00 00 0
  • 流计算框架 Flink 与 Storm 的性能对比

    概述 将分布式实时计算框架 Flink 与 Storm 进行性能对比 为实时计算平台和业务提供数据参考 一 背景 Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架 其中 Apache Sto
  • flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现

    flink主程序 public class FinkTest public static void main String args throws Exception StreamExecutionEnvironment env Strea
  • 在 Linux 中使用 Core Dump 检查点/重新启动

    可以使用进程的核心转储来实现检查点 重启吗 核心文件包含进程的完整内存转储 因此理论上应该可以 将进程恢复到转储核心时的相同状态 是的 这是可能的 GNU Emacs 这样做是为了优化其启动时间 它加载一堆 Lisp 文件来生成图像 然后转
  • Java 检查点

    我希望我的问题不太模糊 但我正在寻找有关 Java 检查点的更多信息 我必须生成一个大搜索树 我希望能够在程序中断后 例如突然重新启动后等 恢复计算 因此我需要检查点 我发现关于这方面的文档很少 而且我的印象是很多开发在 90 年代中期就停
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理

    我在我们的项目中使用结构化流 Kafka 进行实时数据分析 我使用的是 Spark 2 2 kafka 0 10 2 我在应用程序启动时从检查点进行流式查询恢复期间遇到问题 由于单个 kafka 流点派生有多个流查询 并且每个流查询都有不同
  • 我正在尝试从某个检查点 (Tensorflow) 恢复训练,因为我正在使用 Colab 并且 12 小时还不够

    这是我正在使用的代码的一部分 checkpoint dir training checkpoints1 checkpoint prefix os path join checkpoint dir ckpt checkpoint tf tra

随机推荐