Flink之IntervalJoin介绍

2023-11-13

InterValJoin算子
间隔流,一条流去join另一条流去过去一段时间内的数据,该算子将keyedStream与keyedStream转化为DataStream;再给定的时间边界内(默认包含边界),相当于一个窗口,按指定的key对俩个KeyedStream进行Join操作,把符合join条件的俩个event拉倒一起,然后咋么处理右用户来决定。
1、key1 == key2 && e1.timestamp +lowerBound <= e2.timestamp +upperBound
2、场景:把一定时间范围内相关的分组数据拉成一个宽表

语法规则:

leftKeyedStream
.intervalJoin(rightKeyedStream)
//时间间隔,设定下界和上界
.between(Time.minutes(-10),Time.seconds(0))
//不包含下界
.lowerBoundExclusive()
//不包含上界
.upperBoundExclusive()
//自定义ProcessJoinFunction 处理join到的元组
.process(ProcessJoinFunction) 

该算子的注意事项:
1、俩条流都缓存在内部state中。leftElement到达,去获取State中rightElement响应时间范围内的数据,然后执行ProcessJoinFunciton进行Join操作;
2、时间间隔:leftElement默认和【leftElementEventTime + lowerBound,leftElementEventTime +upperBound】时间范围内的rightElement join;
3、举例:leftElementEventTime = 2019-11-16 17:30:00,lowerBound=-10minute,upperBound=0,则这条leftElement按Key和【2019-11-16 17:20:00,2019-11-16 17:30:00】时间范围内的rightElementJoin;
4、IntervalJoin目前只支持EventTime;
5、数据量比较大,可能使用RocksDBStateBackend

demo案列:

