Flink学习27:驱逐器

2023-11-18

 

 

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector
import trigger.StockPrice

import java.{lang, util}
import java.time.Duration
import java.util.Properties

object evictor {


  def main(args: Array[String]): Unit = {

    //env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //set parallel
    env.setParallelism(1)

    //set event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //creat ds

    //for kafka connect
    val kafkaProps = new Properties()

    //set ip
    kafkaProps.setProperty("bootstrap.servers", "10.10.10.10:9092")

    //group
    kafkaProps.setProperty("group.id", "gksk-bigdata")

    val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)

    //set offset
    kafkaSource.setStartFromEarliest()

    //auto commit offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    //band datasource
    val ds = env.addSource(kafkaSource)

    //trans
    val stockPriceDS = ds.map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))

    //water mark
    val sumedDS = stockPriceDS.assignTimestampsAndWatermarks(

      //set water mark 0
      WatermarkStrategy
        .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
        .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
          override def extractTimestamp(t: StockPrice, l: Long): Long = t.timeStamp
        })
    ).keyBy(s => s.stockId)
      .timeWindow(Time.seconds(2))

      //evictor
      .evictor(new MyEvictor())
      .process(new MyProcessWindowsFunc())

    //print
    sumedDS.print()

    //execute
    env.execute()


  }


  class MyEvictor() extends Evictor[StockPrice,TimeWindow](){
    override def evictBefore(iterable: lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {

      //create the object, which contain (dataObject, timeStamp)
      val ite: util.Iterator[TimestampedValue[StockPrice]] = iterable.iterator()

      //
      while (ite.hasNext){

        //get the dataSource's object
        val element: TimestampedValue[StockPrice] = ite.next()

        println("now the stockPrice is :"+ element.getValue().price)

        //remove illegal data
        if(element.getValue.price <= 0){
          println("illegal price :"+ element.getValue.price)

          ite.remove()
        }

      }
    }

    override def evictAfter(iterable: lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
      //do nothing
    }
  }


  //get each stock's avg price
  class MyProcessWindowsFunc() extends ProcessWindowFunction[StockPrice, (String,Double), String, TimeWindow]() {

    //process Function will be called, while timeWindow is close,
    // (keybyed stream: each group once )
    //so,if data is big,it's inappropriate to use Process func
    override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {

      //ps: all timeWindow's data will save to iterable,that is big deal

      //sum
      var sumPrice = 0.0
      elements.foreach(s => {
        sumPrice = sumPrice + s.price
      })

      //stock price's avg
      out.collect(key, sumPrice/elements.size)

    }
  }





}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink学习27:驱逐器 的相关文章

  • 实时获取建材网商品数据:API实现详解与代码示例

    一 引言 随着电子商务的快速发展 实时获取商品数据对于企业决策 市场分析以及数据驱动的营销策略至关重要 建材网作为国内知名的建材信息平台 提供了API接口 使得第三方开发者可以方便地获取商品数据 本文将详细介绍如何使用 建材网的API接口
  • 为什么这么多人自学黑客,但没过多久就放弃了(掌握正确的网络安全学习路线很重要)

    网络安全是一个 不断发展和演变 的领域 以下是一个 网络安全学习路线规划 旨在帮助初学者快速入门和提高自己的技能 基础知识 网络安全的 基础知识 包括 网络结构 操作系统 编程语言 等方面的知识 学习这些基础知识对理解网络安全的原理和技术至
  • 从外卖员到程序员,自学3年终于转行成功,三面“拿下”拼多多

    前言 先来自我介绍 老家农村 家里好不容易把我送到大城市读书 大学非985 211 但在我们老家 能出一个本科大学生也是非常不容易的 因为农村信息的相对闭塞 我对大学专业一无所知 加上分数并非前茅 最后被调剂一个我并不喜欢的专业 这里就不透
  • 神州信息一表通监管合规系统

    什么是 一表通 国家金融监督管理总局为进一步建立健全数据统计监管体系 规范数据报送指标体系 明确检测数据规则 而推行建立的一套新体系监管报送方式 提升校验准确性和信息安全性 近期 国家金融监督管理总局更是进一步加大推动 一表通 的实行试点范
  • 心灵与计算机:解密情感处理

    1 背景介绍 情感处理是人工智能领域中一个重要的研究方向 它旨在使计算机能理解 识别和处理人类的情感 情感处理的主要应用包括情感分析 情感识别 情感挖掘等 随着大数据 深度学习和自然语言处理等技术的发展 情感处理技术已经取得了显著的进展 然
  • AI大模型应用入门实战与进阶:从AI模型应用到商业转化

    1 背景介绍 人工智能 AI 已经成为当今世界最热门的技术话题之一 其在各个领域的应用也不断拓展 大型AI模型是人工智能领域的核心 它们在自然语言处理 图像识别 语音识别等方面的表现力和性能都有着重要的作用 然而 如何将这些大型AI模型应用
  • 心灵与大脑的沟通:如何让大脑更好地理解我们的情感

    1 背景介绍 心理学和人工智能之间的界限已经不断模糊化 尤其是在情感智能方面 情感智能是一种新兴的人工智能技术 旨在让计算机更好地理解和回应人类的情感 这篇文章将探讨如何让大脑更好地理解我们的情感 以及在这个过程中涉及的核心概念 算法原理
  • 如何成为一名数据科学家:必须掌握的技能和知识

    1 背景介绍 数据科学家是一种新兴的职业 它结合了计算机科学 统计学 数学和领域知识等多个领域的知识和技能 以解决实际问题 数据科学家的主要任务是收集 清洗 分析和解释大量数据 从中挖掘有价值的信息和知识 并将其应用于决策和预测 数据科学家
  • 机器学习中的知识共享:模型与数据的交流与协作

    1 背景介绍 机器学习 Machine Learning 是一种通过数据学习模式和规律的计算机科学领域 在过去的几年里 机器学习技术在各个领域得到了广泛应用 如图像识别 自然语言处理 推荐系统等 随着数据规模的不断增长 单个机器学习模型的复
  • 如何利用CHAT做简单的总结体会?

    问CHAT 在测试过程中使用appium python自动化的优点和体会 CHAT回复 使用 Appium 配合 Python 进行自动化测试主要有以下几点优点 1 跨平台性 Appium 支持 iOS 和 Android 平台的应用自动化
  • 【计算机毕业设计】精品课程在线学习系统

    如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统精品课程学习信息管理难度大 容错率低 管理人员处理数据费工费时 所以专门为解决这个难
  • 2024年金三银四网络安全考试试题

    2023年金三银四网络安全考试试题 1 关于数据使用说法错误的是 A 在知识分享 案例中如涉及客户网络数据 应取敏感化 不得直接使用 B 在公开场合 公共媒体等谈论 传播或发布客户网络中的数据 需获得客户书面授权或取敏感化 公开渠道获得的除
  • 【VUE毕业设计】基于SSM的在线课堂学习设计与实现(含源码+论文)

    文章目录 1 项目简介 2 实现效果 2 1 界面展示 3 设计方案 3 1 概述 3 2 系统流程 3 3 系统结构设计 4 项目获取
  • 用CHAT写一份标题为职业教育教师教学能力提升培训总结

    CHAT回复 标题 职业教育教师教学能力提升培训总结 一 活动概述 本次由学校组织的职业教育教师教学能力提升培训于8月15日至8月20日顺利进行 来自全校的60位职业教育教师参与了此次培训 主讲人为享有盛名的教育专家马丁先生 二 培训内容与
  • 利用CHAT写实验结论

    问CHAT 通过观察放置在玻璃表面上的单个水滴 人们可以观察到水滴充当成像系统 探究这样一个透镜的放大倍数和分辨率 CHAT回复 实验报告标题 利用玻璃表面的单一水滴观察成像系统的放大倍数和分辨率 一 实验目的 通过对比和测量 研究和探索玻
  • 利用CHAT上传文件的操作

    问CHAT autox js ui 上传框 CHAT回复 上传文件的操作如果是在应用界面中的话 由于Android对于文件权限的限制 你可能不能直接模拟点击选择文件 一般来说有两种常见的解决方案 一种是使用intent来模拟发送一个文件路径
  • 使用企业订货软件的担忧与考虑|网上APP订货系统

    使用企业订货软件的担忧与考虑 网上APP订货系统 网上订货系统担心出现的问题 1 如果在订货系统中定错 多 货物了该怎么办 其实这也是很多人在网购或者是现实中经常会犯的一个错误 但是网上订货平台为大家提供了很多的解决方案 其中对于订单的修改
  • 为什么我强烈推荐大学生打CTF!

    前言 写这个文章是因为我很多粉丝都是学生 经常有人问 感觉大一第一个学期忙忙碌碌的过去了 啥都会一点 但是自己很难系统的学习到整个知识体系 很迷茫 想知道要如何高效学习 这篇文章我主要就围绕两点 减少那些罗里吧嗦的废话 直接上干货 CTF如
  • Cortex-M3与M4权威指南

    处理器类型 所有的ARM Cortex M 处理器是32位的精简指令集处理器 它们有 32位寄存器 32位内部数据路径 32位总线接口 除了32位数据 Cortex M处理器也可以有效地处理器8位和16位数据以及支持许多涉及64位数据的操作
  • 实力认证!鼎捷软件荣膺“领军企业”和“创新产品”两大奖项

    近日 由中国科学院软件研究所 中科软科技股份有限公司联合主办的 2023中国软件技术大会 于北京成功举办 本届大会以 大模型驱动下的软件变革 为主题 数十位来自知名互联网公司和软件巨头企业的技术大咖 不同领域行业专家 畅销书作者等分享嘉宾

