Flink学习20:算子介绍reduce

2023-11-06

1.reduce简介

按照指定的方式,把每个元素进行累计执行。比如实现累加计算

 

示例:

import keyByNameTest.StockPrice
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object reduceTest {

  //defined the dataSource's data type
  case class StockPrice(stockId:String, timestamp: Long, price:Double)

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //create ds

    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

    val ds = env.fromCollection(pricesList)

    //transformation

//update the stock's new time, and accumulate the price
    val reducedDs = ds.keyBy(0).reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))

    reducedDs.print()

    env.execute()

  }

}


输出结果:

自定义reduce func

核心步骤:

1.继承 ReduceFunction 类

2.重写reduce 方法

示例:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object myReduceTest {


  //defined the dataSource's data type
  case class StockPrice(stockId:String, timestamp: Long, price:Double)

  //define my reduce func
  update the stock's new time, and accumulate the price
  class MyReduceFunc extends ReduceFunction[StockPrice] {
    override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {

      //update the stock's new time, and accumulate the price
      StockPrice(t.stockId, t1.timestamp, t.price + t1.price)

    }
  }

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //create ds
    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

    val ds = env.fromCollection(pricesList)

    //transformation
    val keyByedDs = ds.keyBy(0)

      //use my reduce func
    val myReducedDs = keyByedDs.reduce(new MyReduceFunc)

    myReducedDs.print()

    env.execute()
  }



}

