【Flink】处理函数Process

2023-11-17

整体框架:
在这里插入图片描述

处理函数

基本处理函数 ProcessFunction

处理函数的功能

ProcessFunction:能拿到别的API拿不到的东西。

处理函数提供了一个定时服务TimerService,可以用它访问流中的事件event、时间戳timestamp,水位线watermark,注册定时事件。

处理函数继承自AbstractRichFunction,拥有富函数类的特性,可以访问状态state和其他运行时信息。
处理函数可以直接将数据输出到侧输出流side output中。

处理函数是DataStream API的底层逻辑。

ProcessFunction解析

ProcessFunction类中:
1. processElement(输入类型,上下文,输入类型Collector)
	上下文中能获取到:时间戳、侧输出流、"timerService"
	timerService中能获取到:currentProcessingTime处理时间、currentWatermark事件时间、registerProcessingTimeTimer注册定时器、删除定时器。
	
2. onTimer()
	注册了定时器后,到点时会触发这个回调,这是定时到了后的处理方法。
	但是,只有基于KeyedStream才能定义定时器。

处理函数的分类

1.	ProcessFunction
2.	KeyedProcessFunction----"重要"
3.	ProcessWindowFunction
4.	ProcessAllWindowFunction
5.	CoProcessFunction
6.	ProcessJoinFunction
7.	BroadcastProcessFunction
8.	KeyedBroadcastProcessFunction

按键分区处理函数 KeyedProcessFunction

stream.keyBy()
	.process(new MyKeyedProcessFunction())

定时器Timer 和定时服务 TimerService

处理时间:

stream.keyBy(data->data.user)
	.process(new KeyedProcessFunction<String, Event, String>(){
		@Override
		public void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){
			Long currTs = ctx.timerService().currentProcessingTime();
			out.collect(ctx.getCurrentKey() + " 数据到达,到达时间:" + new Timestamp(currTs));
			// 注册一个10s后的定时器
			ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
		}
		
		@Override
		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {
			out.collect(ctx.getCurrentKey() + " 定时器触发,触发时间:"+new Timestamp(timestamp));
		}
	
	})

事件时间:

stream.keyBy(data->data.user)
	.process(new KeyedProcessFunction<String, Event, String>(){
		@Override
		public void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){
			Long currTs = ctx.timestamp();
			out.collect(ctx.getCurrentKey() + " 数据到达,时间戳:" + new Timestamp(currTs) + " watermark:"+ ctx.timerService().currentWatermark());
			// 注册一个10s后的定时器
			ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
		}
		
		@Override
		public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {
			out.collect(ctx.getCurrentKey() + " 定时器触发,触发时间:"+new Timestamp(timestamp)  + " watermark:"+ ctx.timerService().currentWatermark());
		}
	
	})

窗口处理函数

窗口处理函数的使用
ProcessWindowFunction解析

应用案例 TopN

要求:统计每隔10s的最受欢迎的URL的前两名,每隔5s更新一次结果。

使用ProcessAllWindowFunction
该方法数据量大的时候,把所有数据放在一个窗口里,不靠谱。

使用KeyedProcessFunction

// 1.按照URL分组,统计窗口内每个URL的访问量
SingleOutputStreamOperator<UrlViewCount> urlCountStream = 
	stream
	.keyBy(data -> data.url)
	.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
	.aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.xxxx());

// 2.对于同一窗口统计出的访问量,进行收集和排序
urlCountStream.keyBy(data -> data.windowEnd) // 窗口关闭时钟
		.process(new TopNProcessResult(2))
		.print();

// 实现TopNProcessResult
public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String>{
	// 定义属性N
	private int n;
	// 定义列表状态
	private ListState<UrlViewCount> urlViewCountListState;
	
	public TopNProcessResult(int n){
		this.n = n;
	}
	
	// 在运行环境中获取状态
	@Override
	public void open(Configuration parameters) throws Exception{
		urlViewCountListState = getRuntimeContext().getListState(
			new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class))	;
		);
	}
	

	@Override
	public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception{
		// 来了数据后,将数据保存到状态中
		urlViewCountListState.add(value);
		// 注册windowEnd + 1ms 的定时器
		ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);	
	}
	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{
		// 从状态中获取数据
		ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList();
		for(UrlViewCount u : urlViewCountListState.get()){
			urlViewCountArrayList.add(u);
		}
		
		// 排序
		urlViewCountArrayList.sort(
			new Comparator<UrlViewCount>(){
				@Override
				public int compare(UrlViewCount o1, UrlViewCount o2){
					return o2.count.intValue() - o1.count.intValue();
				}
			}
		);
		
		// 包装信息打印输出
		StringBuilder result = new StringBuilder();
		result.append("窗口结束时间:" + new Timestamp(ctx.getCurrentKey()));

		// 取前两个
		for(int i=0; i < 2; i++){
			UrlViewCount currTuple = urlViewCountArrayList.get(i);
			String info = "No. "+(i+1)+" "
				+ "url: "+ currTuple.url + " "
				+ "访问量: "+ currTuple.count + "\n";
			result.append(info);
		}

		out.collect(result.toString());
	}
}

侧输出流

可以用来做分流操作

// 定义侧输出流标签
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

public void processElement(){
	// 转换成Long, 输出到主流中
	out.collect(Long.valueof(value));

	// 转换成String, 输出到侧输出流中
	ctx.output(outputTag, String.valueof(value));
}

// 获得侧输出流
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Flink】处理函数Process 的相关文章

