[1143]Flink的Checkpoint和Savepoint

2023-11-17

Flink的Checkpoint和Savepoint介绍

第一部分:Flink的Checkpoint

1. Flink Checkpoint原理介绍

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下Flink Checkpoint机制,如官网下图所示:

image.png

Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。

2. Checkpoint的简单设置

开启Checkpoint功能,有两种方式。其一是在conf/flink_conf.yaml中做系统设置;其二是针对任务再代码里灵活配置。但是我个人推荐第二种方式,针对当前任务设置,设置代码如下所示:

 1 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 2 // 设置保存点的保存路径,这里是保存在hdfs中
 3 env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
 4 CheckpointConfig config = env.getCheckpointConfig();
 5 // 任务流取消和故障应保留检查点
 6 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 7 // 保存点模式:exactly_once
 8 config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 9 // 触发保存点的时间间隔
10 config.setCheckpointInterval(60000);

上面调用enableExternalizedCheckpoints设置为ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。上面代码配置了执行Checkpointing的时间间隔为1分钟。

3. 保存多个Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20
这样设置以后,运行Flink Job,并查看对应的Checkpoint在HDFS上存储的文件目录,示例如下所示:

 hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
 Found 22 items
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
 drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
 drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

可见,我们配置了state.checkpoints.num-retained的值为20,只保留了最近的20个Checkpoint。如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

4.从Checkpoint进行恢复

如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar

程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据,如下所示:

 hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
 Found 6 items
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
 drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

从上面我们可以看到,前面Flink Job的ID为582e17d2cc343e6c56255d111bae0191,所有的Checkpoint文件都在以Job ID为名称的目录里面,当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID(这里是11bbc5d9933e4ff7e25198a760e9792e),而对应的Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863等等。


Checkpoint

checkpoint是Flink容错的核心机制。它可以定期的将各个Operator处理的数据进行快照存储(Snapshot)。

如果Flink程序出现宕机,可以重新从这些快照中恢复数据。

Flink容错机制的核心就是持续创建分布式数据流及其状态的一致快照。Flink的checkpoint是通过分布式快照实现的,

所以在flink中这两个词是一个意思。

checkpoint用来保证任务的错误恢复。任务失败可以从最新的checkpoint恢复。

checkpoint机制需要一个可靠的可以回放数据的数据源(kafka,RabbitMQ,HDFS…)和一个存放state的持久存储(hdfs,S3)

1、checkpointConfig

通过调用StreamExecutionEnvironment.enableCheckpointing(internal,mode) 启用checkpoint 。

internal 默认是 -1,表示checkpoint不开启,mode默认是EXACTLY_ONCE模式。

可设置checkpoint timeout,超过这个时间 checkpoint 没有成功,checkpoint 终止。默认 10分钟。

可设置 chekpoint 失败任务是否任务也失败,默认是true

可设置同时进行的checkpoint数量,默认是1.

2、barrier

将barrier插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier不会干扰正常数据,数据流严格有序。

一个barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入到下一个快照。

每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。

多个不同快照的多个barrier会在流中同时出现,即多个快照可能同时创建。

Barrier在数据源端插入,当 snapshot 的barrier 插入后,系统会记录当前snashot 位置值n (用Sn表示。)

例如:在 Apache Kafka中,这个变量表示某个分区中最后一条数据的偏移量。这个位置值Sn 会被发送到一个称为 checkpoint coordinator的模块。

然后 barrier 继续向下移动,当一个 operator 从其输入流接收到所有标识 snapshot n 的 barrier时,它会向其所有输入流插入一个标识 snapshotn 的barrier。当sink operator (DAG流的终点) 从其输入流接收到所有 barrier n时,它向 the checkpoint coordinator 确认 snapshot n 已完成。

当所有 sink 都确认了这个快照,快照就被标识为完成。


第二部分: Flink的Savepoint

1.Flink的Savepoint原理介绍

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpoint机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。

Flink程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创建Savepoint过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。
而且,强烈建议手动为每个Operator设置ID,即使未来Flink应用程序可能会改动很大,比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。

下面,我们以Flink官网文档中给定的例子,来看下如何设置Operator ID,代码如下所示:

 DataStream<String> stream = env.
   // 有状态的source ID (例如:Kafka)
   .addSource(new StatefulSource())
   .uid("source-id") // source操作的ID
   .shuffle()
   // 有状态的mapper ID
   .map(new StatefulMapper())
   .uid("mapper-id") // mapper的ID
   // 无状态sink打印
   .print(); // 自动生成ID
