Flink 多流转换 (五) 间隔联结(Interval Join)

2023-11-16

顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配

间隔连接通过一个共同的key连接两个流(A&B)中的数据,流 B 的数据具有时间戳位于流 A 中元素的相对时间间隔中。

这也可以更正式地表达成 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或者 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

a和b是A,B流中有共同key的数据。只要满足下界(lowerBound)小于等于上界(upperBound),下界和上界都是可以是正数或者负数。间隔连接目前只支持内连接。

当一对数据被传递给了ProcessJoinFunction,他们会被指定为两个数据中更大的时间戳(可以通过ProcessJoinFunction.Context访问)。间隔连接目前只支持事件时间。

间隔联结的调用

间隔联结在代码中,是基于 KeyedStream 的联结(join)操作。DataStream 在 keyBy 得到KeyedStream 之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个 KeyedStream,两者的 key 类型应该一致;得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的:
先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:

stream1
 .keyBy(<KeySelector>)
 .intervalJoin(stream2.keyBy(<KeySelector>))
 .between(Time.milliseconds(-2), Time.milliseconds(1))
 .process (new ProcessJoinFunction<Integer, Integer, String(){
 @Override
	 public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
 		out.collect(left + "," + right);
 	}
 });

抽象类 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中 left 指的就是第一条流中的数据,right 则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。

间隔联结实例

电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近10秒中的浏览数据进行一个联结查询。

下面是一段示例代码:Gitee中代码

public class IntervalJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> orderStream = env.fromElements(
                Tuple2.of("Mary",  5000L),
                Tuple2.of("Alice",  5000L),
                Tuple2.of("Bob",  20000L),
                Tuple2.of("Alice", 20000L),
                Tuple2.of("Cary", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                })
        );

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(right + "=>" + left);
                    }
                }).print();


        env.execute();
    }
}

结果如下:

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 多流转换 (五) 间隔联结(Interval Join) 的相关文章