输出结果:

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink学习20:算子介绍reduce 的相关文章

  • 在职状态下继续学习的心得体会

    本来平时记录的都是一些技术点的学习和使用 今天打算记录一下学习方法 当然不一定适合所有人 因人而异 仅供参考 学习这件事 对于IT行业来说 真的是活到老学到老 技术的更新迭代速度非常快 而且总是有那么一些公司特别的卷 没办法 改变不了外因
  • 为什么这么多人自学黑客,但没过多久就放弃了(掌握正确的网络安全学习路线很重要)

    网络安全是一个 不断发展和演变 的领域 以下是一个 网络安全学习路线规划 旨在帮助初学者快速入门和提高自己的技能 基础知识 网络安全的 基础知识 包括 网络结构 操作系统 编程语言 等方面的知识 学习这些基础知识对理解网络安全的原理和技术至
  • 网络安全从入门到精通(超详细)学习路线

    首先看一下学网络安全有什么好处 1 可以学习计算机方面的知识 在正式学习网络安全之前是一定要学习计算机基础知识的 只要把网络安全认真的学透了 那么计算机基础知识是没有任何问题的 操作系统 网络架构 网站容器 数据库 前端后端等等 可以说不想
  • 机器智能与人类智能的合作:认知能力的提升

    1 背景介绍 在过去的几十年里 人工智能 AI 技术的发展取得了显著的进展 从早期的规则引擎和专家系统到现代的深度学习和神经网络 AI已经成功地解决了许多复杂的问题 然而 尽管如此 人工智能仍然远远低于人类智能 人类智能的强大之处在于其认知
  • AI大模型应用入门实战与进阶:深入理解Transformer架构

    1 背景介绍 自从2017年的 Attention is All You Need 一文发表以来 Transformer架构已经成为自然语言处理 NLP 领域的主流模型 这篇文章将深入探讨Transformer架构的核心概念 算法原理以及实
  • 机器学习与人类智能的融合:未来趋势与挑战

    1 背景介绍 人工智能 Artificial Intelligence AI 是指一种以计算机程序为代表的智能方法 可以理解 学习和应用人类智能的某些方面 机器学习 Machine Learning ML 是人工智能的一个子领域 它涉及到计
  • AI大模型应用入门实战与进阶:如何训练自己的AI模型

    1 背景介绍 人工智能 Artificial Intelligence AI 是计算机科学的一个分支 旨在模拟人类智能的能力 包括学习 理解自然语言 识别图像和视频 进行决策等 随着数据量的增加和计算能力的提升 人工智能技术的发展得到了巨大
  • 慢思维的力量:如何解决复杂问题

    1 背景介绍 在当今的快速发展和竞争激烈的环境中 我们需要更有效地解决复杂问题 这需要我们具备一种称为慢思维的思考方式 它可以帮助我们更好地理解问题 制定更好的解决方案 本文将介绍慢思维的核心概念 算法原理 具体操作步骤以及数学模型公式 并
  • AI大模型应用入门实战与进阶:Part 7 Transformer模型解析

    1 背景介绍 自从2020年的大模型如GPT 3等开始引起广泛关注 人工智能领域的研究和应用得到了重大推动 在这一波技术创新中 Transformer模型发挥着关键作用 这篇文章将深入探讨Transformer模型的核心概念 算法原理和实例
  • 机器学习中的知识共享:模型与数据的交流与协作

    1 背景介绍 机器学习 Machine Learning 是一种通过数据学习模式和规律的计算机科学领域 在过去的几年里 机器学习技术在各个领域得到了广泛应用 如图像识别 自然语言处理 推荐系统等 随着数据规模的不断增长 单个机器学习模型的复
  • 【CTF必看】从零开始的CTF学习路线(超详细),让你从小白进阶成大神!

    最近很多朋友在后台私信我 问应该怎么入门CTF 个人认为入门CTF之前大家应该先了解到底 什么是CTF 而你 学CTF的目的又到底是什么 其次便是最好具备相应的编程能力 若是完全不具备这些能力极有可能直接被劝退 毕竟比赛的时候动不动写个脚本
  • 扬帆证券:产业化破题在即 人形机器人超预期演进

    大模型助力下的拐点 特斯拉A股产业链上 两笔重磅出资几乎一起现身 总规划超百亿元 1月4日 拓普集团公告 与宁波经济技能开发区办理委员会签署了 机器人电驱系统研发生产基地项目出资协议书 公司拟出资50亿元 建设机器人核心部件生产基地 此次出
  • SRC漏洞挖掘经验+技巧篇

    一 漏洞挖掘的前期 信息收集 虽然是前期 但是却是我认为最重要的一部分 很多人挖洞的时候说不知道如何入手 其实挖洞就是信息收集 常规owasp top 10 逻辑漏洞 重要的可能就是思路猥琐一点 这些漏洞的测试方法本身不是特别复杂 一般混迹
  • 电商数据api拼多多接口获取商品实时数据价格比价api代码演示案例

    拼多多商品详情接口 接口接入入口 它的主要功能是允许卖家从自己的系统中快速获取商品详细信息 通过这个接口 卖家可以提取到商品的各类数据 包括但不限于商品标题 价格 优惠价 收藏数 下单人数 月销售量等 此外 还可以获取到商品的SKU图 详情
  • 扬帆证券:突发利好!外资重大转变,A股收到多份喜报

    A股财报季 利好音讯密集传来 1月16日晚间 A股多家上市公司披露了成绩预告 其间成绩预增 扭亏等利好公告数量占比超80 其间 普瑞眼科公告 估计2023年净赢利同比添加高达1163 98 1285 51 别的 多家上市公司公告称 估计20
  • 渗透测试常用工具汇总_渗透测试实战

    1 Wireshark Wireshark 前称Ethereal 是一个网络分包分析软件 是世界上使用最多的网络协议分析器 Wireshark 兼容所有主要的操作系统 如 Windows Linux macOS 和 Solaris kali
  • 独家 | 鸿蒙(HarmonyOS)开发详细学习笔记免费分享

    前言 华为宣布 将在1月18日 在北京 上海 杭州 南京 成都 厦门 武汉 长沙 8 大城市同时召开大会 届时将揭秘鸿蒙生态和 HarmonyOS NEXT 进阶新篇章 简单的来说就是 纯血鸿蒙系统 即将彻底揭晓 鸿蒙系统自推出来以来 就一
  • ESM10A 消除对单独 PLC 的需求

    ESM10A 消除对单独 PLC 的需求 ESM10A 可以消除对单独 PLC 的需求 该程序是在 PC 上开发的 然后使用免费提供的简单易用的 EzSQ 软件下载到逆变器 似乎这些改进还不够 日立还在 SJ700 中添加了其他新功能 例如
  • 手把手教你使用HarmonyOS本地模拟器

    我们通过下面的动图来回顾下手机本地模拟器的使用效果 本期 我们将为大家介绍HarmonyOS本地模拟器的版本演进 并手把手教大家使用HarmonyOS本地模拟器 一 本地模拟器的版本演进 2021年12月31日 经过一个版本的迭代优化 随D
  • 【js学习之路】遍历数组api之 `filter `和 `map`的区别

    一 前言 数组是我们在项目中经常使用的数据类型 今天我们主要简述作用于遍历数组的api filter 和 map 的区别 二 filter和map的共同点 首先 我们主要阐述一下 filter 和 map 的共同点 api的参数都是回调函数