随机推荐

  • Manifest合并失败几种原因以及解决方法

    今天遇到了一个报错 Error Execution failed for task app processDebugManifest gt Manifest merger failed with multiple errors see lo
  • c语言合并两个单链表LA和LB,把两个递增的单链表La,Lb,合并成一个递减的单链表Lc...

    原文题是严蔚敏同志的数据结构习题中第二章线性表中提出的问题 原问如下 2 24 假设有两个按元素值递增有序排列的线性表A和B 均以单链表作存储结构 请编写算法将A表与B表归并成一个按元素值递减有序 即非递增有序 允许表中含有值相同的元表 排
  • 基于Vue + vuex + Antd-design-vue实现天气App

    simple weather github 地址 github com WqhForGitHu 效果图 PC端 移动设备端 技术框架 该应用是基于 Vue vuex 实现的 页面的 UI 则是使用了 Antd design vue 库来完成
  • Android 版本统一管理

    前言 因为现在项目都比较模块化 组件化 要用到的model比较多 一个model就有一个build gradle文件 里面都有compileSdkVersion或buildToolsVersion等可能出现版本不一致导致编译出现错误 所以要
  • thinkphp5学习路程 三 数据库操作

    首先我用的是php中文网提供的php工具箱 phpmyadmin管理mysql 在此之前最好对sql语句有所了解 会简单的增删改查等 在里面创建数据库和一张表如下 随后你需要打开数据库的配置文件 目录为 application databa
  • Python OpenCV中的图像阈值处理

    1 前言 上一篇介绍了用C 如何对一幅图像进行阈值处理 本篇接着用python来做同样的事情 图像阈值处理是很多高级算法的底层逻辑之一 比如在做图形检测 轮廓识别时 常常会先对图像进行阈值处理 然后再进行具体的检测或识别 因此很有必要掌握图
  • 指针作函数返回值

    include
  • 指向数组的引用 const char(&p)[a]

    指向数组的引用 const char p a 问题起源 如何在函数内 也能获取数组的大小信息 如果是定义一个数组a后 使用如下方法即可获取大小信息 cout lt lt sizeof a sizeof a 0 但是如果作为一个参数传入到一个
  • 最新酒桌小游戏喝酒小程序源码_带流量主源码下载

    2022最新酒桌小游戏喝酒小程序源码 带流量主 喝酒神器3 6 我修改增加了广告位 根据文档直接替换即可 原版本没有广告位 直接上传源码到开发者端即可 通过后改广告代码 然后关闭广告展示提交 通过后打开即可 下载地址 最新酒桌小游戏喝酒小程
  • 在linux系统下安装配置apache服务器

    我所用的是centos linux系统 但apache的服务在linux系统都大同小异 像ubuntu redhat等等 now let us go 如有问题 欢迎直邮 zhe jiang he hp com lt 何哲江 gt 1 获取软
  • Edge浏览器没有让我失望! 今天终于可以在win10中模拟IE内核进行前端测试了!

    前言 ietest现在是不是不好用了 Edge浏览器仿真是不是不见了 如图 如果我们在前端开发javascript遇见一些老旧的语法标准 想要测试一下都难 想想都抓狂 不过不用担心 经过这几天的资料查阅 我还是找到了一个解决办法来模拟旧版I
  • Set集合中的SortedSet接口下的实现类TreeSet

    放入TreeSet集合中的元素必须实现Comparable接口 不然会报错 因为这个集合中的元素会自动按元素的大小顺序排序 所以不是实现比较的接口就会出现ClassCastException 还要注意一点的是Set集合中的元素是不可重读的
  • ctfshow web入门刷题3

    web15 看提示找到邮箱 然后尝试登入后台 url admin 尝试点击忘记密码然后提示输入城市 尝试用qq搜索qq号 发现城市为西安 得到后台密码 登入得到flag WEB16 题目提示php探针 所以url tz php打开探针然后搜
  • 当你穿越到道诡异仙的世界,如何利用密码学知识区分幻想和现实?

    题解 牛群的能量 题目考察的知识点动态规划题目解答方法的文字分析用 f i 代表以第 i个数结尾的 和最大子群能量值之和 设数组的长度为n 则本题的答案时从0到n 1这n个f 题解 牛牛的名字游戏 题目考察的知识点字符串题目解答方法的文字分
  • TensorFlow在MNIST中的应用-循环神经网络RNN

    参考 1 TensorFlow技术解析与实战 2 https www cnblogs com hellcat p 7401706 html 3 http www jianshu com p 3dbeb3ab9aa3 用TensorFlow搭
  • 如何设计一个麻雀般的微型分布式架构?

    欢迎大家前往腾讯云 社区 获取更多腾讯海量技术实践干货哦 本文由mariolu 发表于云 社区专栏 序言 初衷 设计该系统初衷是基于描绘业务 或机器集群 存储模型 分析代理缓存服务器磁盘存储与回源率的关系 系统意义是在腾讯云成本优化过程中
  • 什么是遗传算法?

    00 目录 遗传算法定义 生物学术语 问题导入 大体实现 具体细节 问题汇总 01 什么是遗传算法 1 1 遗传算法的科学定义 遗传算法 Genetic Algorithm GA 是模拟达尔文生物进化论的自然选择和遗传学机理的生物进化过程的
  • 1186: 零起点学算法93——改革春风吹满地

    Description 改革春风吹满地 不会AC没关系 实在不行回老家 还有一亩三分地 谢谢 乐队奏乐 话说部分学生心态极好 每天就知道游戏 这次考试如此简单的题目 也是云里雾里 而且 还竟然来这么几句打油诗 好呀 老师的责任就是帮你解决问
  • Python进行模糊匹配

    Mr Chen昨天提出了一个问题一起探讨 问题如下 Python库里有fuzzywuzzy和difflib 两个库均可实现词粒度的模糊匹配 同时可设定模糊阈值 实现关键词的提取 地址匹配 语法检查等 针对fuzzywuzzy的process
  • Flink学习27:驱逐器

    import org apache flink api common eventtime SerializableTimestampAssigner WatermarkStrategy import org apache flink api