随机推荐

  • Obsidian Tasks插件介绍

    背景 按照之前对 DataView 插件的介绍 对于任务列表的使用其实就可以使用其中的 list 插件完成的 但是 DataView 插件只能完成列表的查询功能 而查询功能只能是任务列表中其中一个功能 因此就使用 DataVIew 插件是不
  • java集合List

    Java集合概况 Java集合一直理解的都是片面的 整理一下 将知识组织成面 更便于理解 上图来自Java 集合系列01之 总体框架 如果天空不死 博客园 虽然博主是基于java1 6整理的 但也不碍于我们学习 理解了上图 对于学习java
  • 排序遍历带前缀的文件名

    排序遍历带前缀的文件名 def getTimeId file fileAttrs file split fileTime fileAttrs 0 return fileTime def CleanUpExpireTar backupDir
  • 树莓派4B安装Batocera V35版本 前言

    前言 说说为什么要写这个 原因比较多 我在大学的时候买了树莓派4B 那时正价购买 刷了个OpenWrt系统在宿舍当软路由再跑 毕业之后买了个X86主机当软路由 树莓派就放着吃灰了 大学时期同样买了个北通的游戏手柄 当初为了玩崩坏3购入的 后
  • DDR2 DDR3的区别

    DDR2 DDR3的区别 功耗进一步减少 DDR2内存的默认电压为1 8V 而DDR3内存的默认电压只有1 5V 因此内存的功耗更小 发热量也相应地会减少 值得一提的是 DDR3内存还新增了温度监控 采用了ASR Automatic sel
  • Unity5.x运行场景直接卡死的问题

    今天遇到一个很奇葩的问题 就是电脑中Unity5 x版本的都不能运行场景了 包括新建空的工程也不行 重装unity软件重装VS环境也不行 神奇的是2017 2018 2019版本的都没有问题 并且界面也没有任何报错 爬各种论坛谷歌都没有找到
  • 山海演武传·黄道·第一卷 雏龙惊蛰 第二十二 ~ 二十四章 真龙之剑·星墟列将...

    我是第一次 请你 请你温柔一点 少女一边娇喘着 一边将稚嫩的红唇紧贴在男子耳边 樱桃小嘴盈溢出如兰香气 这样子 人家骑在上面 她紧紧地依偎在某个男子身上 窈窕的身躯与丰盈的酥胸 伴随着男子身体晃动而滑上滑下 起伏不定 啊 不要晃的那样厉害
  • ios appStore上架审核通过后,appStore搜索不到该应用

    问题描述 前两天上架一款ios App 周一到公司看审核已经通过了 去appStore上搜索一直搜索不到 ios appStore connect点击提示该商品在中国大陆没有上架 解决方法 通过app store connect 最下面的联
  • vue3.0 + element Plus实现页面中引入弹窗组件及defineExpose用法

    1 在需要弹窗显示的页面引入你所写的弹窗组件 index html
  • AI革命:AI+算力,霸主即将诞生!

    随着人工智能技术的飞速发展 AI 算力 的结合应用已成为科技行业的热点话题 甚至诞生出 AI 算力 最强龙头 的网络热门等式 该组合不仅可以提高计算效率 还可以为各行各业带来更强大的数据处理和分析能力 从而推动创新和增长 那么对于这个时下的
  • 二维多孔介质图像的粒度分布研究(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码实现 1 概述 使用流域分割算法对岩石二维二值图像进行粒度
  • Scala深入浅出——从Java到Scala

    本文适合有一定Java基础的 并想系统学习Scala的小伙伴借鉴学习 文章有大量实例 建议自己跑一遍 Scala深入浅出 从Java到Scala Scala 一 介绍 1 什么是Scala 2 特点 3 安装 二 Scala特点 三 sca
  • SecureCRT9.1高亮配色设置

    参考 http zh cjh com qita 1623 html https download csdn net download qq 45698138 88310255 spm 1001 2014 3001 5503 1 创建文件co
  • fork的例子

    以下是下列代码的头文件 forks c Examples of Unix process control include
  • Ruoyi-cloud集成Sa-Token SSO单点登录

    文章目录 服务端 客户端前端 客户端后端 https github com dromara Sa Token Sa Token SSO 模式三 修改本地hosts 127 0 0 1 sa sso server com 127 0 0 1
  • ionic3代码压缩和apk优化

    我们在做ionic打包的时候 通常执行这条命令 ionic cordova build android release prod 使用这个命令生成的apk是ionic项目导出的最优化的apk 但是如果还想继续压缩 那么还可以借助Androi
  • Unity 空气墙Shader

    废话不多说 先上效果图 具体代码如下 Shader Hidden AirWall Properties Color Color Color 1 1 1 1 颜色 Interval Interval float 10 间隔 SubShader
  • springmvc注解和参数传递

    一 SpringMVC注解入门 创建web项目 在springmvc的配置文件中指定注解驱动 配置扫描器 Xml代码 收藏代码
  • FFmpeg 实战指南

    文章目录 表达式 滤镜效果 zoompan 中心视距由远及近 中心视距由近及远 水平视距从左到右 水平视距从右到左 垂直视距从上到下 垂直视距从下到上 rotate 顺时针旋转 PI 6 弧度 逆时针旋转 PI 6 弧度 顺时针旋转 45
  • 【Flink】处理函数Process

    目录 处理函数 基本处理函数 ProcessFunction 处理函数的功能 ProcessFunction解析 处理函数的分类 按键分区处理函数 KeyedProcessFunction 定时器Timer 和定时服务 TimerServi