2.创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定:
一种是,需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录,例如如下所示:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints

另一种是,在手动执行savepoint命令的时候,指定Savepoint存储目录,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]

例如,正在运行的Flink Job对应的ID为40dcc6d2ba90f13930abce295de8d038,使用默认state.savepoints.dir配置指定的Savepoint目录,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

可以看到,在目录hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

为正在运行的Flink Job指定一个目录存储Savepoint数据,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

可以看到,在目录 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

3.从Savepoint恢复

现在,我们可以停掉Job 40dcc6d2ba90f13930abce295de8d038,然后通过Savepoint命令来恢复Job运行,命令格式如下所示:

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

可以看到,启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd。

4. Savepoint目录结构

下面,我们看一下Savepoint目录下面存储内容的结构,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的HDFS路径中,11bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串,然后savepoint-11bbc5-bd967f90709b作为存储此次Savepoint数据的根目录,最后savepoint-11bbc5-bd967f90709b目录下面_metadata文件包含了Savepoint的元数据信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目录下面其它文件的路径,这些文件内容都是序列化的状态信息。


flink的checkpoint页面监控

flink web页面中提供了针对Job Checkpoint相关的监控信息。Checkpoint监控页面共有overview、history、summary和configuration四个页签,分别对Checkpoint从不同的角度进行了监控,每个页面中都包含了与Checkpointing相关的指标。

一、overview

overview页签中宏观地记录了flink应用中Checkpoint的数量以及Checkpoint的最新记录,包括失败和完成的Checkpoint记录。

overview页签中包含了一下指标:

  • Checkpoint counts:包含了触发、进行中、完成、失败、重置等Checkpoint状态数量统计。
  • lastest completed Checkpoint:记录了最近一次完成的Checkpoint信息,包括结束时间,端到端市场,状态大小等。
  • lastest faild Checkpoint:记录了最近一次失败的Checkpoint信息。
  • lastest savepoint:记录了最近一次savepoint触发的信息。
  • lastest restore:记录了最近一次重置操作的信息,包括从Checkpoint到savepoint两种数据中重置恢复任务。

二、history

history页面记录了历史触发Checkpoint的详情,包括Checkpoint的ID、状态、触发时间,最后一次Acknowledgement信息等,通过点击More details对应的链接可以查看子task对应的Checkpoint数据

三、summary

summary页面中记录了所有完成的Checkpoint统计指标的最大值、最小值,以及平均值等,指标中包含端对端的持续时间、状态大小,以及分配过程中缓冲的数据大小。

四、configuration

  • configuration中包含Checkpoint中所有的基本配置,具体配置如下:
  • Checkpoint mode:标记Checkpoint是exactly once 还是 at least once的模式。
  • interval:Checkpoint触发的时间间隔,时间间隔越小意味着越频繁的Checkpoint。
  • timeout:Checkpoint触发超时时间,超过指定时间JobManager会取消当次Checkpoint,并重新启动新的Checkpoint。
  • minimum pause between Checkpoint:配置两个Checkpoint之间最短时间间隔,当上一次Checkpoint结束后,需要等待该时间间隔才能触发下一次Checkpoint,避免触发过多的Checkpoint导致系统资源被消耗。
  • persist Checkpoint externally:如果开启Checkpoint,数据将同时写到外部持久化存储中

flink checkpoint状态储存三种方式选择

Flink 提供了三种可用的状态后端:MemoryStateBackend,FsStateBackend,和RocksDBStateBackend。

image.png

MemoryStateBackend

MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序 checkpoint 时,此后端会在将状态发给 JobManager 之前快照下状态,JobManager 也将状态存储在 Java 堆上。默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。

使用 MemoryStateBackend 时的注意点:

默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。
状态大小受到 akka 帧大小的限制,所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。也可以通过 akka.framesize 调整 akka 帧大小(通过配置文档了解更多)。
状态的总大小不能超过 JobManager 的内存。

何时使用 MemoryStateBackend:

本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。
MemoryStateBackend 最适合小状态的应用场景。例如 Kafka consumer,或者一次仅一记录的函数 (Map, FlatMap,或 Filter)。

FsStateBackend

FsStateBackend 需要配置的主要是文件系统,如 URL(类型,地址,路径)。举个例子,比如可以是:
“hdfs://namenode:40010/flink/checkpoints” 或
“s3://flink/checkpoints”
当选择使用 FsStateBackend 时,正在进行的数据会被存在 TaskManager 的内存中。在 checkpoint 时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在 JobManager 的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。

