Flink State 和 Fault Tolerance详解

2023-11-17

有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位:

  • 记录数据从某一个过去时间点到当前时间的状态信息。
  • 以每分钟/小时/天汇总事件时,状态将保留待处理的汇总记录。
  • 在训练机器学习模型时,状态将保持当前版本的模型参数。

Flink在管理状态方面,使用Checkpoint和Savepoint实现状态容错。Flink的状态在计算规模发生变化的时候,可以自动在并行实例间实现状态的重新分发,底层使用State Backend策略存储计算状态,State Backend决定了状态存储的方式和位置(后续章节介绍)。

Flink在状态管理中将所有能操作的状态分为Keyed StateOperator State,其中Keyed State类型的状态同key一一绑定,并且只能在KeyedStream中使用。所有non-KeyedStream状态操作都叫做Operator State。Flink在底层做状态管理时,将Keyed State和<parallel-operator-instance, key>关联,由于某一个key仅仅落入其中一个operator-instance中,因此可以简单的理解Keyed State是和<operator,key>进行绑定的,采用Key Group机制对Keyed State进行管理或者分类,所有的keyed-operator在做状态操作的时候可能需要和1~n个Key Group进行交互。

Flink在分发Keyed State状态的时候,不是以key为单位,而是以Key Group为最小单元分发

Operator State (也称为 non-keyed state),每一个operator state 会和一个parallel operator instance进行绑定。Keyed StateOperator State 以两种形式存在( managed(管理)和 raw(原生)),所有Flink已知的操作符都支持Managed State,但是Raw State仅仅在用户自定义Operator时使用,并且不支持在并行度发生变化的时候重新分发状态,因此,虽然Flink支持Raw State,但是在绝大多数的应用场景下,一般使用的都是Managed State。

Keyed State

Keyed-state接口提供对不同类型状态的访问,所有状态都限于当前输入元素的key。

类型 说明 方法
ValueState 这个状态主要存储一个可以用作更新的值 update(T)
T value()
clear()
ListState 这将存储List集合元素 add(T)
addAll(List)
Iterable get()
update(List)
clear()
ReducingState 这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供ReduceFunction
add(T)
T get()
clear()
AggregatingState<IN, OUT> 这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供AggregateFunction
add(IN)
T get()
clear()
FoldingState<T, ACC> 这将保留一个值,该值表示添加到状态的所有值的汇总
需要用户提供FoldFunction
add(IN)
T get()
clear()
MapState<UK, UV> 这个状态会保留一个Map集合元素 put(UK, UV)
putAll(Map<UK, UV>)
entries()
keys()
values()
clear()
ValueSate
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])
        vs=getRuntimeContext.getState[Int](vsd)
    }
    
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")
AggregatingState<IN, OUT>
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toInt))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Double)] {
    var vs:AggregatingState[Int,Double]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new AggregatingStateDescriptor[Int,(Double,Int),Double]("avgCount",new AggregateFunction[Int,(Double,Int),Double] {
            override def createAccumulator(): (Double, Int) = {
                (0.0,0)
            }

            override def add(value: Int, accumulator: (Double, Int)): (Double, Int) = {
                (accumulator._1+value,accumulator._2+1)
            }
            
            override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
                (a._1+b._1,a._2+b._2)
            }
            
            override def getResult(accumulator: (Double, Int)): Double = {
                accumulator._1/accumulator._2
            }
        },createTypeInformation[(Double,Int)])
        vs=getRuntimeContext.getAggregatingState(vsd)
    }
    override def map(value: (String, Int)): (String, Double) = {
        vs.add(value._2)
        val avgCount=vs.get()
        (value._1,avgCount)
    }
}).print()
env.execute("wordcount")
MapState<UK, UV>
var env=StreamExecutionEnvironment.getExecutionEnvironment
//001 zs 202.15.10.12 日本 2019-10-10
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
.keyBy("id","name")
.map(new RichMapFunction[Login,String] {
    var vs:MapState[String,String]=_
    override def open(parameters: Configuration): Unit = {
        val msd=new MapStateDescriptor[String,String]("mapstate",createTypeInformation[String],createTypeInformation[String])
        vs=getRuntimeContext.getMapState(msd)
    }
    override def map(value: Login): String = {
        println("历史登录")
        for(k<- vs.keys().asScala){
            println(k+" "+vs.get(k))
        }
        var result=""
        if(vs.keys().iterator().asScala.isEmpty){
            result="ok"
        }else{
            if(!value.city.equalsIgnoreCase(vs.get("city"))){
                result="error"
            }else{
                result="ok"
            }
        }
        vs.put("ip",value.ip)
        vs.put("city",value.city)
        vs.put("loginTime",value.loginTime)
        result
    }
}).print()
env.execute("wordcount")
总结
new Rich[Map|FaltMap]Function {
    var vs:XxxState=_ //状态声明
    override def open(parameters: Configuration): Unit = {
        val xxd=new XxxStateDescription //完成状态的初始化
        vs=getRuntimeContext.getXxxState(xxd)
    }
    override def xxx(value: Xx): Xxx = {
       //状态操作
    }
}
  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

