flink源码阅读---Flink intervalJoin 使用和原理分析

2023-11-16

1.前言

Flink中基于DataStream的join,只能实现在同一个窗口的两个数据流进行join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内join。
Flink基于KeyedStream提供的interval join机制,intervaljoin 连接两个keyedStream, 按照相同的key在一个相对数据时间的时间段内进行连接。

2.代码示例

将订单流与订单品流通过订单id进行关联,获得订单流中的会员id。
其中ds1就是订单品流,ds2就是订单流,分别对ds1和ds2通过订单id进行keyBy操作,得到两个KeyedStream,再进行intervalJoin操作;
between方法传递的两个参数lowerBound和upperBound,用来控制右边的流可以与哪个时间范围内的左边的流进行关联,即:
leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
相当于左边的流可以晚到lowerBound(lowerBound为负的话)时间,也可以早到upperBound(upperBound为正的话)时间。

 

DataStream<OrderItemBean> ds = ds1.keyBy(jo -> jo.getString("fk_tgou_order_id"))
                .intervalJoin(ds2.keyBy(jo -> jo.getString("id")))
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(new ProcessJoinFunction<JSONObject, JSONObject, OrderItemBean>() {

                    @Override
                    public void processElement(JSONObject joItem, JSONObject joOrder, Context context, Collector<OrderItemBean> collector) throws Exception {
                        String order_id = joItem.getString("fk_tgou_order_id");
                        String item_id = joItem.getString("activity_to_product_id");
                        String create_time = df.format(joItem.getLong("create_time"));
                        String member_id = joOrder.getString("fk_member_id");
                        Double price = joItem.getDouble("price");
                        Integer quantity = joItem.getInteger("quantity");
                        collector.collect(new OrderItemBean(order_id, item_id, create_time, member_id, price, quantity));
                    }
                });
ds.map(JSON::toJSONString).addSink(new FlinkKafkaProducer010<String>("berkeley-order-item", schema, produceConfig));

3.Interval Join源码

<1> 使用Interval Join时,必须要指定的时间类型为EventTime

 

 

<2>两个KeyedStream在进行intervalJoin并调用between方法后,跟着使用process方法;
process方法传递一个自定义的 ProcessJoinFunction 作为参数,ProcessJoinFunction的三个参数就是左边流的元素类型,右边流的元素类型,输出流的元素类型。

 

 

 

<3>intervalJoin,底层是将两个KeyedStream进行connect操作,得到ConnectedStreams,这样的两个数据流之间就可以实现状态共享,对于intervalJoin来说就是两个流相同key的数据可以相互访问。
ConnectedStreams的keyby????

<4> 在ConnectedStreams之上执行的操作就是IntervalJoinOperator

 

 

这里有两个参数控制是否包括上下界,默认都是包括的。

a.initializeState()方法
这里面初始化了两个状态对象,

 

 

分别用来存储两个流的数据,其中Long对应数据的时间戳,List<BufferEntry<T1>>对应相同时间戳的数据

b.processElement1和processElement2方法
方法描述的是,当两个流达到之后,比如左边的流有数据到达之后,就去右边的流去查找对应上下界范围内的数据。这两个方法调用的都是processElement方法。

 

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {
                
        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

(1)获取记录的值和时间戳,判断是否延时,当到达的记录的时间戳小于水位线时,说明该数据延时,不去处理,不去关联另一条流的数据。

 

 

 

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

(2)将数据添加到对应自己流的MapState缓存状态中,key为数据的时间。
addToBuffer(ourBuffer, ourValue, ourTimestamp);

 

private static <T> void addToBuffer(
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
            final T value,
            final long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }

(3)去遍历另一条流的MapState,如果ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound ,则将数据输出给ProcessJoinFunction调用,ourTimestamp表示流入的数据时间,timestamp表示对应join的数据时间

 

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

对应的collect方法:

 

   private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

设置结果的Timestamp为两边流中最大的,之后执行processElement方法

 

 

 

 

(4)注册定时清理时间

(这一块要明白一个前提,多流join的前提是总的算子的watermark是多个流中较小的那个流的watermark, 也就是说定时时钟注册的时间a,在watermark超过这个时间a之后,代表左右两边流的a之前的数据都进来了。)

 

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }

定时的清理时间,就是当下流入的数据的时间+relativeUpperBound,当watermark大于该时间就需要清理。

 

public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

清理时间逻辑:
假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s,
upperBound为5s,lowerBound为1s
根据 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
a。如果为左边流数据到达,调用processElement1方法
此时relativeUpperBound为5,relativeLowerBound为1,relativeUpperBound>0,所以定时清理时间为10+5即15s
当时间达到15s时,清除左边流数据,即看右边流在15s时,需要查找的左边流时间范围
10s<=左边流时间戳<=14s,所以watermark>15s时可清除10s的数据。

 

 

b。如果为右边流数据到达,调用processElement2方法
此时relativeUpperBound为-1,relativeLowerBound为-5,relativeUpperBound<0,所以定时清理时间为10s
当时间达到10s时,清除右边流数据,即看左边流在10s时,需要查找的右边流时间范围
11s<=右边流时间戳<=15s,所以可以清除10s的数据。

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink源码阅读---Flink intervalJoin 使用和原理分析 的相关文章

  • 关于Flink Time中的Watermaker案例的详解

    需求 自定义数据源 产出交易订单数据 设置基于事件时间窗口统计 1 交易订单数据 import lombok AllArgsConstructor import lombok Data import lombok NoArgsConstru
  • Flink实战之实时风控规则引擎

    问题导读 1 怎样构建一个风控业务架构 2 风控规则模型有哪些 3 怎样实现Flink CEP 动态更新 一 项目背景 目前钱大妈基于云原生大数据组件 DataWorks MaxCompute Flink Hologres 构建了离线和实时
  • flink state ttl 清理逻辑(截止到flink1.8之前的逻辑)

    在我们开发Flink应用时 许多有状态流应用程序的一个常见要求是自动清理应用程序状态以有效管理状态大小 或控制应用程序状态的访问时间 TTL Time To Live 功能在Flink 1 6 0中开始启动 并在Apache Flink中启
  • Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)

    聚合算子 Aggregation 计算的结果不仅依赖当前数据 还跟之前的数据有关 相当于要把所有数据聚在一起进行汇总合并 这就是所谓的 聚合 Aggregation 类似于MapReduce中的reduce操作 按键分区 keyBy 对于F
  • flink源码阅读---Flink intervalJoin 使用和原理分析

    1 前言 Flink中基于DataStream的join 只能实现在同一个窗口的两个数据流进行join 但是在实际中常常会存在数据乱序或者延时的情况 导致两个流的数据进度不一致 就会出现数据跨窗口的情况 那么数据就无法在同一个窗口内join
  • flink中AggregateFunction 执行步骤以及含义全网详细解释

    package operator import org apache flink api common functions AggregateFunction import org apache flink api common funct
  • 深入理解Flink的水位线

    Apache Flink是一个流处理框架 它支持事件时间和处理时间的概念 在处理流数据时 Flink通过水位线 Watermark 来追踪事件时间的进度 从而支持事件时间的操作 水位线是一种特殊的事件 它表示在此时间戳之前的所有事件都已经到
  • 【Flink系列】配置管理rockmq-flink产生的rocketmq-client日志

    Flink任务集成了rockmq flink用于订阅消费rocketmq的消息 在任务运行过程中发现会在系统的 username logs rocketmqlogs目录下产生rocketmq client log日志 并且这个日志累积和滚动
  • [1143]Flink的Checkpoint和Savepoint

    文章目录 Flink的Checkpoint和Savepoint介绍 第一部分 Flink的Checkpoint 1 Flink Checkpoint原理介绍 2 Checkpoint的简单设置 3 保存多个Checkpoint 4 从Che
  • 【面试真题】今日头条大数据面试100题,收藏备用

    1 简述WordCount 的实现过程 2 简述MapReduce与 Spark 的区别与联系 3 Spark 在客户端与集群运行的区别 4 相同的 SQL 在 HiveSql 与 SparkSQL 的实现中 为什么 Spark 比 Had
  • Flink常用算子总结

    Streaming 算子 Map 将元素处理转换 再输出 map算子对一个DataStream中的每个元素使用用户自定义的Mapper函数进行处理 每个输入元素对应一个输出元素 最终整个数据流被转换成一个新的DataStream 输出的数据
  • flink学习43:基于行的操作map、flatmap、聚合

    Map FlatMap 聚合
  • Flink学习27:驱逐器

    import org apache flink api common eventtime SerializableTimestampAssigner WatermarkStrategy import org apache flink api
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • 如何在 Flink 1.9 中使用 Hive?

    Flink on Hive 介绍 SQL 是大数据领域中的重要应用场景 为了完善 Flink 的生态 发掘 Flink 在批处理方面的潜力 我们决定增强 FlinkSQL 的功能 从而让用户能够通过 Flink 完成更多的任务 Hive 是
  • flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现

    flink主程序 public class FinkTest public static void main String args throws Exception StreamExecutionEnvironment env Strea
  • flink学习之state

    state作用 保留当前key的历史状态 state用法 ListState
  • Flink_06_ProcessAPI(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 侧输出流 SideOutput 即分支流 可以用来接收迟到数据 也可
  • 在JDK17尝鲜Flink1.17

    在JDK17尝鲜Flink1 17 前言 还没玩明白老版本 Flink1 17就来了 总还是要向前看的 根据官网文档 https nightlies apache org flink flink docs release 1 17 docs

随机推荐

  • DFS遍历框架解决岛屿问题

    补充在开头 这篇题解 是我在做leetcode无意中看到的 原po将此类问题解释的非常清晰 故将其转载并添加上对应题目的解题代码 仅用做个人的学习笔记 不做任何商业用途 引言 我们所熟悉的 DFS 深度优先搜索 问题通常是在树或者图结构上进
  • [UE4笔记] 3DUI空间交互

    在Actor中使用widget的时候制作3DUI 如果直接选择World模式 鼠标和UI是无法产生交互的 只要勾选了红框里的选项 就可以正常交互了 老是忘记 所以写个笔记记录一下 效果 注意 如果你还是点击不到 输入控制台命令showfla
  • 第十题

    第十题 限制元素 修改限制 把3改了即可
  • 【嵌入式基础】串口通信

    目录 1 前言 2 基本概念 2 1 波特率 2 2 起始位 2 3 数据位 2 4 校验位 2 5 停止位 2 6 空闲位 3 工作模式 3 1 单工模式 3 2 半双工模式 3 3 全双工模式 4 同步通信和异步通信 4 1 同步通信
  • 【Mac 教程系列第 18 篇】如何修改 iTerm2 的背景图片

    这是 Mac 教程系列第 18 篇 如果觉得有用的话 欢迎关注专栏 默认终端的效果图是这样的 修改后的效果如下图所示 如果你已经习惯了默认终端的样式 前期看有背景图片的可能会不太适应 不过有时候尝试一下不同的风格 也许会有不同的体验 如何实
  • YOLO5 目标检测

    目录 1项目的克隆和必要的环境依赖 1 1项目的克隆 1 2项目代码结构整体介绍 1 3环境的安装和依赖的安装 2 数据集和预训练权重的准备 2 1利用labelimg标注数据和数据的准备 2 2 获得预训练权重 3训练自己的模型 3 1修
  • TCP详解 (三)Nagle算法和延迟确认

    文章目录 延迟确认 Nagle算法 Nagle算法遇上延迟确认 关闭Nagle算法 一些有关TCP通信量的研究如 Caceresetal 1991 发现 如果按照分组数量计算 约有一半的TCP报文段包含成块数据 如 FTP 电子邮件和 Us
  • unitTest+Ddt数据驱动测试

    我们设计测试用例时 会出现测试步骤一样 只是其中的测试数据有变化的情况 比如测试登录时的账号密码 这个时候 如果我们依然使用一条case一个方法的话 会出现大量的代码冗余 而且效率也会大大降低 此时 ddt模块就能帮助我们解决这个问题 dd
  • 微信王者有ios的服务器吗,王者IOS微信区国服瑶多有钱?凌晨撒4W红包,点开头像傻眼...

    原标题 王者IOS微信区国服瑶多有钱 凌晨撒4W红包 点开头像傻眼 大家好 在王者荣耀这款游戏里面 有一个英雄的地位是非常特殊的 那就是被女玩家们所深爱着的瑶 但她又被男玩家所厌恶 毕竟女玩家和男玩家玩游戏的目的不一样 女玩家是为了快乐 她
  • 深度讲解一下远程控制软件哪家好?推荐一款免费不限速的好软件给大家!

    小编今天要推荐一款较为小众的远程控制软件 通过远程桌面可以极大地方便我们进行远程技术支持 远程办公 然而我们熟知 QQ 远程 windows自带的远程协助 使用起来并不理想 不是连接不顺畅就是操作技术高 相比之下 专门的远程桌面软件的体验更
  • NumPy 学习笔记(二):NDArray

    导入 NumPy 开始学习 import numpy as np 不用 Python 非好汉 不晓 NumPy 真遗憾 本专栏 将使用 图解 以及 脑图 的方法来记录我的 图解 NumPy 学习笔记 NumPy 是 Numerical Py
  • 悟空crm-0.5.4 (OpenLogic CentOS7.2)

    平台 CentOS 类型 虚拟机镜像 软件包 5kcrm0 5 4 centos7 2 lamp stack 5 6 22 commercial crm lamp 服务优惠价 按服务商许可协议 云服务器费用 查看费用 立即部署 产品详情 产
  • ValueError: not enough values to unpack (expected 2, got 1)错误解决方案

    在学习python时 遇到了错误 现已解决 源代码如下 role line spoken each line split 1 错误如下 ValueError not enough values to unpack expected 2 go
  • 搜索服务应用:solr的使用

    开始前 环境 solr4 10 3 jdk1 7 tomcat7 下载地址 http archive apache org dist lucene solr 说明 solr和lucen更新是同步的 请配对使用 lucene用什么版本solr
  • 金山文档手机app服务器异常,手机金山文档出现这个文件大家有没有遇到过,在线求解谢谢了。{...

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 手机金山文档出现这个文件大家有没有遇到过 在线求解谢谢了 version 3 UpdateFrequency 1 AppIDConfig Global DataReport UserPortra
  • 相机参数原理深入剖析 与 实际运用

    1 相机内参与应用 fx fy u0 v0只与摄像机内部参数有关 故称矩阵M1为内参数矩阵 其中fx f dX fy f dY 分别称为u轴和v轴上的归一化焦距 f是相机的焦距 dX和dY分别表示传感器u轴和v轴上单位像素的尺寸大小 单位为
  • 三角函数公式

    转自 https baike baidu com item E4 B8 89 E8 A7 92 E5 87 BD E6 95 B0 E5 85 AC E5 BC 8F 4374733 fr aladdin 三角函数是数学中属于 初等函数中的
  • 现在学java的都是傻子

    不经意的看见 看到学java的都是傻子 当不经意看到 说明 这个最近已经在网上疯传了很多 说明目前这个行业真的已经不好了 所以你得自己当心了 在这个行业不知有多少学习了又放弃了 博主我也是其中之一 从博主我的名字相信大家也可以看出来 从放弃
  • Stem-and-Leaf Plot in R

    Data set faithful 272 2 Waiting time between eruptions and the duration of the eruption for the Old Faithful geyser gt d
  • flink源码阅读---Flink intervalJoin 使用和原理分析

    1 前言 Flink中基于DataStream的join 只能实现在同一个窗口的两个数据流进行join 但是在实际中常常会存在数据乱序或者延时的情况 导致两个流的数据进度不一致 就会出现数据跨窗口的情况 那么数据就无法在同一个窗口内join