package Flink_API;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestInterViewJoin {

        public static void main(String[] args) throws Exception {
            //创建运行环境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
            //Flink是以数据自带的时间戳字段为准
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //设置并行度
            env.setParallelism(1);

            //1、获取第一个流,获取用户的浏览信息
            DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
            //2、获取用户的点击信息
            DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);

            //打印结果
            browseStream.print();
            clickStream.print();

            //核心:双流进行IntervalJoin操作:每个用户的点击信息Join这个用户最近10分钟内的浏览信息
            //browseStream(左流)关联clickStream(右流)
            KeyedStream<UserClickLog,String> userClickLogStringKeyedStream = clickStream.keyBy(new KeySelector<UserClickLog,String>(){

                @Override
                public String getKey(UserClickLog userClickLog) throws Exception {
                    return userClickLog.userID;
                }
            });
            KeyedStream<UserBrowseLog,String> userBrowseLogStringKeyedStream1=browseStream.keyBy(new KeySelector<UserBrowseLog,String>(){
                @Override
                public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                    return userBrowseLog.userID;
                }
            });
            //每个用户的点击Join这个用户最近的10分钟内的浏览
            DataStream<String> processData = userClickLogStringKeyedStream.intervalJoin(userBrowseLogStringKeyedStream1)
                    .between(Time.minutes(-10),Time.seconds(0))//下界:10分钟,上界:当前EventTime时刻(左流去右流10分钟之前去找数据)
                    .process(new ProcessJoinFunction<UserClickLog, UserBrowseLog, String>() {
                        //leftElement到达,去获取State中rightElement响应范围内的数据,然后执行ProcessJoinFunction进行Join操作:
                        @Override
                        public void processElement(UserClickLog left, UserBrowseLog right, Context context, Collector<String> collector) throws Exception {
                            collector.collect(left+"<IntevalJoin>"+right);
                        }
                    });
            processData.print();

            //程序的入口类
            env.execute("TestInterViewJoin");

        }

        private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.severs","page01:9002");
            consumerProperties.setProperty("grop.id","browsegroup");

            DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic1", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));

            DataStream<UserClickLog> processData=dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {
                @Override
                public void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {
                    try{
                        UserClickLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);
                        if(browseLog !=null){
                            collector.collect(browseLog);
                        }
                    }catch(Exception e){
                        System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());
                    }
                }
            });
            //设置watermark
            return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)){
                @Override
                public long extractTimestamp(UserClickLog userBrowseLog) {
                    DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                    DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);
                    //用数字表示时间戳,单位是ms,13位
                    return dateTime.getMillis();
                }
            });
        }

        private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.severs","page01:9001");
            consumerProperties.setProperty("grop.id","browsegroup");

            DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));

            DataStream<UserBrowseLog> processData=dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
                @Override
                public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                    try{
                        UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                        if(browseLog !=null){
                            collector.collect(browseLog);
                        }
                    }catch(Exception e){
                        System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());
                    }
                }
            });
            //设置watermark
            return processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(UserBrowseLog userBrowseLog) {
                    DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
                    DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);
                    //用数字表示时间戳,单位是ms,13位
                    return dateTime.getMillis();
                }
            });
        }

        //浏览类
        public static class UserBrowseLog implements Serializable {
            private String userID;
            private String eventTime;
            private String eventType;
            private String productID;
            private Integer productPrice;

            public String getUserID() {
                return userID;
            }

            public void setUserID(String userID) {
                this.userID = userID;
            }

            public String getEventTime() {
                return eventTime;
            }

            public void setEventTime(String eventTime) {
                this.eventTime = eventTime;
            }

            public String getEventType() {
                return eventType;
            }

            public void setEventType(String eventType) {
                this.eventType = eventType;
            }

            public String getProductID() {
                return productID;
            }

            public void setProductID(String productID) {
                this.productID = productID;
            }

            public Integer getProductPrice() {
                return productPrice;
            }

            public void setProductPrice(Integer productPrice) {
                this.productPrice = productPrice;
            }

            @Override
            public String toString() {
                return "UserBrowseLog{" +
                        "userID='" + userID + '\'' +
                        ", eventTime='" + eventTime + '\'' +
                        ", eventType='" + eventType + '\'' +
                        ", productID='" + productID + '\'' +
                        ", productPrice=" + productPrice +
                        '}';
            }
        }
        //点击类
        public static class UserClickLog implements Serializable{
            private String userID;
            private String eventTime;
            private String eventType;
            private String pageID;

            public String getUserID() {
                return userID;
            }

            public void setUserID(String userID) {
                this.userID = userID;
            }

            public String getEventTime() {
                return eventTime;
            }

            public void setEventTime(String eventTime) {
                this.eventTime = eventTime;
            }

            public String getEventType() {
                return eventType;
            }

            public void setEventType(String eventType) {
                this.eventType = eventType;
            }

            public String getPageID() {
                return pageID;
            }

            public void setPageID(String pageID) {
                this.pageID = pageID;
            }

            @Override
            public String toString() {
                return "UserClickLog{" +
                        "userID='" + userID + '\'' +
                        ", eventTime='" + eventTime + '\'' +
                        ", eventType='" + eventType + '\'' +
                        ", pageID='" + pageID + '\'' +
                        '}';
            }
        }

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink之IntervalJoin介绍 的相关文章

  • ELK配置记录(filebeat+kafka+Logstash+Elasticsearch+Kibana)

    一 简介 elk日志平台 日志收集 分析和展示的解决方案 满足用户对 志的查询 排序 统计需求 elk架构 filebeat 采集 kafka Logstash 管道 Elasticsearch 存储 搜索 Kibana 日志应用 各组件功
  • Flink on Zeppelin-2

    Flink Interpreter类型 首先介绍下Zeppelin中的Flink Interpreter类型 Zeppelin的Flink Interpreter支持Flink的所有API DataSet DataStream Table
  • Kafka入门基础知识学习笔记-Kafka只是消息引擎吗

    学习极客时间 Kafka核心技术与实战 入门 03 05 作者 胡夕 Apache Kafka 的一名代码贡献者 目前在社区的 Patch 提交总数位列第 22 位 应该说算是国内比较活跃的贡献者了 胡夕老师 赠言 聪明人也要下死功夫 最近
  • 数据中台-让数据用起来-6

    文章目录 第六章 数据开发 数据价值提炼工厂 6 1 数据计算能力的4种类型 6 1 1 批计算 6 1 2 流计算 6 1 3 在线查询 6 1 4 即席分析 6 2 离线开发 1 作业调度 2 基线控制 3 异构存储 4 代码校验 5
  • flink大数据处理流式计算详解

    flink大数据处理 文章目录 flink大数据处理 二 WebUI可视化界面 测试用 三 Flink部署 3 1 JobManager 3 2 TaskManager 3 3 并行度的调整配置 3 4 区分 TaskSolt和parall
  • 流计算框架 Flink 与 Storm 的性能对比

    概述 将分布式实时计算框架 Flink 与 Storm 进行性能对比 为实时计算平台和业务提供数据参考 一 背景 Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架 其中 Apache Sto
  • 【基础】Flink -- ProcessFunction

    Flink ProcessFunction 处理函数概述 处理函数 基本处理函数 ProcessFunction 按键分区处理函数 KeyedProcessFunction 定时器与定时服务 基于处理时间的分区处理函数 基于事件时间的分区处
  • 【Docker安装部署Kafka+Zookeeper详细教程】

    Docker安装部署Kafka Zookeeper Docker拉取镜像 Docker拉取zookeeper的镜像 docker pull zookeeper Docker拉取kafka的镜像 docker pull wurstmeiste
  • 大数据简介

    预备篇 目录 知识 大数据简介 计算机单位 大数据的五个 v Hadoop Hadoop概述 Hadoop的历史 Hadoop三大发行版本 1 Apache Hadoop 2 Cloudera Hadoop 3 Hortonworks Ha
  • 大数据笔记--ELK(第一篇)

    一 ELK介绍 1 什么是ELK ELK 是elastic公司提供的一套完整的日志收集以及展示的解决方案 是三个产品的首字母缩写 分别是ElasticSearch Logstash 和 Kibana 1 1 E ELASTICSEARCH
  • Flink_05_状态(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 状态 状态就是一块内存 一个变量 如果要访问历史窗口 或批次 的数据
  • 华为云,站在数字化背后

    一场新的中国数字化战斗 正在被缓缓拉开帷幕 作者 裴一多 出品 产业家 如果说最近的讨论热点是什么 那无疑是互联网云 在数字化进入纵深的当下 一种市面上的观点是互联网的云业务由于盈利等问题 正在成为 被抛弃 的一方 互联网公司开始重新回归T
  • 关于Yarn的一些个人总结

    文章目录 前言 一 Yarn是什么 二 Yarn由什么组成 三 Yarn用来做什么 四 Yarn的优势是什么 五 Yarn解决了什么问题 总结 前言 在前面我们可以得出Yarn是Hadoop生态圈中一个重要得组成部分 主管资源管理 但是具体
  • 2021 CCF大数据与计算智能大赛个贷违约预测top 73 解决方案

    目录 一 概述 二 解题过程 2 1 数据 2 2 构建基线 2 3 进阶思路一 2 4 进阶思路二 2 5 进阶思路三 2 6 融合 2 7 调优提分过程 2 8 其他工作 三 结语 一 概述 这是我第二次参加大数据类型的竞赛 也是第一次
  • 计算机科学丛书(2014-2018.Q1)

    ISBN 名称 作者 出版时间 978 7 111 53451 8 数学设计和计算机体系结构 原书第2版 美 戴维 莫尼 哈里斯 莎拉 L 哈里斯著 978 7 111 44075 8 嵌入式计算系统设计原理 美 Marilyn Wolf著
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • MQ - KAFKA 高级篇

    kafak是一个分布式流处理平台 提供消息持久化 基于发布 订阅的方式的消息中间件 同时通过消费端配置相同的groupId支持点对点通信 适用场景 构造实时流数据管道 用于系统或应用之间可靠的消息传输 数据采集及处理 例如连接到一个数据库系
  • 【ranger】CDP环境 更新 ranger 权限策略会发生低概率丢失权限策略的解决方法

    一 问题描述 我们的 kafka 服务在更新 添加 ranger 权限时 会有极低的概率导致 MM2 同步服务报错 报错内容 Not Authorized 但是查看 ranger 权限是赋予的 并且很早配置的权限策略也会报错 相关组件版本
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De

随机推荐

  • JVM Mutex Monitor::lock

    void Monitor lock Thread Self ifdef CHECK UNHANDLED OOPS Clear unhandled oops so we get a crash right away Only clear fo
  • Windows 下PBC库的安装和配置

    背景 PBC库是一个基于双线性对的密码学库 这库在公钥密码学中使用非常广泛 这个库在Linux下的安装非常的简单 有些只会纸上谈兵的人需要在WIN下做 呵呵 但是没办法 需求到了 硬着头皮也要写完 对于一些只会谈兵的人 呵呵 现在主要介绍下
  • jar反编译的.java文件如何可以编辑_修改及反编译可运行Jar包实现过程详解

    将可运行Jar包 反编译成项目 修改代码 再次编译 打包 需要工具 jd gui myeclipse 具体步骤 1 使用jd gui打开原始的Jar包 选择File gt Save All Sources 会生成一个zip压缩包 2 解压这
  • 元素垂直居中的几种方式

    第一种 div div div div boxOne width 200px height 200px background pink display flex justify content center align items cent
  • ps -aux

    查看某个程序的进程号并删除 1 ps aux grep 进程 2 kill 9 进程号 删除该进程
  • visual studio使用教程

    linux疑难问题排查实战 分享了作为公司专家 在项目开发过程中内存优化 堆 栈 代码段 数据段 性能优化 死机 栈越界 堆越界 死锁等疑难问题排查的案例 使用的工具 perf asan strace memleak等 工作经验 大家可以点
  • blender界面基础认识

    blender界面基础认识 自定义功能 Edit gt Preference 界面 主题 视图 灯光 编辑 动画 插件 输入 视图切换 键位映射 系统 保存 加载 文件路径 Blender界面分为以下三个部分 上侧的顶栏 中间的工作区 底部
  • Sklearn专题二 随机森林

    专题二 随机森林 概述 1 集成算法 1 集成算法考虑多个评估器的结果 汇总获取更好的分类 回归表现 2 三种集成算法 装袋法bagging 模型独立 提升法boosting 模型相关 stacking 3 随机森林是一种bagging集成
  • .NET平台常用的开发组件

    工欲善其事 必先利其器 其优雅的编程风格 高效率的开发速度 极度简单的可扩展性 足够强大开发类库 较小的学习曲线 让我对这个平台产生了浓厚的兴趣 在工作和学习中也积累了一些开源的组件 虽然跟Java比Net还是要少 但也足够使用了 其中有一
  • 【C#实现文字转语音功能】

    本文实例为大家分享了C 实现文字转语音的具体代码 供大家参考 具体内容如下 客户提出要求 将文字内容转为语音 因为内网环境 没办法采用联网 在线这种方式 灵机一动 能否写一个简单的例子呢 搜索相关资料还真行 话不多说 有图有真相 关键是 c
  • 网传Spring爆出更大漏洞?别再炒作了…

    之前刚刚过去的log4j2漏洞还历历在目 这次来了个更大的 云舒老大在29日发微博称 出了个超级大漏洞 有吃瓜群众就问 这个瓜有 log4j2 那么好吃吗 云舒大佬的回复是 更大 之后 又有安全大佬sunwear给了一些更细节的信息 所以漏
  • java中类可以包含哪些元素,Java类中包含的元素及作用

    Java类是面试中常考的知识点 是组成Java应用的基本成分 小型和大型的应用都是由类组建而成的 作为合格的Java工程师 一定要清晰了解Java类及其包含的元素及作用 今天学码思Java培训老师就Java类中包含的元素及作用做一个大致讲解
  • springSecurity跨域CORS处理

    续言 之前 知识追寻者写过关于springboot 的跨域处理 并且介绍了跨域相关的概念 具体的可以查看这篇知识追寻者springboot教程系列文章 https zszxz com category springboot article
  • iOS“断点”(Break Point)你不知道多强大

    iOS 断点 Break Point 你不知道多强大 转载自http mp weixin qq com s biz MzA4ODk0NjY4NA mid 230272985 idx 1 sn 045c98bfb2d8dd1ecf7a7321
  • ssh配置config文件,实现vscode免密登陆

    在使用ssh连接服务器时 每一次用vscode连接服务器进行开发 都需要输入密码 相当鸡肋 对config的配置能够实现解决这个问题 step1 生成ssh密钥 如果已经有了就不需要了 使用以下命令 一路回车即可 ssh keygen t
  • 触发connect超时事件

    触发connect超时事件 有关于如何触发connect超时事件 之前相当然的认为在服务器程序accpet函数前阻塞一段事件就好了 这个思路是完全错误的 这是我犯了的一个错误 没有严格的验证自己的程序就将其发布了出来 被小组的小伙伴提问时才
  • shell编程基础: menu drvien script template(菜单脚本模板)

    前言 菜单类脚本 其实就是我们经常使用的交互脚本 在我们安装一个app的时候最常见 我们需要使用交互脚本进行一些配置 在工作中 写一个交互性脚本也同样重要 比如我现在需要写一个数据库授权的脚本 上下文是这样的 我们在开发中都是使用docke
  • LCD和LED屏幕的工作原理总结

    1 点阵取模原理之横向取模与纵向取模 1 1 针式打印机 针式打印机16针是纵向排列 每次打印垂直的16bit 然后右移一bit 继续下列打印 字节的MSB表示最上面的点 字节LSB表示最下面的点 由于汉字字模的点阵是横向排列的 而提供给打
  • 实现单层神经网络

    在前面 我们分别使用逻辑回归和 softmax 回归实现了对鸢尾花数据集的分类 逻辑回归能够实现线性二分类的任务 他其实就是最简单的神经网络 感知机 而softmax回归则实现的是多分类任务 它也可以看做是输出层有多个神经元的单层神经网络
  • Flink之IntervalJoin介绍

    InterValJoin算子 间隔流 一条流去join另一条流去过去一段时间内的数据 该算子将keyedStream与keyedStream转化为DataStream 再给定的时间边界内 默认包含边界 相当于一个窗口 按指定的key对俩个K