State Time-To-Live(TTL)

基本使用

可以将state存活时间(TTL)分配给任何类型的keyed-state,如果配置了TTL且状态值已过期,则Flink将尽力清除存储的历史状态值。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
  • 案例
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs:ValueState[Int]=_
    override def open(parameters: Configuration): Unit = {
        val vsd=new ValueStateDescriptor[Int]("valueCount",createTypeInformation[Int])

        val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) //过期时间5s
        .setUpdateType(UpdateType.OnCreateAndWrite)//创建和修改的时候更新过期时间
        .setStateVisibility(StateVisibility.NeverReturnExpired)//永不返回过期的数据
        .build()

        vsd.enableTimeToLive(ttlConfig)

        vs=getRuntimeContext.getState[Int](vsd)
    }
    override def map(value: (String, Int)): (String, Int) = {
        val histroyCount = vs.value()
        val currentCount=histroyCount+value._2
        vs.update(currentCount)
        (value._1,currentCount)
    }
}).print()
env.execute("wordcount")

注意:开启TTL之后,系统会额外消耗内存存储时间戳(Processing Time),如果用户以前没有开启TTL配置,在启动之前修改代码开启了TTL,在做状态恢复的时候系统启动不起来,会抛出兼容性失败以及StateMigrationException的异常。

清除Expired State

在默认情况下,仅当明确读出过期状态时,通过调用ValueState.value()方法才会清除过期的数据,这意味着,如果系统一直未读取过期的状态,则不会将其删除,可能会导致存储状态数据的文件持续增长。

Cleanup in full snapshot

系统会从上一次状态恢复的时间点,加载所有的State快照,在加载过程中会剔除那些过期的数据,这并不会影响磁盘已存储的状态数据,该状态数据只会在Checkpoint的时候被覆盖,但是依然解决不了在运行时自动清除过期且没有用过的数据。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

只能用于memory或者snapshot状态的后端实现,不支持RocksDB State Backend。

Cleanup in background

可以开启后台清除策略,根据State Backend采取默认的清除策略(不同状态的后端存储,清除策略不同)

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground
.build
  • Incremental cleanup(基于内存backend)
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupIncrementally(100,true) //默认值 5 | false
.build()

第一个参数表示每一次触发cleanup的时候,系统会一次处理100个元素。第二个参数是false,表示只要用户对任意一个state进行操作,系统都会触发cleanup策略;第二个参数是true,表示只要系统接收到记录数(即使用户没有操作状态)就会触发cleanup策略。

  • RocksDB compaction

RocksDB是一个嵌入式的key-value存储,其中key和value是任意的字节流,底层进行异步压缩,会将key相同的数据进行compact(压缩),以减少state文件大小,但是并不对过期的state进行清理,因此可以通过配置compactFilter,让RocksDB在compact的时候对过期的state进行排除,RocksDB数据库的这种过滤特性,默认关闭,如果想要开启,可以在flink-conf.yaml中配置 state.backend.rocksdb.ttl.compaction.filter.enabled:true 或者在应用程序的API里设置RocksDBStateBackend::enableTtlCompactionFilter。

在这里插入图片描述

import org.apache.flink.api.common.state.StateTtlConfig 
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) //默认配置1000
.build()

这里的1000表示,系统在做Compact的时候,会检查1000个元素是否失效,如果失效,则清除该过期数据。

