Flink 水位线结合窗口进行KeyBy、Reduce案例

2023-05-16

水位线(Watermark)和窗口(Window) 

Watermark

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

 

Window

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


case class User(name: String, money: Double, time: Long)

object StreamWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val strategy: WatermarkStrategy[User] = WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[User] {
        override def extractTimestamp(element: User, recordTimestamp: Long): Long = element.time
      })

    val dataStream: DataStream[User] = env.socketTextStream("localhost", 7777)
      .map((data: String) => {
        val arr: Array[String] = data.split(",")
        User(arr(0), arr(1).toDouble, arr(2).toLong)
      })
      .assignTimestampsAndWatermarks(strategy)

    val resultStream: DataStream[(String, Double)] = dataStream.map((data: User) => (data.name, data.money))
      .keyBy((_: (String, Double))._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce((x: (String, Double), y: (String, Double)) => (x._1, y._2 + x._2))
    resultStream.print()

    env.execute("stream window")
  }
}

基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。我们知道在Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作 ProcessFunction。ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。

package com.atguigu.chapter02

import java.time.Duration
import java.util.Calendar

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


case class Event(user: String, url: String, timestamp: Long)

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val hostname: String = parameterTool.get("host")
    val port: Int = parameterTool.getInt("port")
    val lineDataStream: DataStream[String] = env.socketTextStream(hostname, port)


    val stream: DataStream[Event] = lineDataStream.map((data: String) => {
      val fields: Array[String] = data.split(",")
      Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
    })

    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      ))
      .keyBy((_: Event).user)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new WatermarkWindowResult)
      .print()


    env.execute()
  }

  class WatermarkWindowResult extends ProcessWindowFunction[Event, String, String, TimeWindow] {
    override def process(user: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
  
      val start: Long = context.window.getStart
      val end: Long = context.window.getEnd
      val count: Int = elements.size

      val currentWatermark: Long = context.currentWatermark

      out.collect(s"窗口 $start ~ $end , 用户 $user 的活跃度为:$count, 水位线现在位于:$currentWatermark")
    }
  }

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 水位线结合窗口进行KeyBy、Reduce案例 的相关文章

  • linux下 bash-completion 离线安装(Ubuntu或centos )

    bash completion 安装 实现k8s命令自动补全 xff0c 我们需要安装bash completion 在github下载离线包 下载地址解压 tar xvJf bash completion 2 11 tar xz 命令补全
  • ROS自定义地图(CAD、手绘等)

    0x00 概述 在前面的文章中 xff0c 我们介绍如何自动导航时 xff0c 都是基于使用gmapping或者hector mapping创建的地图 当然使用其他的建图方法创建的地图也可以 xff0c 但是目前为止 xff0c 无论使用哪
  • STM32 控制蜂鸣器播放音乐的原理和实例

    STM32 控制蜂鸣器播放音乐的原理和实例 本文通过将乐谱里的每个音符的声音频率和声音时长保存在两个数组里面 1 使用通用定时器TIM4实现无中断的微秒级延时函数 xff0c 控制每个音符的发声时长 2 使用系统滴答时钟Systick实现带
  • 影响力最大化——CELF算法的简介与python实现

    CELF算法是Leskovecl等人利用IC模型的子模特性对爬山贪心算法进一步改进得到的优化算法 子模函数的定义为 任意函数f 将有限集合映射为非负实数集并且满足收益递减特性即为子模函数 设集合s T xff0c 任意元素v添加到集合S中获
  • Qos队列调度算法(SP/WRR/DWRR)

    本文重点分析sonic中支持的三种Qos队列调度算法 xff1a 1 SP xff08 Strict Priority xff0c 严格优先级 xff09 也称为PQ xff08 Priority Queuing xff09 调度 xff0
  • python的MapReduce的应用案例

    在学习这个项目中用到许多数学公式 xff0c 有的自己不太懂 xff0c 所以上传上来进行实地应用 参考资料 generate train feature map py usr bin env python encoding 61 UTF
  • 索赔649亿!GitHub Copilot惹上官司,被指控侵犯代码版权,是开源社区“寄生虫”...

    大数据文摘授权转载自AI前线 整理 xff1a 刘燕 xff0c 核子可乐 一位 20 年老开源程序员 xff1a GitHub Copilot 就是开源社区的 寄生虫 GitHub 面临集体起诉 xff0c 索赔 647 亿 GitHub
  • SDN网络技术:OpenFlow协议(1)

    本文首发于我的公众号码农之屋 xff08 id Spider1818 xff09 xff0c 专注于干货分享 xff0c 包含但不限于Java编程 网络技术 Linux内核及实操 容器技术等 欢迎大家关注 xff0c 二维码文末可以扫 导读
  • Ubuntu、debian安装图形界面,输入法,解决远程桌面卡顿问题

    安装图形界面 tasksel选择安装Ubuntu Desktopapt get install xrdp tigervnc standalone server安装远程接入systemctl start xrdpsystemctl enabl
  • JS 异步 ( 一、异步概念、Web worker 基本使用 )

    相关阅读 xff1a JS 异步 一 异步概念 Web worker 基本使用 JS 异步 二 Promise 的用法 手写模拟 Promise JS 异步 三 generator 的用法 async await 的用法 文章目录 异步异步
  • eve-ng 自定义linux镜像

    文章目录 1 创建目录2 上传镜像并改名3 创建虚拟磁盘qcow24 登录eve网页5 查找lab UUID和虚拟机编号6 将系统提交成模板7 压缩镜像 xff08 可选 xff09 1 创建目录 root 64 eve ng opt un
  • 百度地图Marker的定位和方向

    原文 xff1a http bbs lbsyun baidu com forum php mod 61 viewthread amp tid 61 83704 今天做百度地图需要在显示很多车辆的位置信息 并显示车辆的角度和行驶方向 需要用到
  • ELFhash - 优秀的字符串哈希算法

    1 字符串哈希 xff1a 我们先从字符串哈希说起 在很多的情况下 xff0c 我们有可能会获得大量的字符串 xff0c 每个字符串有可能重复也有可能不重复 C不像Python有字典类型的数据结构 xff0c 我们没有办法吧字符串当做是键值
  • 详解TensorFlow数据读取机制(附代码)

    在学习TensorFlow的过程中 xff0c 有很多小伙伴反映读取数据这一块很难理解 确实这一块官方的教程比较简略 xff0c 网上也找不到什么合适的学习材料 今天这篇文章就以图片的形式 xff0c 用最简单的语言 xff0c 为大家详细
  • Linux下安装boa服务器遇到的问题

    最近在CentOS7机器上安装boa服务器的时候 xff0c 遇到了不少问题 xff0c 在这里记录一下 1 从官网下载最新 boa源码包 xff0c 网址 xff1a http www boa org xff1b 2 解压 xff0c 进
  • 【linux】查看Linux系统版本信息的几种方法

    一 查看Linux内核版本命令 xff08 两种方法 xff09 xff1a 1 cat proc version 2 uname a 二 查看Linux系统版本的命令 xff08 3种方法 xff09 xff1a 1 lsb releas
  • 如何使用Python为Hadoop编写一个简单的MapReduce程序

    转载自 xff1a http asfr blogbus com logs 44208067 html 在这个实例中 xff0c 我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的 MapReduce 程序 尽管 Hadoo
  • Android Gradle 7.x新版本的依赖结构变化

    版本的小蜜蜂 小海豚 电鳗版本的Android Studio新建工程的依赖结构和之前的发生了变化 xff0c 主要有 xff1a 原来在工程build gradle中的buildscript和allprojects xff0c 移动至set
  • C#:如何用VS开启人生中第一个Windows窗体应用程序(Winform)?

    摘要 xff1a Windows窗体应用程序 xff08 Winform xff0c 下文以此指代 xff09 既能有效 直观地设计Windows窗体界面 xff0c 又支持内部逻辑的编写 那么 xff0c 对于C 初学者来说 xff0c
  • BootLoader & Grub详解

    BootLoader amp Grub详解 xff08 补记 xff09 2008 8 2 星期日 凉爽 补记 xff1a 2010 xff0d 04 xff0d 21 时隔两年 xff0c 会过头来重新看了一下 xff0c 发现GRUB的

