Flink实战: 窗口TopN分析与实现

2023-11-09

TopN 的需求场景不管是在离线计算还是实时计算都是比较常见的,例如电商中计算热门销售商品、广告计算中点击数前N的广告、搜索中计算搜索次数前N的搜索词。topN又分为全局topN、分组topN, 比喻说热门销售商品可以直接按照各个商品的销售总额排序,也可以先按照地域分组然后对各个地域下各个商品的销售总额排序。本篇以热门销售商品为例,实时统计每10min内各个地域维度下销售额top10的商品。

这个需求可以分解为以下几个步骤:

  • 提取数据中订单时间为事件时间

  • 按照区域+商品为维度,统计每个10min中的销售额

  • 按照区域为维度,统计该区域的top10 销售额的商品

时间提取

数据源类型是Kafka, 数据为订单数据包含:订单id、订单时间、商品id、区域id、订单金额(包含用户Id在这里省略)

case class Order(orderId: String, orderTime: Long, gdsId: String, amount: Double, areaId: String)

我们这里统计的每10min内的数据,希望按照真实的订单时间统计,那么使用事件时间EventTime,考虑到可能存在数据乱序问题,允许最大延时为30s

val orderStream=ds.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(30)) {

 override def extractTimestamp(element: Order): Long = element.orderTime

 })

统计销售额

统计每10min中的销售额,例如[9:00,9:10]、[9:10,9:20] 等等,对应Flink中事件时间滚动滚动窗口

val amountStream=dataStream.keyBy(x => {

 x.areaId + "_" + x.gdsId

 }).timeWindow(Time.minutes(10))

 .reduce(new ReduceFunction[Order] {

 override def reduce(value1: Order, value2: Order): Order = {

 Order(value1.orderId, value1.orderTime, value1.gdsId, value1.amount + value2.amount, value1.areaId)

 }

 })

首先以区域areaId与商品gdsId进行keyBy操作分组,使得相同的key流入到同一个task的window 里面计算,窗口函数包含WindowFunction、ReduceFunction、AggregateFunction,由于使用的是聚合操作,无需保留中间结果数据所以直接使用ReduceFunction边读取数据边聚合,减少内存使用。在ReduceFunction中直接对两个order数据销售额相加得到一个新的订单数据

区域维度的top10 销售额的商品

到目前为止已经拿到了每个10min内各个区域下的各个商品的销售额amountStream,现在需要对其按照区域为维度分组,计算top10销售额的商品,需要考虑两个问题:

  • 如何获取到10min窗口的所有数据

  • 如何排序

先看第一个如何获取到10min窗口的数据,也就是amountStream的每个窗口的输出,这个其实在Flink 官网上也给出了说明,那么就是直接在后面接一个相同大小的窗口即可,那么后面的窗口即获取到了前一个窗口的所有数据,代码如下:

amountStream.keyBy(_.areaId)

 .timeWindow(Time.minutes(10))

 .apply(...)

其实笔者最开始对这里也是不解,为什么后面接一个相同的窗口就能够获取到前一个窗口的输出呢?直到看了一下这里的源码来慢慢理解,事件时间窗口的触发是依靠watermark来推动的,

//AbstractStreamOperator中

public void processWatermark(Watermark mark) throws Exception {

 if (timeServiceManager != null) {

 timeServiceManager.advanceWatermark(mark);

 }

 output.emitWatermark(mark);

 }

这里在前面的时间系统系列也有分析到,advanceWatermark会触发满足要求的窗口,并且将窗口的结果输出,之后在才输出watermark, 在这里有一个很重要的关系watermark是在窗口数据输出之后输出,那么下一个窗口是如何判断上一个窗口的输出应该划分在同一个窗口呢,当然是按照时间,但是窗口输出数据时间是什么呢?

//WindowOperator

private void emitWindowContents(W window, ACC contents) throws Exception {

 timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

 processContext.window = window;

 userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);

 }

可以看到TimestampedCollector类型的collector,设置的时间正是窗口的endTime, 也就是窗口输出数据的数据时间就是窗口的endTime, 那么同一个窗口的输出数据具有相同的数据时间endTime, 这些数据正好可以在下游窗口被分配到同一个窗口中。在上一个窗口触发之后输出watermark正好可以触发下游窗口的窗口操作。