默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入 false 的布尔标志来禁用掉,例如:
new FsStateBackend(path, false);

使用 FsStateBackend 时的注意点:

当前的状态仍然会先存在 TaskManager 中,所以状态的大小不能超过 TaskManager 的内存。

何时使用 FsStateBackend:

FsStateBackend 适用于处理大状态,长窗口,或大键值状态的有状态处理任务。
FsStateBackend 非常适合用于高可用方案。

RocksDBStateBackend

RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径),如下所示:

“hdfs://namenode:40010/flink/checkpoints” 或
“s3://flink/checkpoints”
RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。

使用 RocksDBStateBackend 时的注意点:

RocksDB 支持的单 key 和单 value 的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。
我们需要强调的是,对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。

何时使用 RocksDBStateBackend:

RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端。增量 checkpoint 非常使用于超大状态的场景。
当使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。使用 RocksDB 的权衡点在于所有的状态相关的操作都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。


聊聊flink的checkpoint配置

本文主要研究下flink的checkpoint配置

实例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
  • 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(单位milliseconds),而CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE

  • 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以

  • checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉

  • minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1

  • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)

  • enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state

  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

flink-conf.yaml相关配置

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# &lt;class-name-of-factory&gt;.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
  • state.backend用于指定checkpoint state存储的backend,默认为none

  • state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数

  • state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file

  • state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置

  • state.backend.local-recovery,默认为false

  • state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见

  • state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数

  • state.savepoints.dir,默认为none,用于指定savepoints的默认目录

  • taskmanager.state.local.root-dirs,默认为none

小结

  • 可以通过使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode)

  • checkpoint的高级配置可以配置checkpointTimeout(用于指定checkpoint执行的超时时间,单位milliseconds),minPauseBetweenCheckpoints(用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint),maxConcurrentCheckpoints(用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数大于1的值不起作用),enableExternalizedCheckpoints(用于开启checkpoints的外部持久化,在job failed的时候externalized checkpoint state无法自动清理,但是在job canceled的时候可以配置是删除还是保留state)

  • 在flink-conf.yaml里头也有checkpoint的相关配置,主要是state backend的配置,比如state.backend.async、state.backend.incremental、state.checkpoints.dir、state.savepoints.dir等


来源:
Flink的Checkpoint和Savepoint介绍:https://www.shuzhiduo.com/A/q4zVy2D2dK/
Flink的CheckPoint:https://www.shuzhiduo.com/A/xl56Y1XxJr/
flink的checkpoint页面监控:https://www.shuzhiduo.com/A/Gkz13xY65R/
flink checkpoint状态储存三种方式选择:https://www.shuzhiduo.com/A/nAJv0qYQdr/
聊聊flink的checkpoint配置:https://www.shuzhiduo.com/A/kjdwp8NEzN/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

[1143]Flink的Checkpoint和Savepoint 的相关文章

