flink中AggregateFunction 执行步骤以及含义全网详细解释

2023-11-16


package operator;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.omg.PortableInterceptor.INACTIVE;
import org.apache.flink.api.java.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.sql.Timestamp;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
// Tuple4<Long,Long,Long,Timestamp>
public class AggregateFunction_2 {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);






        DataStream<Tuple3<String, Integer, Timestamp>> dataStream = env.addSource(new SourceFunction<Tuple3<String, Integer, Timestamp>>() {
            boolean runing = true;
            @Override
            public void run(SourceContext<Tuple3<String, Integer, Timestamp>> ctx) throws Exception {
                //ctx.collect(new Tuple3("user" , 2, new Timestamp(new Date().getTime())));
                int i = 1;
                while (runing) {
                    Tuple3<String, Integer, Timestamp> t3;
                    Thread.sleep(1000);
                    if (i % 2 == 1) { //判断
                        t3 = new Tuple3("user" + 1, 1, new Timestamp(new Date().getTime()));
                    } else {
                        t3 = new Tuple3("user" + i, i, new Timestamp(new Date().getTime()));
                    }
                    //System.out.println("=======");
                    //System.out.println(t3);
                    i = i + 1;
                    ctx.collect(t3);
                    /* 返回
                    user1  1
                    user2  2
                    user1  1
                    user4  4
                    user1  1
                    user6  6
                     */
                }
            }
            @Override
            public void cancel() {
                runing = false;
            }
        });

/*        DataStream dataStream = env.fromElements(
                Tuple3.of("1",333,new Timestamp(new Date().getTime())),
                Tuple3.of("2", 111,new Timestamp(new Date().getTime())),
                Tuple3.of("1",222,new Timestamp(new Date().getTime())),
                Tuple3.of("2",444,new Timestamp(new Date().getTime())),
                Tuple3.of("9",444,new Timestamp(new Date().getTime())),
                Tuple3.of("6", 555,new Timestamp(new Date().getTime())),
                Tuple3.of("1", 555,new Timestamp(new Date().getTime()))
                     )
                ;
        */
        //dataStream.print();


        //  输入类型IN    累加器类型ACC   输出 out
        DataStream  data_aggregate  =dataStream
               // .timeWindowAll(Time.seconds(2))\
                .keyBy(0)     //分组
                .countWindow(2) //2个
               // .sum(1);
               .aggregate(new AggregateFunction<Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>, Tuple3<String, Integer, Timestamp>>() {
                               @Override
                               // 初始化列累加器 .创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化
                               //来一条数据.相应组内只有一条数据时候执行一次
                               //如果原先有一条,那么新进来一条时候,就不执行了。直接执行add getresult
                               //累加器有点像是中间传递的东西
                               //user1+user1 通过累加器就是 acc_1=acc(初始化)+第一个user,  acc=acc_1+第一个user1
                               //相加的结果都保留在累加器中。相当于一个寄存的地方
                               public Tuple3<String, Integer, Timestamp> createAccumulator() {
                                   System.out.println("------createAccumulator--------"+new Timestamp(new Date().getTime()));
                                   return new Tuple3<>("",0,new Timestamp(new Date().getTime()));
                               }
                               //累加器的累加方法 来一条数据执行一次 对于数据的每条数据,和迭代数据的聚合的具体实现
                               @Override
                               public Tuple3<String, Integer, Timestamp> add(Tuple3<String, Integer, Timestamp> value, Tuple3<String, Integer, Timestamp> accumulator) {
                                   System.out.println("------add--------"+value);
                                   accumulator.f0=value.f0;  //类加器的第一个值等于第一个数的fo
                                   accumulator.f1+=value.f1; //第二个值累加
                                   return accumulator;
                               }

                               // 返回值  在窗口内满足2个,计算结束的时候执行一次  从累加器获取聚合的结果
                               @Override
                               public Tuple3<String, Integer, Timestamp> getResult(Tuple3<String, Integer, Timestamp> accumulator) {
                                   System.out.println("------getResult--------"+accumulator);
                                   return accumulator;
                               }

                              //合并两个累加器,返回一个具有合并状态的累加器  一般不触发这个
                               @Override
                               public Tuple3<String, Integer, Timestamp> merge(Tuple3<String, Integer, Timestamp> a, Tuple3<String, Integer, Timestamp> b) {
                                   System.out.println("------merge--------"+a);
                                   return null;
                               }
                           }
                );

       data_aggregate.print();
       env.execute("execute");

    }


}
输出