随机推荐

  • QWidget、QDialog及QMainWindow的区别与联系

    QWidget类是所有用户界面对象的基类 QMainWindow和QDialog都是QWidget的子类 一般来说 如果需要嵌入到其他窗体中 则基于QWidget创建 如果是顶级对话框 则基于QDialog创建 如果是主窗体 则基于QMai
  • H5如何直接跳转小程序?

    1 云开发方式 不推荐 不推荐理由 1 要钱 2 麻烦 需要兼容 参考链接 https developers weixin qq com miniprogram dev wxcloud guide staticstorage jump mi
  • 稀疏重构算法详解

    引入 在室内环境中 多径信号具有天然的空间稀疏性 根据压缩感知理论可知 如果信号是可压缩的或者在某个变换域是稀疏的 可以采用一个随机测量矩阵将高维信号映射到一个低维空间上 通过求解优化问题 以很高的概率重构出原始信号 因此 在该理论框架下
  • 带分数

    标题 带分数 100 可以表示为带分数的形式 100 3 69258 714 还可以表示为 100 82 3546 197 注意特征 带分数中 数字1 9分别出现且只出现一次 不包含0 类似这样的带分数 100 有 11 种表示法 题目要求
  • 小心踩雷,一次Java内存泄漏排查实战

    问题出现 晚上七点多开始 我就开始不停地收到报警邮件 邮件显示探测的几个接口有超时情况 多数执行栈都在 java io BufferedReader readLine BufferReader java 389 java io Buffer
  • c++ 变量常量指针练习题

    Q1 在win32 x86模式下 int p int pp double q 请说明p pp q各占几个字节的内存单元 p 占 4 个字节 pp 占 4 个字节 q 占 4 个字节 Q2常量1 1 0 1 的数据类型是什么 1 是 整形 i
  • YOLOv7中的数据集处理【代码分析】

    本文章主要是针对yolov7中数据集处理部分代码进行解析 和yolov5是一样的 也是可以更好的理解训练中送入的数据集到底是什么样子的 数据集的处理离不开两个类 一个是Dataset from torch utils data import
  • python3 -sorted函数 对所有可迭代的对象进行排序操作 sorted(corr_list,key=lambda x: -abs(x[0]))

    sorted 函数对所有可迭代的对象进行排序操作 返回重新排序的列表 sort 与 sorted 区别 sort 是应用在 list 上的方法 sorted 可以对所有可迭代的对象进行排序操 作 list 的 sort 方法返回的是对已经存
  • linux export 的作用

    功能说明 设置或显示环境变量 语 法 export fnp 变量名称 变量设置值 补充说明 在shell中执行程序时 shell会提供一组环境变量 export可新增 修改或删除环境变量 供后续执行的程序使用 export的效力仅及于该此登
  • 我在腾讯做测10年,总结的7条生存经验

    简单做个自我介绍 我是一名测试工程师 从15年毕业到现在工作了6年 一路走过来 觉得自己很幸运遇到了很多伯乐 教会了我很多道理和职场经验 也非常荣幸在阿里工作过4年 搭建过蚂蚁金服的platuo测试框架 thrift测试框架 自动化测试平台
  • React源码分析(一)=> scheduler分析

    文章目录 1 前言 2 getCurrentTime 3 unstable scheduleCallback函数 4 scheduleHostCallbackIfNeeded 4 1 flushWork 4 2 flushFirstCall
  • 学习笔记实操手册

    https note youdao com s KP25iMDf https note youdao com s GAmVO7V 使用yum安装php72 https www cnblogs com JahanGu p 10439472 h
  • 编写一个使用指针的C函数,交换数组a和数组b的对应元素

    编写一个使用指针的C函数 交换数组a和数组b的对应元素 int a 5 1 2 3 4 5 int b 5 10 20 30 40 50 输出格式要求 a d 2d b d 2d 程序运行示例如下 a 0 10 a 1 20 a 2 30
  • QT应用部署流程

    参考链接 https www shuzhiduo com A LPdo07AGz3 1 Windows系统 Windows下使用QT自带工具windeployqt exe部署 windows gt command 切换到QT的工具目录 在c
  • signature=a195252fc5196d0fb82cccccc68b06b3,Gene signatures in wound tissue as evidenced by molecular...

    Wound induction in the chicken CAM Chick embryos were cultured for 10 days and CAMs were inflicted by parallel scalpel s
  • linux 数组里面是json,将JSON解析为shell脚本中的数组

    小编典典 如果您确实无法使用适当的JSON解析器 例如 1 请尝试 基于的解决方案 jq awk Bash 4 x readarray t values lt 3 print 4 myfile json Bash 3 x IFS n rea
  • lua 3.0 中 普通方法延时

    local delayTime cc DelayTime create 1 local callFunND cc CallFunc create function self pushjoystick end local seq cc Seq
  • 微信企业付款至零钱,状态处理中,status=PROCESSING的解决办法

    前段时间腾讯因为支付系统异常 更新了一些东西 然后就开始出现了这个问题 时不时的就会有一个两个状态为 处理中 的交易 但文档中并没有给出解决办法 尝试咨询了客服 给出了两个解决方案 1 把该笔交易当做失败处理 但以后这笔订单就不要再去折腾它
  • ESP8266 RTOS SDK 移植 u8g2 移植代码

    LED屏驱动ssd1306 屏幕128x64大小 1 移植代码核心 方法1 port c define SCL Pin GPIO SCL define SDA Pin GPIO SDA void delay us uint32 t time
  • Flink学习20:算子介绍reduce

    1 reduce简介 按照指定的方式 把每个元素进行累计执行 比如实现累加计算 示例 import keyByNameTest StockPrice import org apache flink api scala createTypeI