随机推荐

  • 高防IP是什么?有什么作用?

    最近不少来咨询的客户都问什么是高防IP 客户接入这个高防IP的话需要做些什么准备 今天就给大家科普一下什么是高防IP 他的原理是什么 客户接入需要做些什么 高防IP是指高防机房所提供的IP段 主要是针对网络中的DDOS攻击进行保护 在网络世
  • CompiledEffect Direct3D9 Sample fxc.exe

    转载于 https www cnblogs com Agravity p 5138141 html
  • 微信小程序——自定义日期时间组件实现

    目前 微信小程序选择器提供了日期选择器 时间选择器等 但没有日期时间一体的选择器 当项目中需要进行日期时间选择时 我们只有自定义组件了 首先 我们需要先了解一下小程序的picker组件 详细使用见链接 https developers we
  • UDP通过广播的形式发送、接收结构体

    UDP是一种无连接的 面向数据包的传输协议 通过广播的方式可以向多个接收方发送数据 在C 中 可以使用socket库来实现UDP广播的发送和接收 下面是一篇CSDN博文 介绍了如何使用C 通过广播的形式发送和接收结构体 一 UDP广播的发送
  • 电商的1000+篇文章总结

    电商的1000 篇文章总结 本文收集和总结了有关电商的1000 篇文章 由于篇幅有限只能总结近期的内容 想了解更多内容可以访问 http www ai2news com 其分享了有关AI的论文 文章 图书 query 6 合并大舞台的背后
  • cocos2dx:瓦片地图加载失败及黑线问题

    问题 1 瓦片地图加载失败 运行时 获取瓦片地图的层失败 调试发现 获取的层是一个 NULL 遇到这个问题 我第一反应是图片路径有问题 但经过检查发现 路径没有问题 然我就怀疑 是我的代码有问题 然而并没有 调试无果 我就把目光转向瓦片地图
  • 23种设计模式之模板方法模式

    文章目录 模板方法模式 模板方法模式的优缺点 使用场景 模板方法模式 模板方法模式法 Template Method 定义一个操作中的算法骨架 而将算法的一些步骤延迟到子类中 使得子类可以不改变该算法结构的情况下重定义该算法的某些特定步骤
  • pushd命令

    1 功能pushd命令常用于将目录加入到栈中 加入记录到目录栈顶部 并切换到该目录 若pushd命令不加任何参数 则会将位于记录栈最上面的2个目录对换位置2 语法 1 格式 pushd 目录 N N n 2 选项目录 将该目录加入到栈顶 并
  • 边界框回归的魔法:揭秘精准高效的MPDIoU损失函数

    文章目录 摘要 1 简介 2 相关工作 2 1 目标检测和实例分割 2 2 场景文本识别 2 3 边界框回归的损失函数 3 点距最小的并集交点 4 实验结果 4 1 实验设置 4 2 数据集 4 3 评估协议 4 4 目标检测的实验结果 4
  • 图神经网络与因果推理

    传统的因果推理基于线性结构方程模型 深度因果推理模型 这是基于图神经网络的模型 利用扁粉自动编码机来学习模型 其中 网络结构 因果推理模型为
  • iOS的终端命令和linux命令,iOS 终端 shell 操作,Mac 操作快捷键

    shell 操作命令 简单的shell 命令操作指令 pwd 当前工作目录 cd 不加参数 进root cd folder 进入文件夹 cd 上级目录 cd 返回root cd 返回上一个访问的目录 rm 文件名 删除 文件 rm 删除当前
  • vscode 配置文件

    将设置放入此文件中以覆盖默认设置 editor fontSize 18 editor fontFamily Source Code Pro Noto Sans CJK SC Consolas editor rulers 120 editor
  • 监听文件读取进度,中断文件读取

  • 使某个dom元素匀速滑动到容器顶部

    需求 我有个侧面的菜单 点击对应的菜单标题 可以让左侧的容易里对应标题滑到最顶上 于是我封装了一个缓动函数 这个函数接受3个参数 需要滚动的最外层的容器 滚动到什么距离 滚动时间 注意点 第一个参数的最外层的容器 需要滚动条 没滚动条是没有
  • Java AOP有5种增强方式注解——前置@Before,后置@After,返回@AfterReturning,异常@AfterThrowing,环绕@Around

    执行顺序 前置增强 gt 目标函数 gt 后置增强 gt 返回增强 异常增强 注意 Before After AfterRunning和 AfterThrowing修饰的方法没有返回值 而 Around修饰的方法必须有返回值 Aspect
  • 使用html制作3D循环相册

    使用 html 制作简单3D循环相册 注 img 标签中的 src 属性为你图片资源路径
  • typescript学习笔记

    typescript学习笔记 一 简介 typescript是js的超集 主要学习ts里面的原始类型 字面量类型 数组类型 函数类型 类类型 接口类型 类型别名 联合交叉类型 枚举类型 泛型等类型元素 以及类型推断 类型断言 类型缩小 类型
  • 开发BLE蓝牙手机上位机软件竟是如此的简单

    做安卓手机APP通常使用高端大气的Android Studio 开发语言是Java 该开发IDE就是体积大 学习成本高 入手慢 这里给大家推荐一款国产的软件 体积小 运行和编译速度快 最重要的是编程语言是中文 这个软件就是E4A 本次我将向
  • 【手把手】如何使用置信学习cleanlab对数据集进行去噪

    开源代码 https gitee com qq874455953 cleanlab nlp keras 目前可能是数据集选的不好 数据集本身没什么噪声 所以有点小问题 不过总体框架是搭建起来的 前言 在做一个项目的时候 发现数据集噪声非常多
  • Flink 多流转换 (五) 间隔联结(Interval Join)

    文章目录 间隔联结的调用 间隔联结实例 顾名思义 间隔联结的思路就是针对一条流的每个数据 开辟出其时间戳前后的一段时间间隔 看这期间是否有来自另一条流的数据匹配 间隔连接通过一个共同的key连接两个流 A B 中的数据 流 B 的数据具有时