Flink_06_ProcessAPI(个人总结)

2023-11-20

    声明: 1. 本文为我的个人复习总结, 并那种从零基础开始普及知识 内容详细全面, 言辞官方的文章
              2. 由于是个人总结, 所以用最精简的话语来写文章
              3. 若有错误不当之处, 请指出

侧输出流(SideOutput)

即分支流, 可以用来接收迟到数据, 也可以用来将数据分类成多个支流

对于滑动窗口, 有很多窗口重叠, 当迟到数据被所有窗口都不接收时, 它才会进入侧输出流

只有Process这种最底层的API, 才能通过环境上下文去使用侧输出流

案例: 将温度值低于30度的数据输出到 SideOutput

// 定义侧输出流标签, 注意得是其匿名实现类

final OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp") { };

SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>( ) {
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
        if (value.getTemperature( ) < 30) {
            ctx.output(lowTempTag, value);
        } else {
            out.collect(value);
        }
    }
});
DataStream<SensorReading> lowTempStream = highTempStream.getSideOutput(lowTempTag);
highTempStream.print("high");
lowTempStream.print("low");

8种ProcessAPI:

  1. ProcessFunction

  2. KeyedProcessFunction

    得先keyBy,

    会处理流的每一个元素, 以out.collect(xxx)的方式输出任意多个元素

    • .processElement(SensorReading value, Context ctx, Collector<O> out)

      ctx 可以

      1. 访问元素的时间戳

      2. 访问元素的key

      3. 将数据输出到侧输出流

      4. 访问TimerService(ctx.timerService( ))

        TimerService:

        方法:

        1. EventTime相关
          • long currentWatermark( ) 返回当前数据的事件时间
          • void registerEventTimeTimer(long timestamp) 注册当前key的定时器
          • void deleteEventTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
        2. ProcessingTime相关
          • long currentProcessingTime( ) 返回当前数据的处理时间
          • void registerProcessingTimeTimer(long timestamp) 注册当前key的定时器
          • void deleteProcessingTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
        • 当定时器Timer触发后, 会执行回调函数onTimer( )

        • timestamp 是定时器所设定的触发运行的时间戳, 如果注册一个已经过期的时间, 那么当再次输入数据时 它才会触发定时器

        • 若注册窗口关闭时启动的定时器, 最好在WindowEndTime的基础上延迟1s;

          因为到了临界点, 既要触发窗口计算, 又要触发定时器;

          定时器任务又依赖于先窗口计算完毕, 所以给个1s的延迟较好

        案例需求: 如果温度值在10秒钟之内(ProcessingTime)连续上升, 则报警

        public class TempIncreaseWarning extends KeyedProcessFunction<String, SensorReading, String> {
            private Integer interval;
        
            public TempIncreaseWarning(Integer interval) {
                this.interval = interval;
            }
        
            // 记录 上一次温度
            private ValueState<Double> lastTempState;
            // 记录 定时器触发时间
            private ValueState<Long> timerTsState;
        
            @Override
            public void open(Configuration parameters) throws Exception {
                lastTempState = getRuntimeContext( ).getState(new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
                timerTsState = getRuntimeContext( ).getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
            }
        
        
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
                // 取出状态
                Double lastTemp = lastTempState.value( );
                Long timerTs = timerTsState.value( );
        
                // 每当温度上升时 && 暂无定时器
                if (value.getTemperature( ) > lastTemp && timerTs == null) {
                    long ts = ctx.timerService( ).currentProcessingTime( ) + interval * 1000L;
                    // 注册定时器
                    ctx.timerService( ).registerProcessingTimeTimer(ts);
                    // 为了后续删除定时器能找到注册时间戳
                    timerTsState.update(ts);
                }
                // 每当温度下降时 && 定时器不为空
                else if (value.getTemperature( ) <= lastTemp && timerTs != null) {
                    // 清除定时器,注意不能用ts,我们要找的是注册定时器的那个时间戳才对
                    ctx.timerService( ).deleteProcessingTimeTimer(timerTs);
                    timerTsState.clear( );
                }
                // 更新温度状态
                lastTempState.update(value.getTemperature( ));
            }
        
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                out.collect("传感器" + ctx.getCurrentKey( ) + "的温度连续" + interval + "秒上升");
                timerTsState.clear( );
            }
        }
        
  3. CoProcessFunction

    connect后的流再.process

    有processElement1( ) 和 processElement2( )

  4. ProcessJoinFunction

  5. BroadcastProcessFunction

    A流有1个分区, B流有4个分区, B流要用到A流的数据, 所以需要将A流1个分区的数据广播到B流的4个分区

    广播后再进行process处理

  6. KeyedBroadcastProcessFunction

  7. ProcessWindowFunction

    如 .aggregate(AggregateFunction<IN, ACC, OUT>aggFunction,ProcessWindowFunction<IN, OUT, KEY, W> windowFunction)

  8. ProcessAllWindowFunction

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink_06_ProcessAPI(个人总结) 的相关文章

  • 数据湖--概念、特征、架构与案例概述

    一 什么是数据湖 数据湖是目前比较热的一个概念 许多企业都在构建或者计划构建自己的数据湖 但是在计划构建数据湖之前 搞清楚什么是数据湖 明确一个数据湖项目的基本组成 进而设计数据湖的基本架构 对于数据湖的构建至关重要 关于什么是数据湖 有如

