flink state ttl 清理逻辑(截止到flink1.8之前的逻辑)

2023-11-13

在我们开发Flink应用时,许多有状态流应用程序的一个常见要求是自动清理应用程序状态以有效管理状态大小,或控制应用程序状态的访问时间。 TTL(Time To Live)功能在Flink 1.6.0中开始启动,并在Apache Flink中启用了应用程序状态清理和高效的状态大小管理。

在这篇文章中,我们将讨论状态(State)的TTL并且给出用例。 此外,我们将展示如何使用和配置状态的TTL。

状态的暂时性

State只能在有限的时间内维持有两个主要原因。例如,假设一个Flink应用程序为每个用户提取用户登录事件并且存储每个用户的上次登录时间实现下次免登陆来提升用户体验。

控制状态的大小

控制状态的大小,能够有效地管理不断增长的State的规模,这个TTL应用的主要场景。通常来说,数据需要暂时保留,例如用户处在一次访问的session中。当用户访问的事件结束后,我们就没有必要保存该用户的状态,但是用户的State仍占用存储空间。 Flink1.8.0引入了基于TTL的对于过期状态的清理,让我们能够对这些无效数据进行清除。在此以前,开发人员必须采取额外操作来删除无用状态以释放存储空间。这种手动清理程序不仅容易出错,而且效率低下。根据我们上述用户登录的案例,我们不再需要手动去清理。

基于对数据的保密需要

假设我们有对数据的时效性的要求,例如用户在某个时间段内不允许访问。我们都可以通过TTL功能来实现。

对应用状态的持续清理(Continuous Cleanup)

Apache Flink的1.6.0版本引入了State TTL功能。它使流处理应用程序的开发人员配置过期时间,并在定义时间超时(Time to Live)之后进行清理。在Flink 1.8.0中,该功能得到了扩展,包括对RocksDB和堆状态后端(FSStateBackend和MemoryStateBackend)的历史数据进行持续清理,从而实现旧条目的连续清理过程(根据TTL设置)。

在Flink的DataStream API中,应用程序状态由状态描述符(State Descriptor)定义。通过将StateTtlConfiguration对象传递给状态描述符来配置状态TTL。 以下Java示例演示如何创建状态TTL配置并将其提供给状态描述符,该状态描述符将上述案例中的用户上次登录时间保存为Long值:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<Long> lastUserLogin = 
    new ValueStateDescriptor<>("lastUserLogin", Long.class);

lastUserLogin.enableTimeToLive(ttlConfig);

Flink提供了多个选项来配置TTL功能的行为。

什么时候重置?

默认情况下,当数据的状态修改会更新数据的TTL时间。我们还还可以在读取访问数据时对它进行更新,这样做的代价是会出现额外的写入操作以更新时间戳的操作。

已经过期的数据是否可以访问?

State TTL采用惰性策略来清理过期状态。这可能导致我们的应用程序会去尝试读取已过期但处于尚未删除状态的数据。我们可以观察此类读取请求是否返回了过期状态。无论哪种情况,数据被访问后会立即清除过期状态。

哪个时间语义被用于定义TTL?

使用Flink 1.8.0,用户只能根据处理时间(Processing Time)定义状态TTL。未来的Apache Flink版本中计划支持事件时间(Event Time)。

Flink内部,状态TTL功能是通过存储上次相关状态访问的附加时间戳以及实际状态值来实现的。虽然这种方法增加了一些存储开销,但它允许Flink程序在查询数据、checkpointing,数据恢复的时候访问数据的过期状态。

如何避免取出'垃圾数据'

在读取操作中访问状态对象时,Flink将检查其时间戳并清除状态是否已过期(取决于配置的状态可见性,是否返回过期状态)。由于这种延迟删除的特性,永远不会再次访问的过期状态数据将永远占用存储空间,除非被垃圾回收。

那么如何在没有应用程序逻辑明确的处理它的情况下删除过期的状态呢?通常,我们可以配置不同的策略进行后台删除。

完整快照自动删除过期状态

当获取检查点或保存点的完整快照时,Flink 1.6.0已经支持自动删除过期状态。大家注意,过期状态删除不适用于增量检查点。必须明确启用完全快照的状态删除,如以下示例所示:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupFullSnapshot()
    .build();