到现在我们可以获取到每一个地域下的所有商品销售额信息,接下来就是完成排序操作,很容易想到的就是Sorted的数据结构TreeSet或者是优先级队列PriorityQueue , TreeSet 实现原理是红黑树,优先队列实现原理就是最大/最小堆,这两个都可以满足需求,但是需要选择哪一个呢?红黑树的时间复杂度是logN,而堆的构造复杂度是N, 读取复杂度是1, 但是我们这里需要不断的做数据插入那么就涉及不断的构造过程,相对而言选择红黑树比较好(其实flink sql内部做topN也是选择红黑树类型的TreeMap)。

最后一点,是否需要保存所有的数据排序?很显然是不需要的,将TreeSet设置成为升序排序,那么第一个节点数据就是最小值,当TreeSet里面的数据到达N, 就获取第一个节点数据(最小值)与当前需要插入的数据进行比较,如果比其大,则直接舍弃,如果比其小,那么就将TreeSet中第一个节点数据删除,插入新的数据,最终得到的TreeSet 数据就是我们需要的topN。看下apply中代码实现:

 override def compare(o1: Order, o2: Order): Int = (o1.amount-o2.amount).toInt

 })

 input.foreach(x => {

 if (topMap.size() >= N) {

 val min=topMap.first()

 if(x.amount>min.amount) {

 topMap.pollFirst() //舍弃

 topMap.add(x)

 }

 }else{

 topMap.add(x)

 }

 })

 //这里直接做打印

 topMap.foreach(x=>{

 println(x)

 })

 }

最后直接执行main函数,为了方便做了一个简单的测试只获取1min以内top3,kafka输入数据:

orderId02,1573483405000,gdsId01,500,beijing

orderId03,1573483408000,gdsId02,200,beijing

orderId03,1573483408000,gdsId03,300,beijing

orderId03,1573483408000,gdsId04,400,beijing

orderId07,1573483600000,gdsId01,600,beijing //触发

最终得到的结果:

==area===beijing

Order(orderId03,1573483408000,gdsId03,300.0,beijing)

Order(orderId03,1573483408000,gdsId04,400.0,beijing)

Order(orderId02,1573483405000,gdsId01,500.0,beijing)

总结