随机推荐

  • 上传图片到七牛云

    public JSONObject uploadImgToQiniu RequestParam MultipartFile file HttpServletResponse response HttpServletRequest reque
  • 论文笔记: Masked Autoencoders Are Scalable Vision Learners

    1 整体思路 效仿BERT中MLM的思路 随机mask掉输入图像的部分patch 并重建这些被mask掉的patch 机器学习笔记 ELMO BERT UQI LIUWJ的博客 CSDN博客 模型结构是一个非对称的encoder decod
  • 生命在于折腾——SQL注入的实操(五)less21-25

    一 实操环境 1 操作系统 VMware虚拟机创建的win10系统 内存8GB 硬盘255GB 处理器AMD Ryzen 9 5900HX 2 操作项目 sql lib项目 本篇文章介绍关卡21 25 3 工具版本 phpstudy 8 1
  • 百度翻译接口获取过程

    记百度翻译接口获取过程 coding utf 8 usr bin env python 思路 进入到百度翻译 https fanyi baidu com 首先要找到返回数据的接口 打开f12 输入你要翻译的内容后能看到很多请求如图所示 进入
  • Qt播放音乐报错DirectShowPlayerService::doSetUrlSource: Unresolved error code 0x80070002 ()

    需求 在Qt中播放背景音乐 代码片段如下 1 pro添加组件 QT multimedia 2 使用 QMediaPlayer 对象实现播放音乐 循环播放背景音乐 void ClearApp playBG QMediaPlayer playe
  • Qt自定义图片按钮并设置方向

    Qt自定义图片按钮 设置方向 虽然Qt定义了很多很多控件 但是还是不能满足用户的需要 比如如果想使用ToolButton 需要带文字 又需要文字可以设定位置 显然就不行了 下面的代码就是一个简单的实现ToolButton功能 并且能够设置图
  • 【模拟电路】三极管做开关,各个电阻的作用

    下面介绍用NPN做开关来驱动蜂鸣器的用法 对各个电阻的用法的解释 图一 这个比较简单 R20是限流作用 R21也是限流作用 图二 相同的地方就不说了 不同的地方是在基极和发射极之间加了一个电阻 这个电阻主要有两个作用 作用一 防止三极管因为
  • 神经网络中FLOPs和MACs的计算(基于thop和fvcore.nn)

    以 输入为 1 1 200 3 的张量 卷积取 nn Conv2d 1 64 kernel size 8 1 stride 2 1 padding 0 0 为例 先计算输出的形状 公式为 H上为 200 0 8 2 1 97 W上依然是3
  • Python全栈开发【基础-03】编程语言的分类

    专栏介绍 本专栏为Python全栈开发系列文章 技术包括Python基础 函数 文件 面向对象 网络编程 并发编程 MySQL数据库 HTML JavaScript CSS JQuery bootstrap WSGI Django Flas
  • 阿里巴巴都害怕的区块链电商到底是什么?

    近日 区块链权威机构中国通信工业协会区块链专业委员会 CCIAPCB 发出倡议 联合各界将中共中央政治局10月24日集体学习区块链主席讲话日作为 区块链中国日 此次中央将区块链技术放在了国家战略层面高度上 让区块链一时间成了全民热点 特别是
  • 【python数据挖掘课程】二十七.基于SVM分类器的红酒数据分析

    这是 Python数据挖掘课程 系列文章 前面很多文章都讲解了分类 聚类算法 这篇文章主要讲解SVM分类算法 同时讲解如何读取TXT文件数据并进行数据分析及评价的过程 文章比较基础 希望对你有所帮助 提供些思路 也是自己教学的内容 推荐大家
  • TS装饰器

    一 定义 装饰器本质是一种函数 通过添加标注的方式 对数据 类 方法 属性 参数等 的功能进行增加或者修改 二 使用 准备工作 ts config json文件中 1 基础使用 装饰器名字 例子 function test target a
  • 《塞尔达传说:旷野之息》中设计元素的分析

    塞尔达传说 旷野之息 中设计元素的分析 0 写在前面 关于 塞尔达传说 旷野之息 是否属于中型游戏 检索许多资料后 有一种通识是 塞尔达传说 旷野之息 不属于3A级别游戏 显然也不属于小型游戏 因此我姑且认为它属于中型游戏 这也符合此篇的初
  • crypto-js md5加密和解密

    直接上代码 import CryptoJS from crypto js const encodeFactor zq87dopenf67eg 加密 export function encrypt txt var key CryptoJS e
  • 服务攻防-中间件安全&CVE复现&IIS&Apache&Tomcat&Nginx漏洞复现

    目录 一 导图 二 ISS漏洞 中间件介绍 gt 1 短文件 2 文件解析 3 HTTP SYS 4 cve 2017 7269 三 Nignx漏洞 中间件介绍 gt 1 后缀解析漏洞 2 cve 2013 4547 3 cve 2021
  • openstack平台搭建笔记(容器云)

    openstack平台搭建笔记 容器云 一 根据要求准备好配置环境 节点IP 角色 备注 192 168 100 30 Master Kubernetes 集群 master 节点 Harbor 仓库节点 192 168 100 31 Wo
  • C# 快速写入日志 不卡线程 生产者 消费者模式

    有这样一种场景需求 就是某个方法 对耗时要求很高 但是又要记录日志到数据库便于分析 由于访问数据库基本都要几十毫秒 可在方法里写入BlockingCollection 由另外的线程写入数据库 可以看到 在我的机子上面 1ms写入了43条日志
  • html5 自动化测试工具,五大最佳自动化测试工具

    对更快交付高质量软件 或 快速质量 的需求要求组织以敏捷 持续集成 CI 和DevOps方法论来寻找解决方案 测试自动化是这些方面的重要组成部分 最新的 2018 2019年世界质量报告 表明 测试自动化是实现 快速质量 的最大瓶颈 因为它
  • 四位数显表头设计

    去年帮别人定制了一个四位数显小表头 可以用于测量4 20mA或者0 5V 0 10V输出的的各种传感器 可设置显示范围 上下限报警灯 由于后面更改方案 此方案暂时搁置不用 今天来分享一下软硬件的设计过程 1 硬件设计 1 1电源 电源采用一
  • Flink_06_ProcessAPI(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 侧输出流 SideOutput 即分支流 可以用来接收迟到数据 也可