上述代码会导致本地状态存储大小保持不变,但Flink任务的完整快照的大小减小。只有当用户从快照重新加载其状态到本地时,才会清除用户的本地状态。

由于上述这些限制,FLink应用程序仍需要在Flink 1.6.0中过期后主动删除状态。为了改善用户体验,Flink1.8.0引入了两种自主清理策略,分别针对Flink的两种状态后端类型。

堆状态后端的增量清理

此方法特定于堆状态后端(FSStateBackend和MemoryStateBackend)。它的实现方法是存储后端在所有状态条目上维护一个惰性全局迭代器。某些事件(例如状态访问)会触发增量清理。每次触发增量清理时,迭代器都会向前迭代删除已遍历的过期数据。以下代码示例演示如何启用增量清理:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // check 10 keys for every state access
    .cleanupIncrementally(10, false)
    .build();

如果启用,则每次进行状态访问都会触发清理步骤。对于每个清理步骤,都会检查一定数量的数据是否过期。

有两个参数:第一个参数是检查每个清理步骤的状态条目数。第二个参数是一个标志,用于数据处理后触发清理步骤,此外对于每次状态访问同样有效。

关于这种方法有两点需要注意:第一个是增量清理所花费的时间增加了数据处理延迟。第二个应该可以忽略不计,但仍然值得一提:如果没有状态访问或没有数据处理记录,则不会删除过期状态。

RocksDB后台压缩可以过滤掉过期状态

如果你的Flink应用程序使用RocksDB作为状态后端存储,则可以启用另一个基于Flink特定压缩过滤器的清理策略。RocksDB定期运行异步压缩以合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的到期时间戳,并丢弃所有过期值。

激活此功能的第一步是通过设置以下Flink配置选项来配置RocksDB状态后端:

state.backend.rocksdb.ttl.compaction.filter.enabled

配置RocksDB状态后端后,将为状态启用压缩清理策略,如以下代码示例所示:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupInRocksdbCompactFilter()
    .build();

使用定时器删除(Timers)

手动清除状态的另一种方法是基于Flink定时器。这是社区目前正在评估未来版本的想法。通过这种方法,为每个状态访问注册清理定时器。这种方法更容易预测,因为状态一旦到期就会被删除。但是,这种方法代价很大,因为定时器消耗存储资源,并且会频繁读取状态信息。

未来展望

除了上面提到的基于计时器的清理策略外,Flink社区还计划进一步改进状态TTL功能。可能的改进点包括为事件时间(Event Time)添加TTL支持(目前仅支持Processing Time)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink state ttl 清理逻辑(截止到flink1.8之前的逻辑) 的相关文章