------createAccumulator--------2020-10-20 20:52:43.177
------add--------(user1,1,2020-10-20 20:52:43.095)      --进来user1 分组后。组内只有一条user1数据  执行createAccumulator-->add (add是加的初始化的累加器)

------createAccumulator--------2020-10-20 20:52:44.179
------add--------(user2,2,2020-10-20 20:52:44.103)      --进来user2 分组后。组内只有一条user2数据   执行createAccumulator-->add(add是加的初始化的累加器)


------add--------(user1,1,2020-10-20 20:52:45.103)      --又进来user1 分组后 组内有两个user1  满足数量要求 执行add-->getresult  (add:第二个user1+(第一个user1+初始的累加器) )同时由于AggregateFunction是增量计算的。所以清空组内的数据,
------getResult--------(user1,2,2020-10-20 20:52:43.178)
(user1,2,2020-10-20 20:52:43.178)


------createAccumulator--------2020-10-20 20:52:46.189   --进来user4  组内只有一条user4数据   执行createAccumulator-->add(add是加的初始化的累加器)
------add--------(user4,4,2020-10-20 20:52:46.103)

------createAccumulator--------2020-10-20 20:52:47.195   --!!!注意由于上面已经进来了两个user1,输出了。由于AggregateFunction是增量计算的。所以前面两个输出后。该组内被清空了 此时是组第一个
------add--------(user1,1,2020-10-20 20:52:47.104)

------createAccumulator--------2020-10-20 20:52:48.2     --进来一个user6 组内只有一个user6   执行createAccumulator-->add(add是加的初始化的累加器)
------add--------(user6,6,2020-10-20 20:52:48.104)

------add--------(user1,1,2020-10-20 20:52:49.104)       --进来一个user1 此时组内有两个了 ,满足数量要求,就 add-->getresult输出。同时由于AggregateFunction是增量计算的。所以清空组内的数据,
------getResult--------(user1,2,2020-10-20 20:52:47.195)
(user1,2,2020-10-20 20:52:47.195)


------createAccumulator--------2020-10-20 20:52:50.109
------add--------(user8,8,2020-10-20 20:52:50.104)

------createAccumulator--------2020-10-20 20:52:51.114
------add--------(user1,1,2020-10-20 20:52:51.105)

------createAccumulator--------2020-10-20 20:52:52.119
------add--------(user10,10,2020-10-20 20:52:52.105)

------add--------(user1,1,2020-10-20 20:52:53.106)
------getResult--------(user1,2,2020-10-20 20:52:51.114)
(user1,2,2020-10-20 20:52:51.114)

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink中AggregateFunction 执行步骤以及含义全网详细解释 的相关文章

