关于Flink Time中的Watermaker案例的详解

2023-11-12

需求自定义数据源,产出交易订单数据,设置基于事件时间窗口统计
在这里插入图片描述

  • 1)、交易订单数据
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 * @DESC   TODO: 自定义数据源,产出交易订单数据,设置基于事件时间窗口统计。
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
    private String id;
    private String userId;
    private Integer money;
    private Long orderTime;
}
  • 2)、自定义数据源
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 * @DESC   自定义数据源,实时产生订单数据,继承RichParallelSourceFunction接口
 */
public class OrderSource extends RichParallelSourceFunction<Order> {

    //标识符  表示是否产生数据
    private boolean isRunning = true;

    //不断执行 实时产生数据
    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        int[] times = new int[]{0, 0, 8, 0, 10, 5, 15, 12};
        //随机实例对象
        Random random = new Random();
        while (isRunning){
            //创建订单对象
            Order order = new Order(
                    UUID.randomUUID().toString().substring(1, 18), //
                    "u100" + random.nextInt(2), // u1000, u1001
                    random.nextInt(100) + 1, //
                    // 为了演示生成的订单数据乱序达到Flink应用,当获取当前时间戳以后,再减去随机时间0,1,2,3,4
                    System.currentTimeMillis() - times[random.nextInt(times.length)] * 1000//
            );

            System.out.println("Order >>>" + order);
            //发送数据
            ctx.collect(order);

            // TODO: 每秒钟产生一条数据
            TimeUnit.SECONDS.sleep(2);
        }
    }

    @Override
    public void cancel() {
        // 当不在接收数据时,设置isRunning为false
        isRunning = false;
    }

}

  • 3)、编写主程序测试
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 * @DESC 滚动事件时间窗口(Tumbling EventTime Window)统计:每隔5秒,计算5秒内,每个用户的订单金额
 *            TODO:添加Watermark水位线,来解决一定程度上的数据延迟和数据乱序问题
 */
public class StreamEventTimeWatermarkWindow {
    public static void main(String[] args) throws Exception {
        //1-环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // TODO: step1. 设置时间语义:事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //2-数据源source
        DataStreamSource<Order> orderSource = env.addSource(new OrderSource());
        // TODO: step2. 提取事件时间字段值,转换为Long类型,并且设置最大允许的延迟时间或乱序时间
        SingleOutputStreamOperator<Order> timeDataStream = orderSource.assignTimestampsAndWatermarks(
                // TODO: 设置最大运行延迟时间,从而计算出每条数据Watermark水位线 = 事件时间 - 最大允许延迟时间
                new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(Order order) {
                        return order.getOrderTime();
                    }
                }
        );

        //3-数据的transformation    每隔5秒,计算5秒内,每个用户的订单金额
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = timeDataStream
                .map(new MapFunction<Order, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Order order) throws Exception {
                        return Tuple2.of(order.getUserId(), order.getMoney());
                    }
                })
                .keyBy(0)  //按照用户ID分组
                // step3. 设置事件时间窗口大小
                .timeWindow(Time.seconds(5))
                //聚合计算
                .sum(1);
        //4-数据的sink
        sumDataStream.printToErr();
        //5-execute
        env.execute(StreamEventTimeWatermarkWindow.class.getSimpleName());
    }
}

  • 运行程序, 分析结果,但是不容易看到窗口时间范围及数据时间值,所以需要使用apply函数进行聚合。

在这里插入图片描述

  • 自己编写上述代码功能,在设置每条数据水位线时使用AssignerWithPeriodicWatermarksapply函数聚合数据,获取窗口起始和结束时间。具体代码如下:
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 * @DESC 滚动事件时间窗口(Tumbling EventTime Window)统计:每隔5秒,计算5秒内,每个用户的订单金额
 *           TODO:添加Watermark水位线,来解决一定程度上的数据延迟和数据乱序问题
 */