Operator State

如果用户想要使用Operator State,只需要实现通用的CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>,值得注意的是,目前的operator-state仅仅支持list-style风格的状态,要求所存储的状态必须是一个List,且其中的元素必须可以序列化。

CheckpointedFunction

提供两种不同的状态分发方案:Even-splitUnion

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • initializeState():第一次启动或者从上一次状态恢复的时候,系统会调用initializeState()

Even-split:表示系统在故障恢复时,会将operator-state的元素均分给所有的operator实例,每个operator实例将获取到整个operator-state的sub-list数据。

Union:表示系统在故障恢复时,每一个operator实例可以获取到整个operator-state的全部数据。

案例

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]  with CheckpointedFunction  {
    var listState:ListState[(String,Int)]=_
    val bufferedElements = ListBuffer[(String, Int)]()
    //负责将数据输出到外围系统
    override def invoke(value: (String, Int)): Unit = {
        bufferedElements += value
        if(bufferedElements.size == threshold){
            for(ele <- bufferedElements){
                println(ele)
            }
            bufferedElements.clear()
        }
    }
    //是在savepoint|checkpoint时候将数据持久化
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        listState.clear()
        for(ele <- bufferedElements){
            listState.add(ele)
        }
    }
    //状态恢复|初始化 创建状态
    override def initializeState(context: FunctionInitializationContext): Unit = {
        val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements",createTypeInformation[(String,Int)])
        listState=context.getOperatorStateStore.getListState(lsd)
        if(context.isRestored){
            for(element <- listState.get().asScala) {
                bufferedElements += element
            }
        }
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))
env.execute("testoperatorstate")
  • 启动netcat服务
[root@centos ~]# nc -lk 9999
  • 提交任务

在这里插入图片描述

注意,将并行度设置为1,方便测试

  • 在netcat中输入以下数据
[root@centos ~]# nc -lk 9999
a1 b1 c1 d1
  • 取消任务,并且创建savepoint
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.

注意,如果Flink需要和Hadoop整合,必须保证在当前环境变量下有HADOOP_HOME|HADOOP_CALSSPATH

[root@centos flink-1.8.1]# cat /root/.bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
  • 测试状态

在这里插入图片描述

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction接口的一种变体形式,仅仅支持Even-split状态的分发策略。

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
  • snapshotState():调用checkpoint()的时候,系统会调用snapshotState()对状态做快照
  • restoreState():等价于上述CheckpointedFunction中声明的initializeState()方法,用作状态恢复

案例

import java.lang.{Long => JLong} //修改类别名
import scala.{Long => SLong} //修改类别名
class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong]{
  @volatile
  var isRunning:Boolean = true
  var offset = 0L
    
  override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
    val lock = ctx.getCheckpointLock
    while(isRunning){
       Thread.sleep(1000)
       lock.synchronized({
         ctx.collect(offset)
         offset += 1
       })
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
    Collections.singletonList(offset) //存储的是 当前source的偏移量,如果状态不可拆分,用户可以使Collections.singletonList
  }