随机推荐

  • 深度学习实战:使用 PyTorch 和序列到序列(Seq2Seq)模型进行机器翻译

    机器翻译是自然语言处理中的一个重要任务 它涉及将一种语言的文本转换为另一种语言的文本 序列到序列 Seq2Seq 模型是一种强大的深度学习模型 用于处理机器翻译任务 在本篇博客中 我们将使用 PyTorch 和 Seq2Seq 模型进行机器
  • 我00后,会Python,月薪5000,兼职1.5w

    当代年轻人的终极烦恼 没钱 主业收入不高但处处都要花钱 特别是今年以来 很多人会在后台问我 做些什么副业好 兼职写文 不知道上哪儿找单 自己也不一定写得好 做wei商 被朋友屏蔽 没有客源也出不了单 摆地摊 东西卖不出去反而倒贴了一笔钱 淘
  • vue中实现点击展开和收起功能(具有动画效果)

    vue中实现点击展开和收起功能 具有动画效果 html div class marketplace aside b div class marketplace aside show that item text div div
  • 一个好玩的小游戏——麻神之战

    题目 一种新的麻将 只留下一种花色 并且除去了一些特殊和牌方式 例如七对子等 规则如下 共有36张牌 每张牌是1 9 每个数字4张牌 你手里有其中的14张牌 如果这14张牌满足如下条件 即算作和牌 14张牌中有2张相同数字的牌 称为雀头 除
  • Java 同步JSON字符串至ES(Elasticsearch) 添加时间戳(@timestamp)、版本(@version) 字段

    解决方法 仿照logstash同步原理 对于同步json字符串 首先将其解析 然后添加时间戳和版本字段 或其他字段 打入es public void insertEs String jsonStr JSONObject jsonObject
  • 95-36-210-ChannelHandler-系统Channel-TimeoutHandler

    文章目录 1 概述 2 继承体系 3 IdleStateHandler 3 1 典型构造方法 3 2 初始化方法 initialize 3 3 销毁方法destroy 3 4 核心的调度任务 ReaderIdleTimeoutTask 1
  • QT的补充知识

    一 文件 QFile QT提供了QFile类用于对文件进行读写操作 也提供了其他的两个类 文本流 QTextSream 和数据流 QDataStream 文本流 QTextSream 用于对文本数据的处理 并且是以字为单位进行读 写 数据流
  • 获取执行计划——使用动态性能视图和AWR、Statspacks

    上一篇中讲了如何使用EXPLAIN PLAN方法来获取sql执行计划 这篇继续讲另两种方法 使用动态性能视图和AWR报告 一 使用动态性能视图 查询动态性能视图我们可以获取丰富的信息 包括执行计划与游标信息等等 下面罗列几个常用的v 视图
  • Python IDLE 自动提示功能

    Python27 Lib idlelib 目录下 config extensions def文件修改等待时间 AutoComplete enable 1 popupwait 2000 2000表示2秒 修改为0 AutoComplete p
  • 分享一个页面

    先看效果 看下代码
  • 34. 注入篇——Cookie注入

    Cookie注入原理 1 数据读取流程 对于WEB服务器而言 读取数据的流程是先取GET中的数据 如果GET中没有数据信息 那么再取POST中的数据 如果POST中也没有那么就会去取COOKIE中的数据 2 防注入系统的常例 系统一般只会对
  • flutter两个非常常用的布局小空间SizedBox和Divider

    SizedBox SizedBox是Flutter中的一个小部件 widget 用于创建具有指定尺寸的空白框 它通常用于调整和控制布局中的间距 大小和位置 SizedBox具有以下常用属性 width 指定SizedBox的宽度 heigh
  • Redis 五大基础数据结构命令详细介绍

    文章目录 一 Redis数据结构 二 Redis通用命令 三 String类型 3 1 String类型 也就是字符串类型 是Redis中最简单的存储类型 3 2 String类型的常见命令 四 Redis key的层级格式 4 1 key
  • GPT发家史

    如今 ML 领域公号也卷得厉害 最早我 reddit 灌灌水 邮件看看 就有东西写了也不怕重 现在基本上能第一眼看到的东西肯定还没动手大号们就发完了 前段时间 DALL E 刚出 果然还没动手写 无数文章就给介绍完了 对个人而言 要写的话要
  • mysql之分页查询14

    1 分页查询 分页查询比较简单 主要是使用limit关键字去分页 一般理解分页公式limit page 1 size size 即可 进阶8 分页查询 应用场景 当要显示的数据 一页显示不全 需要分页提交sql请求 语法 select 查询
  • 【Git】Git切换地址

    如何切换git代码地址 1 查看当前远程 url git remote v 执行命令后 可以看见当前有2个URL 远程 URL 在一般情况下有两个 分别是 fetch 和 push fetch URL 是用于从远程仓库获取最新版本的数据 当
  • Java面向对象进阶&继承

    1 继承 1 1 继承的实现 继承的概念 继承是面向对象三大特征之一 可以使得子类具有父类的属性和方法 还可以在子类中重新定义 以及追加属性和方法 实现继承的格式 继承通过extends实现 格式 class 子类 extends 父类 举
  • 水仙花数(Java语言)——最基础全面的讲解

    题目 判读一个整数是否是水仙花数 所谓水仙花数是一个三位数 其各个位上数字立方和等于它本身 例如 153 1 1 1 3 3 3 5 5 5 首先进行思路分析 1 首先要得到此数百位 十位 个位上的数字 然后用 if 判断他们的立方和是否相
  • 数据库锁表?别慌,本文教你如何解决

    引言 作为开发人员 我们经常会和数据库打交道 当我们对数据库进行修改操作的时候 例如添加字段 更新记录等 没有正确评估该表在这一时刻的使用频率 直接进行修改 致使修改操作长时间无法响应 造成锁表 在 mysql 中 如果出现 alter 操
  • flink中AggregateFunction 执行步骤以及含义全网详细解释

    package operator import org apache flink api common functions AggregateFunction import org apache flink api common funct