public class StreamEventTimeWatermarkWindowDebug {
    public static void main(String[] args) throws Exception {
        //1-执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // TODO:step1. 设置流试时间语义:事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //2-数据源source
        DataStreamSource<Order> orderSource = env.addSource(new OrderSource());
        //TODO: step2. 提取事件时间字段值,转换为Long类型,并且设置最大允许的延迟时间或乱序时间
        FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd hh:mm:ss");
        SingleOutputStreamOperator<Order> orderDataStream = orderSource
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Order>() {
                    // Watermark延迟时间(允许最大数据乱序时间), 此处设置最大延迟2秒
                    Long maxOutOfOrderness = 2000L;
                    // 当前最大事件时间
                    Long currentMaxTimestamp = Long.MIN_VALUE + 2000L;
                    // 最新的水位时间
                    Long lastEmittedWatermark = Long.MIN_VALUE;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        //依据当前数据中时间时间,计算出水位 Watermaker值
                        long potentialWatermark = currentMaxTimestamp - maxOutOfOrderness;
                        // 比较当前数据计算Watermark值与上次数据Watermark值大小,设置Watermark
                        if (potentialWatermark >= lastEmittedWatermark) {
                            lastEmittedWatermark = potentialWatermark;
                        }

                        return new Watermark(lastEmittedWatermark);
                    }

                    @Override
                    public long extractTimestamp(Order order, long previousElementTimestamp) {
                        //获取订单产生的当前时间
                        Long eventTime = order.getOrderTime();
                        // 比较当前数据事件时间与最大事件时间大小
                        if (eventTime > currentMaxTimestamp) {
                            currentMaxTimestamp = eventTime;
                        }
                        // 打印当前数据Watermark值
                        System.out.println(
                                "userId: " + order.getUserId()
                                        + ", money: " + order.getMoney()
                                        + ", eventTime: " + format.format(eventTime)
                                        + ", watermark: " + format.format(getCurrentWatermark().getTimestamp())
                        );
                        return eventTime;
                    }
                });

        // 3. 数据转换-transformation: 每隔5秒,计算5秒内,每个用户的订单金额
        SingleOutputStreamOperator<String> sumStream = orderDataStream
                .keyBy("userId")
                // TODO: 设置事件时间窗口,大小为5秒
                .timeWindow(Time.seconds(5))
                // 窗口函数聚合
                .apply(new WindowFunction<Order, String, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple,
                                      TimeWindow window,
                                      Iterable<Order> input,
                                      Collector<String> out) throws Exception {

                        // 获取窗口StartTime和EndTime
                        String startTime = format.format(window.getStart());
                        String endTime = format.format(window.getEnd());

                        // 获取组合所有订单时间
                        List<String> list = new ArrayList<>();
                        Integer orderSum = 0;
                        for (Order order : input) {
                            list.add(format.format(order.getOrderTime()));
                            orderSum += order.getMoney();
                        }
                        String output = "窗口>>> userId: " + tuple.toString()
                                + " -> [" + startTime + " ~ " + endTime
                                + "],  sum: " + orderSum + ", Orders: " + list;
                        out.collect(output);
                    }
                });

        // 4. 数据终端-sink
        sumStream.printToErr();

        // 5. 触发执行-execute
        env.execute(StreamEventTimeWatermarkWindowDebug.class.getSimpleName());

    }
}

在这里插入图片描述

  • 分析上面案例分析,如果数据延迟到时间较久(通过Watermark机制,处理乱序数据,都没有处理到的数据),可以使用Allowed Lateness机制和侧边流处理延迟数据
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

关于Flink Time中的Watermaker案例的详解 的相关文章

  • 【Java script基础学习】本地对象 - Date

    Date 日期对象 用来操作计算机的日期和时间 获取 获取当前日期时间 获取当前的时间戳 Date now 时间戳 从1970 1 1 0 0 0 到此刻的毫秒数 获取完整的日期对象 new Date 获取到的是一个对象类型的日期 包含日期