随机推荐

  • 签名问题:EXPKEYSIG F42ED6FBAB17C654 Open Robotics <info@osrfoundation.org>

    sudo apt key adv keyserver keyserver ubuntu com recv keys F42ED6FBAB17C654 代码如上 xff0c 更换签名
  • Python,gnuplot,libsvm配置详细步骤

    1 下载Python xff0c gnuplot以及libsvm 我的电脑是64位 xff0c Win7操作系统 1 1 python 2 7 6 64位 这里我用的Python是64位的Python2 7 6 下载地址 xff1a htt
  • C++中assert函数的用法介绍

    assert宏的原型定义在 lt assert h gt 中 xff0c 其作用是如果它的条件返回错误 xff0c 则终止程序执行 xff0c 原型定义 xff1a inclide lt assert h gt void assert in
  • C++中stdlib.h头文件介绍

    stdlib头文件即standard library标准库头文件 xff0c stdlib头文件里包含了C C 43 43 语言的最常用的系统函数 xff0c 该文件包含了C语言标准库函数的定义 xff0c stdlib h中定义了物种类型
  • 蛋白质性质和结构分析

    原文链接 第七章 蛋白质性质和结构分析 传统的生物学认为 xff0c 蛋白质的序列决定了它的三维结构 xff0c 也就决定了它的功能 由于用X光晶体衍射和NMR核磁共振技术测定蛋白质的三维结构 xff0c 以及用生化方法研究蛋白质的功能效率
  • Libsvm网格参数寻优教程

    原文 xff1a http endual iteye com blog 1262010 首先下载Libsvm Python和Gnuplot xff1a l libsvm的主页http www csie ntu edu tw cjlin li
  • 打井问题

    在偏远的山区 xff0c 水资源很稀缺 xff0c 因此 xff0c 我们问每个山区进行打井工程 xff0c 在不同的地方打了N口井 xff0c 现在我们要在这N口井之间修建管道 xff0c 要使得这些井都能连通 xff0c 同时所使用的管
  • C语言结构体的初始化

    C primer Plus第五版 第14章结构和其他数据形式 1 结构声明 结构声明 xff08 structure declaration xff09 是描述结构体如何组合的主要方法 xff0c 声明就像下面这样 xff1a struct
  • 【Unix编程】文件处理函数

    文件处理函数 xff1a http www iteedu com os linux linuxprgm linuxcfunctions file fcntl php 1 close xff08 关闭文件 xff09 相关函数 open xf
  • ubuntu安装vnc踩的坑

    较新版本的ubuntu 安装vnc 1 搜索setting 把里面的sharing的权限都打开 2 试一下sudo apt get install vnc4server 或者sudo apt y install vnc4server 3 如
  • ElasticSearch 7.6中遇到的一些坑

    一 限制单个index在单个节点上的总shard数 index routing allocation total shards per node 一般在冷热分离的场景种 xff0c 冷数据会设置副本 xff0c 热数据为了保证写入速度 xf
  • 大数据部门组织结构

    平台团队 运维团队 运维工程师最基本的职责都是负责服务的稳定性 xff0c 确保服务可以7 24H不间断地为用户提供服务 xff0c 负责维护并确保整个服务的高可用性 xff0c 同时不断优化系统架构提升部署效率 优化资源利用率 xff1b
  • Hadoop HDFS 副本机制

    Data Replication HDFS is designed to reliably store very large files across machines in a large cluster It stores each f
  • Apache Spark 3.0:全新功能知多少

    Spark3 0解决了超过3400个JIRAs xff0c 历时一年多 xff0c 是整个社区集体智慧的成果 Spark SQL和 Spark Cores是其中的核心模块 xff0c 其余模块如PySpark等模块均是建立在两者之上 Spa
  • Spark优化篇:动态内存管理

    Spark内存管理分为静态内存管理和统一内存管理 xff0c Spark1 6之前使用的是静态内存管理 xff0c Spark1 6之后的版本默认使用的是统一内存管理 动态内存机制图 xff1a 内存估算 xff1a Other Memor
  • Spark优化篇:RBO/CBO

    在Spark1 0中所有的Catalyst Optimizer都是基于规则 rule 优化的 为了产生比较好的查询规 则 xff0c 优化器需要理解数据的特性 xff0c 于是在Spark2 0中引入了基于代价的优化器 xff08 cost
  • Spark优化篇:数据倾斜解决

    数据倾斜是指我们在并行进行数据处理的时候 xff0c 由于数据散列引起Spark的单个Partition的分布不均 xff0c 导致大量的数据集中分布到一台或者几台计算节点上 xff0c 导致处理速度远低于平均计算速度 xff0c 从而拖延
  • Apache Flink 作业图 JobGraph 与执行图 ExecutionGraph

    由 Flink 程序直接映射成的数据流图 xff08 dataflow graph xff09 xff0c 也被称为逻辑流图 xff08 logical StreamGraph xff09 到具体执行环节时 xff0c Flink 需要进一
  • Apache Flink 任务 Tasks 和任务槽 Task Slots

    目录 任务槽 xff08 Task Slots xff09 任务槽数量的设置 任务对任务槽的共享 任务槽和并行度的关系 任务槽 xff08 Task Slots xff09 Flink 中每一个 worker 也就是 TaskManager
  • Flink 水位线结合窗口进行KeyBy、Reduce案例

    水位线 xff08 Watermark xff09 和窗口 xff08 Window xff09 Watermark 在事件时间语义下 xff0c 我们不依赖系统时间 xff0c 而是基于数据自带的时间戳去定义了一个时钟 xff0c 用来表