到此为止实现了窗口topN功能,我认为比较重要的点就是如何获取窗口的聚合数据并排序,获取窗口的聚合结果就是在后面再接一个相同的窗口,数据排序类似使用最小堆机制。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink实战: 窗口TopN分析与实现 的相关文章

  • 【Flink系列1】flink与spark的区别

    Flink简介 spark基本架构 flink基本架构 Spark提出的最主要抽象概念是弹性分布式数据集 RDD flink支持增量迭代计算 基于流执行引擎 Flink提供了诸多更高抽象层的API以方便用户编写分布式任务 1 DataSet
  • flink中idea配置pom.xml

  • Flink之IntervalJoin介绍

    InterValJoin算子 间隔流 一条流去join另一条流去过去一段时间内的数据 该算子将keyedStream与keyedStream转化为DataStream 再给定的时间边界内 默认包含边界 相当于一个窗口 按指定的key对俩个K
  • 大数据技术Flink详解

    一 有状态的流式处理 Apache Flink 是一个分布式流处理器 具有直观和富有表现力的API 可实现有状态的流处理应用程序 它以容错的方式有效地大规模运行这些应用程序 Flink 于2014 年4 月加入Apache 软件基金会作为孵
  • Caused by: java.lang.NoClassDefFoundError: javax/tools/ToolProvider

    解决方案 在pom文件中的scala maven plugin插件下面加入一个参数 pom xml配置如下
  • Flink CDC(2.0) 如何加速海量数据的实时集成?

    原文 Flink CDC 如何加速海量数据的实时集成 知乎 导读 Flink CDC如何解决海量数据集成的痛点 如何加速海量数据处理 Flink CDC社区如何运营 如何参与社区贡献 今天的介绍会围绕下面四点展开 Flink CDC 技术
  • Macbook Pro 鼠标卡顿问题

    Macbook Pro 鼠标卡顿问题 目前无解 只能改善 该问题最早能追溯到 2015年 https jingyan baidu com article ff42efa93632c5c19e220208 html 原因 据说是无线频段冲突
  • 微众银行DSS部署单机-普通版

    DSS 普通版部署 我的服务器 我的配置 vim conf config sh vim conf db sh QA 我的服务器 centos 7 0 8C16G 100G机械硬盘 我的配置 bashrc文件内容 JDK export JAV
  • Flink Web UI 介绍

    一 提交flink任务到yarn flink run m yarn cluster yn 1 p 2 yjm 1024 ytm 1024 ynm FlinkOnYarnSession MemberLogInfoProducer d c co
  • flink源码阅读---Flink intervalJoin 使用和原理分析

    1 前言 Flink中基于DataStream的join 只能实现在同一个窗口的两个数据流进行join 但是在实际中常常会存在数据乱序或者延时的情况 导致两个流的数据进度不一致 就会出现数据跨窗口的情况 那么数据就无法在同一个窗口内join
  • flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic

    报错信息 Caused by org apache flink runtime JobException Recovery is suppressed by NoRestartBackoffTimeStrategy at org apach
  • Flink + Hudi 实现多流拼接(大宽表)

    1 背景 经典场景 Flink 侧实现 业务侧通常会基于实时计算引擎在流上做多个数据源的 JOIN 产出这个宽表 但这种解决方案在实践中面临较多挑战 主要可分为以下两种情况 维表 JOIN 场景挑战 指标数据与维度数据进行关联 其中维度数据
  • Flink常用算子总结

    Streaming 算子 Map 将元素处理转换 再输出 map算子对一个DataStream中的每个元素使用用户自定义的Mapper函数进行处理 每个输入元素对应一个输出元素 最终整个数据流被转换成一个新的DataStream 输出的数据
  • flink学习43:基于行的操作map、flatmap、聚合

    Map FlatMap 聚合
  • Flink on Zeppelin-2

    Flink Interpreter类型 首先介绍下Zeppelin中的Flink Interpreter类型 Zeppelin的Flink Interpreter支持Flink的所有API DataSet DataStream Table
  • 如何在 Flink 1.9 中使用 Hive?

    Flink on Hive 介绍 SQL 是大数据领域中的重要应用场景 为了完善 Flink 的生态 发掘 Flink 在批处理方面的潜力 我们决定增强 FlinkSQL 的功能 从而让用户能够通过 Flink 完成更多的任务 Hive 是
  • Apache Flink Checkpoint 应用实践

    Checkpoint 与 state 的关系 Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作 下图可以有一个对 Checkpoint 的直观感受 红框里面可以看到一共触发了 569K 次 Checkpoint
  • flink学习之state

    state作用 保留当前key的历史状态 state用法 ListState
  • 在JDK17尝鲜Flink1.17

    在JDK17尝鲜Flink1 17 前言 还没玩明白老版本 Flink1 17就来了 总还是要向前看的 根据官网文档 https nightlies apache org flink flink docs release 1 17 docs
  • 大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化

    文章目录 0 前言 1 环境准备 1 1 flink 下载相关 jar 包 1 2 生成 kafka 数据 1 3 开发前的三个小 tip 2 flink sql 客户端编写运行 sql 2 1 创建 kafka 数据源表