随机推荐

  • Shell编程中脱字符(^)的用法

    cat configs signatures tmp 将configs signatures tmp文件内容作为grep命令的输入 grep v v是grep排除的参数 将configs signatures tmp除去空行的内容作为sor
  • 蓝桥杯在哪下载准考证

    点击自己的头像 gt 我的大赛 gt 会出现如下 gt 点击Java软件开发 根据自己报的方向 gt 可以看到考试信息 gt 下载准考证即可 转载于 https blog 51cto com 13534640 2090954
  • Kafka消息阻塞

    转自 http jis117 iteye com blog 2279519 hi all 大家都很关心kafka消息阻塞的情况 感谢RoctetMQ给我们的教训 Kafka上线也有一段时间了 确实有出现过消息阻塞的情况 虽然不影响业务而且用
  • 判断逻辑关系的运算符 && 与,

    package com test basic chapter2 功能 判断逻辑关系的运算符 与 或 非 与 同为真即真 或 有真即真 非 public class LogicalOperators public static void ma
  • Java中map集合,你真的了解吗?

    在Java编程中 map集合是一个非常重要的数据结构 它可以存储键值对 并且可以根据键快速查找对应的值 今天 我们就来详细介绍一下Java中的map集合 首先 让我们来了解一下map集合的基本概念 在Java中 map集合是一个接口 它有多
  • padding的四个参数_Google Flutter 布局(二)-Padding、Align、Center详解

    1 1 简介 Padding在Flutter中用的也挺多的 作为一个基础的控件 功能非常单一 给子节点设置padding属性 写过其他端的都了解这个属性 就是设置内边距属性 内边距的空白区域 也是widget的一部分 Flutter中并没有
  • 深入理解Java——从入门到精通

    标题 深入理解Java 从入门到精通 Java是一门广泛应用于软件开发领域的高级编程语言 它的特点包括强大的跨平台性 面向对象的编程范式 丰富的类库以及良好的安全性 无论是在企业级应用开发 移动应用开发还是嵌入式系统开发中 Java都扮演着
  • [分享]如何使用Angular中的自定义校验器

    在Angular的开发中表单是必不可少的 由此引出的表单校验也是万万不可缺少的 解决方法一 全部使用ngModel 双向绑定数据 在最后的submit点击时进行所有表单的校验 这样做固然没有什么问题 而且代码写的也和我们之前使用jQuery
  • 单元格法近似求解多边形最大内接矩形问题【思路讲解+java实现】

    文章目录 问题描述 问题解决方案 多边形网格化 区分每个单元格是在多边形内部还是外部 根据已标记单元格寻找最大内接矩形 剪枝优化 多角度旋转 案例测试 代码实现 说明 问题描述 给定一个多边形的点集 希望找出多边形内部面积最大的矩形 该问题
  • SPI机制是什么?

    一 SPI机制是什么 spi全称为 Service Provider Interface 是JDK内置的一种服务提供发现机制 SPI是一种动态替换发现的机制 一种解耦非常优秀的思想 spi的工作原理 就是ClassPath路径下的META
  • 已知三角形三边长怎么求面积_已知三角形三边求面积的公式——海伦公式

    海伦公式又译作希伦公式 海龙公式 希罗公式 海伦 秦九韶公式 传说是古代的叙拉古国王希伦 Heron 也称海龙 二世发现的公式 是一个利用三角形的三条边长直接求三角形面积的公式 下面我们利用初中的知识进行推导 注意 公式推导过程的方法比公式
  • BigDecimal:比double更精确的小数类

    BigDecimal概述 BigInteger是Number的子类 它用来对超过16位有效位的数进行精确的运算 双精度浮点型变量double可以处理16位有效数 在实际应用中 需要对更大或者更小的数进行运算和处理 float和double只
  • Wordpess百度自动推送代码

    直接将代码中的token换成百度站长中自己的即可 将下列代码放入functions php中就行了 WordPress 百度快速收录 API 提交代码 if function exists Baidu Submit function Bai
  • 操作系统实验进程调度模拟

    操作系统实验 实验一 实验1 基于优先数的时间片轮转调度算法调度处理模拟程序设计 一 实验目的 1 对进程调度的工作做进一步的理解 2 了解进程调度的任务 3 通过编程掌握基于优先数的时间片轮转调度算法具体实现过程 二 实验内容及实验要求
  • 遗传算法的实现

    请用遗传算法实现如下最大化问题 首先先来学习下 遗传算法的本质和过程 遗传算法是计算数学中用于解决最佳化的搜索算法 是进化算法的一种 进化算法最初是借鉴了进化生物学中的一些现象而发展起来的 这些现象包括遗传 突变 自然选择以及杂交等 遗传算
  • C#关键字 abstract,override,virtual的用法

    什么是抽象类 abstract关键字修饰的类称为抽象类 抽象类不能被实例化 抽象类是派生类的基类 关键字 abstract 语法 public abstract class 类名 1 一个抽象类可以同时包含抽象方法和非抽象方法 但不能实例化
  • IDEA工具快捷键---补全返回值

    Ctrl alt v 自动提示
  • 接口测试开发之:一篇搞懂 Cache、Cookie及Session的爱恨情仇

    Cashe Cookie与Session 1 引言 2 Cache 2 1 缓存定义 2 1 1 缓存概念 2 1 2 缓存优点 2 2 浏览器缓存 2 2 1 存储路径 2 2 2 缓存优点 2 2 3 缓存弊端 2 2 4 原理图 2
  • 【习题三】【数据库原理】

    文章目录 一 单选题 二 填空题 一 单选题 1 X Y能从推理规则导出的充分必要条件是 正确答案 B 2 设有关系模式R A B C D E 函数依赖集F A B B C C D D A AB BC AD 是R上的一个分解 那么分解 相对
  • [1143]Flink的Checkpoint和Savepoint

    文章目录 Flink的Checkpoint和Savepoint介绍 第一部分 Flink的Checkpoint 1 Flink Checkpoint原理介绍 2 Checkpoint的简单设置 3 保存多个Checkpoint 4 从Che