随机推荐

  • soap development issue

    description No Deserializer found to deserialize a xxx using encoding style yyy reason the requesting envelope xml doesn
  • Flutter 状态栏图标颜色方案

    方案一 使用 AppBar 配置 文章目录 方案一 使用 AppBar 配置 方案二 通过 AnnotatedRegion 控制 注意点 在 AppBar 中配置属性 brightness 其取值 Brightness dark AppBa
  • python如何输出一个数组_python中实现将多个print输出合成一个数组

    python中实现将多个print输出合成一个数组 比如有下面一段代码 for i in range 10 print s f list i name 该代码段的执行 会生成如下的10行 name 属性的字符串 f1 f2 f3 f4 f5
  • 根据请求动态设置 @Value 注入的属性值

    先说一下可以使用的场景 项目中有一些功能类使用了 Value修饰 这种属性取值通常要么是读取 yml 的配置文件 要么是读取配置中心 在我们在本地调试的时候Controller时 如果 如果Service层用到了 Value修饰 的属性时
  • JMeter:使用Docker进行分布式负载测试

    概述 单个的JMeter实例可能无法生成足够的负载来对应用程序进行压力测试 如本网站所示 一个JMeter实例将能够控制多个远程JMeter实例 并在你的应用程序上产生更大的负载 JMeter使用Java RMI Remote Method
  • Angular6 学习笔记——指令

    angular6 x系列的学习笔记记录 仍在不断完善中 学习地址 https www angular cn guide template syntax http www ngfans net topic 12 post 2 系列目录 1 组
  • 骰子【概率dp】

    题目链接 P1409 骰子 因为会有人被弹出队列 所以我设置的期望dp为 表示当现在队列中有i个人的时候 第j个人获胜的概率 于是有当只剩一个人的时候 那个人必胜 再往下 先看它在队首的情况 也就是直接获胜的概率加上它被弹到队尾时候的概率
  • IntelliJ IDEA 2023.2 新版本,拥抱 AI

    IntelliJ IDEA 近期连续发布多个EAP版本 官方在对用户体验不断优化的同时 也新增了一些不错的功能 尤其是人工智能助手补充 AI Assistant 相信在后续IDEA使用中 会对开发者工作效率带来不错的提升 以下是官方对AI
  • LeetCode:动态规划中的0-1背包问题【快来直接套模板啦】

    PS 0 1背包问题无疑是动态规划题目里面的非常经典的一类题目了 下面给出这类题目的一种解题模板 本文是参考代码随想录做的一些笔记 完整版本请戳链接 标准0 1背包问题 二维数组求解 标准的背包问题 有n件物品和一个最多能背重量为w的背包
  • 106 letcode - 重建二叉树

    class Solution 内存条里 有两个区域 堆和栈 其中 栈是我们函数跳转的关键 顺序是先进后出 通过压栈出栈 可以实现递归 1 当到达递归终止条件时候 则开始返回 例如 先序遍历二叉树中 每个节点都要执行三个操作 根 左 右 当对
  • Java基于 SpringBoot 的车辆充电桩系统

    博主介绍 程序员徐师兄 7年大厂程序员经历 全网粉丝30W Csdn博客专家 掘金 华为云 阿里云 InfoQ等平台优质作者 专注于Java技术领域和毕业项目实战 文章目录 1 效果演示 效果图 技术栈 2 前言介绍 完整源码请私聊 3 主
  • 关于今年五一调休。。

    作者主页 爱笑的男孩 的博客 CSDN博客 深度学习 YOLO 活动领域博主爱笑的男孩 擅长深度学习 YOLO 活动 等方面的知识 爱笑的男孩 关注算法 python 计算机视觉 图像处理 深度学习 pytorch 神经网络 opencv领
  • unity 渲染帧率优化-OnDemandRendering

    FixedUpdate更新速率设置 OnDemandRendering 相关的API 1 OnDemandRendering renderFrameInterval 3 解释说明 在一些静态UI的时候把OnDemandRendering r
  • Message": "请求的资源不支持 http 方法“GET”

    今天用postman测试后端api 总是报错 下面是问题解决方案 一 测试方法 public ApiResult Get int id ApiResult result new ApiResult result data 我是Get方法返回
  • Java调用jython

    Java调用jython 因为工作需要 需要在Java Jvm 进程内调用Python脚本 下了Jython练练手 脚本语言看着真别扭啊 若干年前写自动化测试工具时也用过python一小阵子 但基本忘光光了 好了 直奔主题 前提 1 sun
  • Linux如何给服务器增加白名单

    1 查看系统白名单配置 iptables L n 2 增加白名单 19 40 145 140 是需要增加的服务器IP iptables I INPUT s 19 40 145 140 32 p tcp j ACCEPT 注 I I是i的大写
  • oracle 函数使用方法----replace函数

    例 sql语句如下 select from cen sys TB DIC JDLX t 查询结果如下 需求 需要获取字段 PID 的值并 新增一个字段 PNAME PNAME的值为字段PID去掉 市平台前置机 剩下的字段 实现 select
  • 后端返回parentId,前端处理成children嵌套数据

    rouyi 的 vuetree函数结合elementui el table组件使用 把有parentId和id结构的数据处理成children嵌套数据 字段名称不一致 可以设置 vuetree函数 构造树型结构数据 param data 数
  • html 调用ActiveX

    html网页调用ActiveX控件时 要获取到ActiveX的ClassID 这个ClassID是注册到系统里的 而不是工程中的uuid 下图为uuid 正确的是在注册表的HKEY CLASSES ROOT中查找你的工程名的 项 找到后 其
  • flink state ttl 清理逻辑(截止到flink1.8之前的逻辑)

    在我们开发Flink应用时 许多有状态流应用程序的一个常见要求是自动清理应用程序状态以有效管理状态大小 或控制应用程序状态的访问时间 TTL Time To Live 功能在Flink 1 6 0中开始启动 并在Apache Flink中启