  override def restoreState(state: util.List[JLong]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.addSource[Long](new CustomStatefulSourceFunction)
.print("offset:")
env.execute("testOffset")

广播状态

支持Operator State的第三种类型是广播状态。引入广播状态以支持用例,其中需要将来自一个流的某些数据广播到所有下游任务,广播的状态将存储在本地,用于处理另一个流上所有传入的元素。

A third type of supported operator state is the Broadcast State. Broadcast state was introduced to support use cases where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally and is used to process all incoming elements on the other stream.

non-keyed
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class UserBuyPathBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends BroadcastProcessFunction[UserBuyPath,Rule,String]{
    //处理的是UserBuyPath,读取广播状态
    override def processElement(value: UserBuyPath,
                                ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext,
                                out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        if(broadcastState.contains(value.channel)){//如果有规则,尝试计算
            val threshold= broadcastState.get(value.channel)
            if(value.path >= threshold){//将满足条件的用户信息输出
                out.collect(value.id+" "+value.name+" "+value.channel+" "+value.path)
            }
        }
    }
    //处理的是规则 Rule 数据 ,记录修改广播状态
    override def processBroadcastElement(value: Rule, ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context,
                                         out: Collector[String]): Unit = {
        val broadcastState = ctx.getBroadcastState(msd)
        broadcastState.put(value.channel,value.threshold)//更新状态

        println("=======rule======")
        for(entry <- broadcastState.entries().asScala){
            println(entry.getKey+"\t"+entry.getValue)
        }
        println()
        println()
    }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userStream = fsEnv.socketTextStream("centos", 9999)
    .map(line => line.split("\\s+"))
    .map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
    .keyBy("id", "name")
    .map(new UserActionRichMapFunction)

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel 阈值
// 手机类   10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888)
    .map(line => line.split("\\s+"))
    .map(ts => Rule(ts(0), ts(1).toInt))
    .broadcast(msd)

userStream.connect(broadcastStream)
.process(new UserBuyPathBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
case class UserBuyPath(id:String,name:String,channel:String,path:Int)
class UserActionRichMapFunction extends RichMapFunction[UserAction,UserBuyPath]{
  var buyPathState:MapState[String,Int]=_
  override def open(parameters: Configuration): Unit = {
   val msd= new MapStateDescriptor[String,Int]("buy-path",createTypeInformation[String],createTypeInformation[Int])
   buyPathState=getRuntimeContext.getMapState(msd)
  }
  override def map(value: UserAction): UserBuyPath = {
    val channel = value.channel
    var path=0
    if(buyPathState.contains(channel)){
      path=buyPathState.get(channel)
    }
    if(value.action.equals("buy")){
      buyPathState.remove(channel)
    }else{
      buyPathState.put(channel,path+1)
    }
    UserBuyPath(value.id,value.name,value.channel,buyPathState.get(channel))
  }
}
keyed
class UserBuyPathKeyedBroadcastProcessFunction(msd:MapStateDescriptor[String,Int]) extends KeyedBroadcastProcessFunction[String,UserAction,Rule,String]{
  override def processElement(value: UserAction,
                              ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println("value:"+value +" key:"+ctx.getCurrentKey)
    println("=====state======")
    for(entry <- ctx.getBroadcastState(msd).immutableEntries().asScala){
      println(entry.getKey+"\t"+entry.getValue)
    }
  }

  override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {
     println("Rule:"+value)
    //更新状态
    ctx.getBroadcastState(msd).put(value.channel,value.threshold)
  }
}

case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id name channel action
// 001 mack 手机 view
// 001 mack 手机 view
// 001 mack 手机 addToCart
// 001 mack 手机 buy
val userKeyedStream = env.socketTextStream("centos", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0)//只可以写一个参数

val msd=new MapStateDescriptor[String,Int]("braodcast-sate",createTypeInformation[String],
                                           createTypeInformation[Int])
// channel 阈值
// 手机类 10
// 电子类 10
val broadcastStream: BroadcastStream[Rule] = fsEnv.socketTextStream("centos", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)
userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")

CheckPoint & SavePoint

CheckPoint是Flink实现故障容错的一种机制,系统会根据配置的检查点定期自动对程序计算状态进行备份。一旦程序在计算过程中出现故障,系统会选择一个最近的检查点进行故障恢复。

SavePoint是一种有效的运维手段,需要用户手动触发程序进行状态备份,本质也是在做CheckPoint。

实现故障恢复的先决条件:

  • 持久的数据源,可以在一定时间内重播记录(例如,FlinkKafkaConsumer)
  • 状态的永久性存储,通常是分布式文件系统(例如,HDFS)
var env=StreamExecutionEnvironment.getExecutionEnvironment
//启动检查点机制
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
//配置checkpoint必须在2s内完成一次checkpoint,否则检查点终止
env.getCheckpointConfig.setCheckpointTimeout(2000)
//设置checkpoint之间时间间隔 <=  Checkpoint interval
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
//配置checkpoint并行度,不配置默认1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//一旦检查点不能正常运行,Task也将终止
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
//将检查点存储外围系统 filesystem、rocksdb,可以配置在cancel任务时候,系统是否保留checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.print()
env.execute("testoperatorstate")

State Backend

State Backend决定Flink如何存储系统状态信息(Checkpoint形式),目前Flink提供了三种State Backend实现。

  • Memory (JobManagwer):这是Flink的默认实现,通常用于测试,系统会将计算状态存储在JobManager的内存中,但是在实际的生产环境中,由于计算的状态比较多,使用Memory 很容易导致OOM(out of memory)。
  • FileSystem:系统会将计算状态存储在TaskManager的内存中,因此一般用作生产环境,系统会根据CheckPoin机制,将TaskManager状态数据在文件系统上进行备份。如果是超大规模集群,TaskManager内存也可能发生溢出。
  • RocksDB:系统会将计算状态存储在TaskManager的内存中,如果TaskManager内存不够,系统可以使用RocksDB配置本地磁盘完成状态的管理,同时支持将本地的状态数据备份到远程文件系统,因此,RocksDB Backend 是推荐的选择。

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html

每一个Job都可以配置自己状态存储的后端实现

var env=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") //MemoryStateBackend、FsStateBackend、RocksDBStateBackend
env.setStateBackend(fsStateBackend)

如果用户不配置,则系统使用默认实现,默认实现可以通过修改flink-conf-yaml文件进行配置

[root@centos ~]# cd /usr/flink-1.8.1/
[root@centos flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
 state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
 state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#
 state.savepoints.dir: hdfs:///flink-savepoints
 
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
 state.backend.incremental: true

注意,必须在环境变量中出现HADOOP_CLASSPATH

Flink计算发布之后是否还能够修改计算算子?

首先,这在Spark中是不允许的,因为Spark会持久化代码片段,一旦修改代码,必须删除Checkpoint,但是Flink仅仅存储各个算子的计算状态,如果用户修改代码,需要用户在有状态的操作算子上指定uid属性。

env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.uid("kakfa-consumer")
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) //只可以写一个参数
.sum(1)
.uid("word-count") //唯一
.map(t=>t._1+"->"+t._2)
.print()

Flink Kafka如何保证精准一次的语义操作?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink State 和 Fault Tolerance详解 的相关文章

  • Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)

    聚合算子 Aggregation 计算的结果不仅依赖当前数据 还跟之前的数据有关 相当于要把所有数据聚在一起进行汇总合并 这就是所谓的 聚合 Aggregation 类似于MapReduce中的reduce操作 按键分区 keyBy 对于F
  • 大数据技术Flink详解

    一 有状态的流式处理 Apache Flink 是一个分布式流处理器 具有直观和富有表现力的API 可实现有状态的流处理应用程序 它以容错的方式有效地大规模运行这些应用程序 Flink 于2014 年4 月加入Apache 软件基金会作为孵
  • 使用arthas在线诊断flink的那些事

    最近在使用arthas诊断工具 诊断java服务的一些问题 突然想到能不能使用arthas诊断flink的jobManager和taskManager呢 答案是可以的 采用javaagent 在flink启动jobmanager和taskM
  • Caused by: java.lang.NoClassDefFoundError: javax/tools/ToolProvider

    解决方案 在pom文件中的scala maven plugin插件下面加入一个参数 pom xml配置如下
  • Flink CDC(2.0) 如何加速海量数据的实时集成?

    原文 Flink CDC 如何加速海量数据的实时集成 知乎 导读 Flink CDC如何解决海量数据集成的痛点 如何加速海量数据处理 Flink CDC社区如何运营 如何参与社区贡献 今天的介绍会围绕下面四点展开 Flink CDC 技术
  • Apache Flink不止于计算,数仓架构或兴起新一轮变革

    2021 年初 在 InfoQ 编辑部策划的全年技术趋势展望中 我们提到大数据领域将加速拥抱 融合 或 一体化 演进的新方向 本质是为了降低大数据分析的技术复杂度和成本 同时满足对性能和易用性的更高要求 如今 我们看到流行的流处理引擎 Ap
  • Flink自定义实现ElasticSearch Table Source

    Flink版本 1 12 1 ES Maven版本 elasticsearch rest client 6 3 1 FLINK TableSource官方文档 https ci apache org projects flink flink
  • Macbook Pro 鼠标卡顿问题

    Macbook Pro 鼠标卡顿问题 目前无解 只能改善 该问题最早能追溯到 2015年 https jingyan baidu com article ff42efa93632c5c19e220208 html 原因 据说是无线频段冲突
  • flink 问题记录

    文章目录 1 Caused by java lang UnsatisfiedLinkError org apache hadoop util NativeCrc32 nativeComputeChunkedSums IILjava nio
  • Flink Web UI 介绍

    一 提交flink任务到yarn flink run m yarn cluster yn 1 p 2 yjm 1024 ytm 1024 ynm FlinkOnYarnSession MemberLogInfoProducer d c co
  • 深入理解Flink的水位线

    Apache Flink是一个流处理框架 它支持事件时间和处理时间的概念 在处理流数据时 Flink通过水位线 Watermark 来追踪事件时间的进度 从而支持事件时间的操作 水位线是一种特殊的事件 它表示在此时间戳之前的所有事件都已经到
  • 基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体 关注我 canal是阿里巴巴旗下的一款开源项目 纯Java开发 基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了MySQL 也支持mariaDB 准备 配置MySQL的binlog 常见的binlog命令 是
  • 大数据技术之 Flink-CDC

    第1章 CDC简介 1 1 什么是 CDC CDC 是 Change Data Capture 变更数据获取 的简称 核心思想是 监测并捕获数据库的变动 包括数据或数据表的插入 更新以及删除等 将这些变更按发生的顺序完整记录下来 写入到消息
  • Flink学习27:驱逐器

    import org apache flink api common eventtime SerializableTimestampAssigner WatermarkStrategy import org apache flink api
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • 如何在 Flink 1.9 中使用 Hive?

    Flink on Hive 介绍 SQL 是大数据领域中的重要应用场景 为了完善 Flink 的生态 发掘 Flink 在批处理方面的潜力 我们决定增强 FlinkSQL 的功能 从而让用户能够通过 Flink 完成更多的任务 Hive 是
  • 【基础】Flink -- ProcessFunction

    Flink ProcessFunction 处理函数概述 处理函数 基本处理函数 ProcessFunction 按键分区处理函数 KeyedProcessFunction 定时器与定时服务 基于处理时间的分区处理函数 基于事件时间的分区处
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现

    flink主程序 public class FinkTest public static void main String args throws Exception StreamExecutionEnvironment env Strea
  • 大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化

    文章目录 0 前言 1 环境准备 1 1 flink 下载相关 jar 包 1 2 生成 kafka 数据 1 3 开发前的三个小 tip 2 flink sql 客户端编写运行 sql 2 1 创建 kafka 数据源表

随机推荐

  • 华为OD机试 - 增强的strstr(Python)

    题目描述 C 语言有一个库函数 char strstr const char haystack const char needle 实现在字符串 haystack 中查找第一次出现字符串 needle 的位置 如果未找到则返回 null 现
  • 锐星服务器怎么上传文件,协议转换器仪表远程配置方法专利_专利申请于2019-06-06_专利查询 - 天眼查...

    1 一种协议转换器仪表远程配置方法 其特征在于 包括以下步骤 步骤1 在平台端开发一个基于页面配置的配置程序 为指定的CAN仪表协议提供配置工具 输出配置文件 该配置文件是由版本信息 报文CAN ID配置语句 车载机使用数据项ID配置语句
  • 【干货】--手把手教你完成文本情感分类

    作者 刘顺祥 个人微信公众号 每天进步一点点2015 前言 2017年12月9日 参加了天善组织的线下沙龙活动 在沙龙中自己分享了如何借助于R语言完成情感分析的案例 考虑的其他网友没能够参与到活动现场 这里通过微信公众号作一个简单的分享 在
  • 【Angular中的HTTP请求】- JSONP 详解

    JSONP JSON with Padding 是JSON的一种 使用模式 可用于解决主流浏览器的跨域数据访问的问题 基于XMLHttpRequest的数据请求会受到同源策略限制 而 JSONP 以
  • 为什么离不开 Stackoverflow

    作为一名程序员 如果没有听过 Stackoverflow 那么你最好去面壁思过一下 程序员最需要阅读的一本编程书籍 其实编程书留下这本就够了 那些还没有读过这本书的程序员 是时候买一本了 如果还在犹豫 那么先看下这篇文章 看看为什么离不开
  • linux创建链接命令

    1 软链接 符号链接 1 软链接文件有类似于Windows的快捷方式 2 在符号连接中 文件实际上是一个文本文件 其中包含的有另一文件的位置信息 3 它只会在你选定的位置上生成一个文件的镜像 不会占用磁盘空间 linux创建链接软命令 具体
  • C语言调用C++函数

    前阵子被问及一个在C中如何调用C 函数的问题 当时简单回答是将函数用extern C 声明 当被问及如何将类内成员函数声明时 一时语塞 后来网上查了下 网上有一翻译C 之父的文章可以作为解答 遂拿来Mark一下 将 C 函数声明为 exte
  • JS 5种遍历对象的方式

    From https blog csdn net qq 53225741 article details 127073295 我根据阮老师的 ES6标准入门 学习并总结了七种遍历对象的方法 我会将分别介绍这七种方法并进行详细的区分 并将从属
  • Linux Ubuntu 能PING IP但不能PING主机域名的解决方法

    vi etc nsswitch conf hosts files dns networks files 改成 hosts files dns wins networks files 如果不一样的话 就在hosts 原来那行后面加个wins
  • Vue2转Vue3快速上手第一篇(共两篇)

    Vue3 v2 v3的学习成本不高 只要有v2基础 基本可以上手vue3 一 setup语法 setup中不能访问v2的配置比如data methods等 二 ref响应数据 使用ref可以创建一个对象 可以是基本类型 也可以是对象 例如
  • SpringBoot获取resources 目录下的文件的方式

    SpringBoot获取resources 目录下的文件的方式在Spring Boot项目中 读取resources目录下文件的方式是非常常见的操作 为了确保项目的稳定性和可靠性 我们需要采取一种高效的方法来获取这些文件 因此 在本文中 我
  • overloading与overriding的区别

    1 overloading 重载 1 方法重载是让类以一种统一的方式处理不同类型数据的手段 多个同名函数同时存在 具有不同参数个数 类型 重载是一个类中多态性的表现 2 java方法重载就是在同一个类中创建多个具有相同的方法名 但是参数类型
  • MAC M1安装VMware 安装windows11

    目录 前言 一 安装包列表 二 VMware安装Windows11过程 总结 前言 最近想着给自己的mac安装windows虚拟机 因为mac是m1芯片的 所以也是从网上找了很多资料 用PD安装了Windows11 在找资料的时候发现VM也
  • Hbuild X 下载以及插件安装

    1 下载 下载地址 https www dcloud io 2 进入Hbuilder 官方网站 3 下载HBuilder 点击下载按钮 Download for Windows 点击后会直接下载 也可以鼠标移动到 more 选择对应的版本点
  • VC使用ActiveX控件常见问题

    转自 http lingchuangsong blog 163 com blog static 126932322008631104133309 一方面 它表示将你联系到Microsoft Internet和业界的新技术的小型快速的可重用组
  • 大数据应用——Hadoop运行模式(本地运行)

    Hadoop运行模式包括 本地模式 伪分布式模式以及完全分布式模式 Hadoop官方网站 http hadoop apache org 4 1本地运行模式 4 1 1 官方Grep案例 1 创建在hadoop 2 7 1文件下面创建一个in
  • pycharm注释、查看函数用法快捷键

    单行或多行注释 选中代码 ctrl 查询函数用法 ctrl 鼠标左击函数名 便可以直接进入原文件查看此函数的定义 自动填充空格 ctrl alt L 将光标置于需要调整的代码行 或者选择一个区域 按下快捷键后 代码会自动填充空格 自动对齐代
  • matlab figure函数的用法

    https blog csdn net qq 30387863 article details 80301996
  • (三) 区块链数据结构 – 交易

    区块由交易组成 区块体中包含若干项交易数据 交易 交易主要包含两类数据 交易输入和交易输出 交易输入用来指明钱的来源 交易输出用来指明钱的去向 除了交易输入和交易输出外 交易中还包含版本号和锁定时间 交易数据的存储结构如下 交易对象中的各个
  • Flink State 和 Fault Tolerance详解

    有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态 这就使得状态在整个Flink的精细化计算中有着非常重要的地位 记录数据从某一个过去时间点到当前时间的状态信息 以每分钟 小时 天汇总事件时 状态将保