随机推荐

  • QT---窗口、按钮的基本设置

    目录 一 窗口相关的设置及中文编译错误设置 1 在源文件widget cpp中进行修改数据 并创建有关界面 2 如遇中文编译错误 即标题中文显示乱码 可如下设置 3 窗口界面及标题设置 窗口是否拉伸 二 创建按钮的相关设置 1 添加头文件
  • mybatis使用resultMap自定义映射处理,处理多对一的映射关系:

    1 级联方式处理
  • UML 中九种图

    UML 中九种图 1 用例图 说明 由参与者 actor 用例 User Case 以及他们之间的关系构成 用来描述系统功能 作用 可视化表达系统需求 更直观 规范 客服纯文字说明不足 图示 2 类图 说明 类 Class 封装了数据和行为
  • 数据结构 线性表的顺序存储和链式存储,以及基本操作、单链表例题

    一 线性表的存储表示 1 顺序表 线性表的顺序表示又称为顺序表 顺序表的静态分配存储表示 线性表的静态分配顺序存储结构 typedef int ElemType typedef struct 顺序表的定义 ElemType elem LIS
  • 2023Go面试问答_Go基础

    与其他语言相比 使用 Go 有什么好处 与其他作为学术实验开始的语言不同 Go代码的设计是务实的 每个功能和语法决策都旨在让程序员的生活更轻松 Golang 针对并发进行了优化 并且在规模上运行良好 由于单一的标准代码格式 Golang 通
  • Android开发中Handler的经典总结

    一 Handler的定义 主要接受子线程发送的数据 并用此数据配合主线程更新UI 解释 当应用程序启动时 Android首先会开启一个主线程 也就是UI线程 主线程为管理界面中的UI控件 进行事件分发 比如说 你要是点击一个 Button
  • 关于Redis配置主从复制遇到的问题,从机连接到主机,主机显示的从机数量仍然为0

    问题 设置单机集群的时候 两台从机都显示连接到主机 但是主机显示连接到的从机数量为0 主机79 从机80 从机81 解决 主库master要求密码验证 因为之前配置了redis的密码 方法一 建议 在配置文件中将requirepass注释掉
  • 云计算常用命令

    云计算IAAS篇 mysql篇 mysql uroot p000000 使用root账号登录mysql use mysql 切换到mysql层 show tables 查询mysql数据库列表 select from mysql user
  • 记一次阿里云黑客攻击事件

    这几天服务器一直发生异常行为 阿里云报警如下 根据执行命令 bin sh c curl fsSL http 165 225 157 157 8000 i sh sh 可知道 后台某个进程一直从这个美国的IP地址下载sh可执行文件 访问这个地
  • SpringMVC:从入门到精通,7篇系列篇带你全面掌握--四.5分钟搞定文件上传与下载

    Welcome Huihui s Code World 接下来看看由辉辉所写的关于SpringMVC的相关操作吧 需要添加的依赖
  • Android Studio安装及环境配置教程

    前言 首先需要确定好电脑是否有安装java环境 即是否安装有JDK 验证方法 直接电脑桌面win R 输入cmd 然后在黑窗口中分别输入java javac javadoc java version enter键 注意是输入一个指令按一次e
  • 【前端|CSS系列第4篇】面试官:你了解居中布局吗?

    欢迎来到前端CSS系列的第4篇教程 如果你正在寻找一种简单而又强大的前端技术 以使你的网页和应用程序看起来更加专业和美观 那么居中布局绝对是你不能错过的重要知识 在前端开发中 实现居中布局是一项必备技能 无论是垂直居中 水平居中 还是同时实
  • python函数中的可变默认值

    In 27 def f a a append 5 print a In 28 P f 5 In 29 L f 5 5 函数多次调用竟然使用的用一个参数对象 请注意
  • 大数据数据库:MPP vs MapReduce

    这些年大数据概念已经成为IT界的热门 我们经常也会在新闻和报纸中看到 大数据概念中最为关键的技术就是数据库管理系统 伴随着hadoop和MapReduce技术的流行 大数据的数据库中Hive和Spark等新型数据库脱颖而出 而另一个技术流派
  • javafx服务器监控系统,用于服务器端图像生成的JavaFX

    这可能听起来很奇怪 但我想使用JavaFX在服务器端生成我的图表图像 因为JavaFX具有很好的canvas API来执行图像转换连接和定位 特别是我有一个spring MVC服务来生成我的图表作为图像 主要问题是如何从方便的Spring
  • 骚操作:c++如何用goto便捷地写人工栈?

    在如今所有NOI系列赛事已经开全栈的时势下 人工栈已经离我们很远很远 所以这博客就是我弄着玩的 首先我们要清楚的是c 的goto写法 loop goto loop 在运行到goto时 就会跳到对应的标记 标记在goto的前后都可以 然而你试
  • 以太坊的状态树 Merkle Patricia Tree

    Merkle Patricia Tree Merkle树 https www cnblogs com fengzhiwu p 5524324 html Merkle Tree 通常也被称作Hash Tree 顾名思义 就是存储hash值的一
  • 编写python代码需要注意什么,Python学习笔记三,编程时需要注意的常犯错误事项...

    在进入正式学习python编程之前 我们一起来了解一下 在python学习过程需要注意的一些常犯错误的事项 Python运行时默认的输入法 在使用python时 电脑的输入法默认状态一定要调整为英文状态 除了在输入汉字的时候将输入法调整为中
  • 简单几句话总结Unicode,UTF-8和UTF-16

    概念 先说一说基本的概念 这包括什么是Unicode 什么是UTF 8 什么是UTF 16 Unicode UTF 8 UTF 16完整的说明请参考Wiki Unicode UTF 8 UTF 16 用比较简单的话来说就是 Unicode定
  • Flink实战: 窗口TopN分析与实现

    TopN 的需求场景不管是在离线计算还是实时计算都是比较常见的 例如电商中计算热门销售商品 广告计算中点击数前N的广告 搜索中计算搜索次数前N的搜索词 topN又分为全局topN 分组topN 比喻说热门销售商品可以直接按照各个商品的销售总