随机推荐

  • STM32MP157驱动开发——Linux LCD驱动(上)

    STM32MP157驱动开发 Linux LCD驱动 上 0 前言 一 LCD 和 LTDC 简介 1 LCD 简介 1 分辨率 2 像素格式 3 LCD 屏幕接口 4 LCD 时间参数 5 RGB LCD 屏幕时序 6 像素时钟 7 显存
  • 【Python pygame】零基础也能轻松掌握的学习路线与参考资料

    Python pygame是一款专门用于开发游戏和多媒体应用程序的Python库 它可以帮助开发者实现丰富多彩的图形界面和实时动态交互效果 本篇文章将为大家介绍Python pygame的学习路线 包括入门基础 进阶知识以及优秀实践 帮助大
  • C++中return语句的用法

    C 中的return语句是函数中一个重要的语句 return语句用于结束当前正在执行的函数 并将控制权返回给调用此函数的函数 return语句有两种形式 return return expression 1 没有返回值的函数 不带返回值的r
  • 贪吃蛇(一)--用C++编写一个简单的贪吃蛇

    这里简单介绍怎么用C 编写一个简单的黑白框的贪吃蛇游戏 复杂的加了可视化界面程序点击这里贪吃蛇 二 easyX图形库进行可视化界面制作 首先分析在黑白框中的贪吃蛇需要哪些功能 1 需要能在界面指定位置 x y 直接输出对应内容 2 需要动态
  • 学习的逻辑: 知识经济学

    来自http liguanglei name blogs 2012 11 28 the logic of learning 1 怎么证明学会了 2 你的身价是由你表现出来的知识决定的 不是你掌握的知识决定的 万物有始皆有终 我们的逻辑链条起
  • GEO分析

    title R Notebook output html notebook 1 下载加载包 cran packages lt c tidyr tibble dplyr stringr ggplot2 ggpubr factoextra Fa
  • 汽车租赁毕业设计

    一个基于Java汽车租赁管理系统的毕业设计网站 系统分为用户和管理员2个角色 详细介绍请见下文 一 环境信息 运行环境 java8 mysql5 6 开发语言 java 开发框架 springboot springmvc mybatis t
  • 物联网云平台系统设计

    物联网云平台系统设计 下面将谈到几个关键问题 设备如何接入网络 设备间如何通信 物联网数据的用途 如何搭建起一个物联网系统框架呢 它的技术架构又是怎么样呢 物联网终端软件系统架构 物联网云平台系统架构 1 物联网设备如何接入到网络 只有设备
  • VScode 中文显示出现黄色方框的解决方法

    VScode 中文显示出现黄色方框的解决方法 使用 VScode 打开源码时 发现注释中的汉字都被一个黄色的方框圈住了 这是因为使能了批注中字符的突出显示的功能 不喜欢这个黄色方框的小伙伴 可以参照下列步骤 禁用批注中字符的突出显示 将鼠标
  • 【统计学习方法】感知机Python 对偶形式实现

    代码可在Github上下载 https github com FlameCharmander MachineLearning 前言 上篇博文感知机的原始形式提到了感知机的原始形式 而这篇博文介绍的是感知机的对偶形式 算法理论 感知机的原始形
  • 数据集成:数据挖掘的准备工作之一

    欢迎来到我的博客 作者 秋无之地 简介 CSDN爬虫 后端 大数据领域创作者 目前从事python爬虫 后端和大数据等相关工作 主要擅长领域有 爬虫 后端 大数据开发 数据分析等 欢迎小伙伴们点赞 收藏 留言 关注 关注必回关 上一篇文章已
  • 人工智能算法分类

    一 人工智能学习算法分类 人工智能算法大体上来说可以分类两类 基于统计的机器学习算法 Machine Learning 和深度学习算法 Deep Learning 总的来说 在sklearn中机器学习算法大概的分类如下 纯算法类 1 回归算
  • Android Studio 自动配置Gradle失败处理方法,Could not install Gradle distribution from “....zip

    转载 https blog csdn net BraveAnt666 article details 124736258 提示的错误一般是https services gradle org distributions gradle 7 3
  • python后端学习(十三)路由支持正则、Url编码、增删改操作、增加log日志

    路由支持正则 编码 增删改操作 增加log日志 mini frame py import re url编码相关 import urllib parse import logging from pymysql import connect U
  • OpenMV输出PWM,实现对舵机控制

    OpenMV的定时器官方函数介绍 Timer类 控制内部定时器 目录 OpenMV的PWM资源介绍 为什么要用OpenMV输出PWM OpenMV的PWM资源分配 资源 注意 建议 PWM输出代码 代码讲解 Timer Timer chan
  • Matlab的fspecial函数

    参考 http www ilovematlab cn thread 52886 1 1 html 函数原型 h fspecial type h fspecial type para 根据函数原型对fspecial函数作个说明 fspecia
  • QT笔记-窗体创建方式

    1 非模式窗体 窗体创建方式1 Modal属性决定了show 应该将弹出的dialog设置为模式状态还是非模式状态 默认情况下改属性为false并且show 弹出的窗口是非模式状态的 把这个属性设置为true就相当于QWidget wind
  • webpack引入第三方库的方式,以及注意事项

    一般情况下 我们不用担心所使用的第三方库 在npm管理仓库中找不到 如果需要某一个库 如 jquery 可以直接运行npm install jquery脚本命令来安装这个项目所需要的依赖 然后 在使用jquery的模块文件中 通过impor
  • vs2019找不着工具箱了_VS2019 Nuget找不到包的问题处理

    VS不记得改了什么设置之后 发现找不到EF 解决办法 1 点击右侧的设置按钮 2 弹出窗中左侧树形结构选择 程序包源 再点击右上方的添加按钮 然后点击更新 确定按钮 再次搜索就可找到EF安装包 vs2019中NuGet控制台的常用命令 Db
  • 关于Flink Time中的Watermaker案例的详解

    需求 自定义数据源 产出交易订单数据 设置基于事件时间窗口统计 1 交易订单数据 import lombok AllArgsConstructor import lombok Data import lombok NoArgsConstru