Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction?

2023-12-26

我不确定必须使用哪个流 Flink 转换来计算某个流的平均值并在 5 秒的窗口内更新状态(假设它是我的状态的整数数组)。 如果我使用RichFlatMapFunction我可以计算平均值并更新我的数组状态。但是,我必须打电话

streamSource
    .keyBy(0)
    .flatMap(new MyRichFlatMapFunction())
    .print()

我不能把它写在窗户上。 如果我使用

streamSource
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregateFunction())
    .print()

我无法保持数组状态ValueState.

我试图使用RichAggregateFunction我也遇到了这个线程的同样问题。使用 RichAggregateFunction 时出现 Flink 错误 https://stackoverflow.com/questions/47437207/flink-error-on-using-richaggregatefunction是否有另一种方法来计算平均值并跟踪 Flink 中的另一个状态?

在 Flink 中我该如何解决这个问题? 这是我试图做但实际上不起作用的方法>https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70 https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70

streamStations.filter(new SensorFilter("COUNT_TR"))
                .map(new TrainStationMapper())
                .keyBy(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));
                // THIS AGGREGATE DOES NOT WORK
                // .aggregate(new AverageRichAggregator())
                // .print();

    public static class AverageRichAggregator extends
            RichAggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -40874489412082797L;
        private String functionName;
        private ValueState<CountMinSketch> countMinSketchState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<CountMinSketch> descriptor = new ValueStateDescriptor<>("countMinSketchState",
                    CountMinSketch.class);
            this.countMinSketchState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public Tuple3<Double, Long, Integer> createAccumulator() {
            this.countMinSketchState.clear();
            return new Tuple3<>(0.0, 0L, 0);
        }

        @Override
        public Tuple3<Double, Long, Integer> add(
                Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double> value,
                Tuple3<Double, Long, Integer> accumulator) {
            try {
                if (value.f1.f1.equals("COUNT_PE")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_PE");
                } else if (value.f1.f1.equals("COUNT_TI")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_TI");
                } else if (value.f1.f1.equals("COUNT_TR")) {
                    // int count = (int) Math.round(value.f2);
                    // countMinSketch.updateSketchAsync("COUNT_TR");
                }
                CountMinSketch currentCountMinSketchState = this.countMinSketchState.value();
                currentCountMinSketchState.updateSketchAsync(value.f1.f1);
                this.countMinSketchState.update(currentCountMinSketchState);

            } catch (IOException e) {
                e.printStackTrace();
            }

            return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, value.f1.f4);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple3<Double, Long, Integer> accumulator) {
            String label = "";
            int frequency = 0;
            try {
                if (functionName.equals("COUNT_PE")) {
                    label = "PEOPLE average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_PE");

                } else if (functionName.equals("COUNT_TI")) {
                    label = "TICKETS average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TI");

                } else if (functionName.equals("COUNT_TR")) {
                    label = "TRAIN average on train station";
                    // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TR");
                }
                frequency = this.countMinSketchState.value().getFrequencyFromSketch(functionName);

            } catch (IOException e) {
                e.printStackTrace();
            }

            return new Tuple2<>(label + "[" + accumulator.f2 + "] reads[" + frequency + "]",
                    ((double) accumulator.f0) / accumulator.f1);
        }

        @Override
        public Tuple3<Double, Long, Integer> merge(Tuple3<Double, Long, Integer> a, Tuple3<Double, Long, Integer> b) {
            return new Tuple3<>(a.f0 + b.f0, a.f1 + b.f1, a.f2);
        }
    }

error:

Exception in thread "main" java.lang.UnsupportedOperationException: This aggregation function cannot be a RichFunction.
    at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:692)
    at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:71)
    at org.sense.flink.App.main(App.java:141)

Thanks


不允许聚合器保留任意状态,以防聚合器可能与合并窗口一起使用 - 因为 Flink 不知道如何合并您的临时状态。

但是您可以将 AggregateFunction 与 ProcessWindowFunction 组合起来,如下所示:

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

ProcessWindowFunction 的 process 方法将传递一个仅包含预聚合结果的迭代器,以及一个提供对全局和每个窗口状态的访问的上下文 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction。希望这能以简单的方式提供您所需的内容。但是,如果您需要使用每个到达的记录更新自己的状态,那么您需要扩展聚合器管理的类型以适应这种情况。

以下是如何使用全局状态的粗略概述:

private static class MyWindowFunction extends ProcessWindowFunction<IN, OUT, KEY, TimeWindow> {
    private final static ValueStateDescriptor<Long> myGlobalState =
      new ValueStateDescriptor<>("stuff", LongSerializer.INSTANCE);

    @Override
    public void process(KEY key, Context context, Iterable<IN> values,  Collector<OUT> out) {
        ValueState<Long> goodStuff = context.globalState().getState(myGlobalState);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction? 的相关文章

  • Hibernate OneToMany 关系是 PersistentBag 而不是 List

    我正在 javafx 中开发一个应用程序 它通过 RMI 与 EAR 连接 该 EAR 连接到 SQLServer DB 并使用 hibernate 映射 POJOS 这些 POJOS 包含双向 OneToMany 和 ManyToOne
  • .java 和 .scala 类之间是否可能存在循环依赖?

    假设我在 java 文件中定义了类 A 在 scala 文件中定义了类 B A 类使用 B 类 B 类使用 A 类 如果我使用 java 编译器 则会出现编译错误 因为 B 类尚未编译 如果我使用scala编译器A类将找不到 有没有可以同时
  • 如何解决错误:java.lang.ClassNotFoundException:io.netty.util.concurrent.GenericFutureListener?

    昨天我第一次尝试用 Java 制作 Prometheus 客户端 从 Python 开始 最后是 GoLang 是否找到示例 import io prometheus client Counter import io prometheus
  • 如何在 OpenAPI 3.0 中定义字节数组

    我正在将 API 从 Swagger 2 0 迁移到 OpenAPI 3 0 在 DTO 中 我有一个指定为字节数组的字段 Swagger 对 DTO 的定义 Job type object properties body type str
  • Java:检查给定日期是否在当前月份内

    我需要检查给定的日期是否在当前月份 我编写了以下代码 但 IDE 提醒我getMonth https docs oracle com javase 7 docs api java util Date html getMonth and ge
  • 无法从后台服务通过 WiFi 访问互联网

    我将直接介绍我发现的一些事实 数据 如果您遇到 解决了类似的问题 请帮助我 我每 5 分钟向服务器发送一次数据 除非用户在服务器的帮助下手动将其关闭 wakeful broadcast receiver通过一个intent service
  • 业务代表与服务定位器

    Business Delegate 和 Service Locator 之间有什么区别 两者都负责封装查找和创建机制 如果 Business Delegate 使用 Service Locator 来隐藏查找和创建机制 那么 Busines
  • Java 的 QP 求解器 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 如何使用 Java 原生接口从 Java 调用 Go 函数?

    可以通过以下方式调用 C 方法JNA https en wikipedia org wiki Java Native AccessJava 中的接口 如何使用 Go 实现相同的功能 package main import fmt impor
  • 在 Mac 上使用 JRE 打开 jar 文件

    我有一个 jar 文件 旨在通过命令行运行 我不打算在运行应用程序的机器上进行任何java开发 我的思考过程是 因此我应该只需要JRE而不是JDK 此外 JDK 大约是 JRE 的 4 倍 我不想下载它 在 Mac 上安装 JRE 时 它不
  • Java String.format 向整数添加空格

    我有一小段代码 我不明白输出 此输出向我的字符串格式文本添加空格 我做错了什么吗 public class HelloWorld public static void main String args int a1 540 int a2 4
  • java 属性文件作为枚举

    是否可以将属性文件转换为枚举 我有一个包含很多设置的属性文件 例如 equipment height equipment widht equipment depth and many more like this and not all a
  • 当容器大小更改时,JTable 仅调整选定列的大小

    对于面板内的 JTable 如果面板变大 我如何将额外的空间仅分配给某些列 在我的例子中 分配给最后一列 尽管提供 第 3 4 列和8 将获得额外的空间 我想允许用户手动更改所有列的列大小 我尝试了 table setAutoResizeM
  • 为什么在尝试使用 Java 连接到 RDS PostgreSQL 数据库时会收到 SocketTimeoutException?

    我有一个 Spring 应用程序 我试图在 AWS 上托管 几天来我一直在努力配置 我有一个 EC2 实例 并且能够通过 SSH 连接到它 我还在 AWS 中设置了 Postgres RDS 数据库 但我无法使用 IDE 中的代码连接到它
  • Mule/码头设置

    我有一个正在运行的 Mule 应用程序 我想在其上设置 Jetty 来响应 http 请求 以下配置
  • 防止 Firebase 中的待处理写入事务不起作用

    我的目标是在单击按钮时将名称插入 Cloud Firestore 中 但如果用户未连接到互联网 我不希望保存处于挂起状态 我不喜欢 Firebase 保存待处理写入的行为 即使互联网连接已恢复 我研究发现Firebase 开发人员建议使用事
  • Java SE + Spring Data + Hibernate

    我正在尝试使用 Spring Data Hibernate 启动 Java SE 应用程序 并且到目前为止已经完成了以下操作 配置文件 Configuration PropertySource classpath hibernate pro
  • Java泛型类型

    当我有一个界面时 public interface Foo
  • 监控 Java 应用程序上的锁争用

    我正在尝试创建一个小基准 在 Groovy 中 以显示几个同步方法上的高线程争用 当监控自愿上下文切换时 应该会出现高争用 在 Linux 中 这可以通过 pidstat 来实现 程序如下 class Res private int n s
  • Java 9 中紧凑字符串和压缩字符串的区别

    有什么优点紧凑的字符串 http openjdk java net jeps 254JDK9 中的压缩字符串 压缩字符串 Java 6 和紧凑字符串 Java 9 都有相同的动机 字符串通常实际上是 Latin 1 因此浪费了一半的空间 和

随机推荐

  • 使用 MPI 分散不同大小的矩阵块

    假设所有矩阵都按行优先顺序存储 说明该问题的一个示例是将 10x10 矩阵分布在 3x3 网格上 以便每个节点中的子矩阵的大小如下所示 3x3 3x3 3x4 3x3 3x3 3x4 4x3 4x3 4x4 我在 Stackoverflow
  • 改进单选按钮的使用以启用/禁用表单字段

    我有两个单选按钮和两个相应的表单字段 根据选择的单选按钮 一个表单字段将被禁用 而另一个表单字段将被启用 我的代码可以工作 但我认为它可以改进 现在我有两个独立的进程 检查页面加载时选择了哪个单选按钮并禁用相应的字段 另一个在页面加载后响应
  • 将下拉菜单与年份绑定

    我必须在 C 中绑定一个下拉框 其中包含从 2008 年到当前年份的年份 我怎样才能实现它 您可以使用以下命令构建整数序列System Linq Enumerable Range var startYear 2008 myDropDownL
  • Swift 3 - 如何提取正则表达式中捕获的组?

    我正在使用 Swift 3 并尝试访问捕获的组 let regexp ALREADY PAID NOT ALR PROVIDER MAY READY MAY BILL BILL YOU PAID n d d d check if some
  • 连接被拒绝 - connect(2) 用于“localhost”端口 25 Rails

    在培训期间 我正在开发一个网站 我们使用 Ruby on Rails 我们需要向用户发送邮件 所以我创建了一个邮件程序 我尝试过将 smtp 放在两者中development rb and environment rb config act
  • 如何在action类和jsp页面之间传递对象数据?

    我有一个名为 Code 的 Java 类 它具有与代码相关的所有值 例如codeId codeDescription等等及其 getter 和 setter 我正在成功检索一个操作类中的代码数据 我正在使用 struts 2 现在我想将这些
  • Go 中的 Unix FIFO?

    有没有办法用Go语言创建unix FIFO 没有Mkfifo nor Mknod in os包 尽管我预计命名的 FIFO 主要用于 posix 操作系统 事实上 有一个创建未命名的 FIFO 管道 的函数 但没有创建命名管道的函数 我是唯
  • 使用constraintEqualToAnchor()时设置自动布局约束后如何更改它们?

    我尝试使用以下命令设置具有自动布局约束的视图constraintEqualToAnchor override func viewDidLoad super viewDidLoad let myView UIView myView backg
  • 如果不存在则调用自由函数而不是方法

    假设您有一系列与类型无关的类 通过返回值的给定方法来实现共同概念 class A public int val const class B public int val const 假设您需要一个通用的自由函数T为未实现的任何类型返回常规值
  • 通过 ASPX 页面流式传输 PDF 数据

    我如何在我的网络服务器上流式传输 pdf 文件 并像谷歌文档一样在我自己的页面中显示它 该页面嵌入到页面中 None
  • swagger-codegen 客户端:如何在模型上包含 Jackson 注释

    我正在使用 swagger codegen 生成一个休息客户端 但遇到一个问题 我正在使用的服务返回一个具有继承的模型 API 模型如下所示 public class Person private List
  • Ansible 日期时间时区转换

    有没有办法在我的剧本中的 调试 语句中将ansible日期转换为不同的时区 我不想在剧本级别设置全球时区 我有这个 debug msg Y m d H M S strftime ansible date time epoch 这工作正常 但
  • 尝试的条件约束不是可索引操作

    我正在使用 DynamoDB 对于除 EQ 之外的所有 ComparisonOperators 的查询 API 它一直给出 尝试的条件约束不是可索引操作 错误 是什么原因 TableName My Table name IndexName
  • 使用 C# 程序为 Canon EOS Rebel XS 创建 USB 延时拍摄

    我想知道这是否可能 我想制作自己的软件来通过远程快门释放来控制快门的释放 从图中我看到它的电压为 3 3V 空载 阈值电压为 1 8V 我想知道如果我理解正确的话 我是否可以使用限制大约 5 25v 的 USB 电缆电压 这是否可能 或者我
  • R 根据前一行中的值删除行

    我是 R 新手 尝试根据前一行的值删除行 样本数据 Cust ID Date Value 500219 2016 04 11 12 00 00 0 500219 2016 04 12 16 00 00 0 500219 2016 04 14
  • /usr/bin/sudo 必须由 uid 0 拥有并设置了 setuid 位版本 .ubantu14.04 LTS

    当我为 npm 设置 EACCESS 并在终端中运行 chown 命令以更改所有者权限时 但现在我陷入了困境 sudo usr bin sudo 必须由 uid 0 拥有并设置了 setuid 位 我的版本是 ubuntu14 04 LTS
  • 将查询的所有结果放入 Prolog 中的列表中

    我想知道如何创建一个谓词 将从某个查询中获得的所有结果 因此我得到一个结果并按分号 直到得到 False 放入列表中 例如 如果我写foo X 1 2 3 在一些 Prolog 监听器中 假设结果是 X 11 X 22 False 我想将所
  • 覆盖css文件中的定义

    我有一个 css 文件 它定义了所有样式 tags 像这样 p 我怎样才能写一个在包含具有默认样式的样式表的页面中 没有简单的方法可以做到这一点 不过 有一些常见的技巧可以模拟这种行为 最好使用的方法取决于被覆盖区域的复杂程度以及您想要执行
  • 适用于 GAE 的 Weasyprint Dockerfile

    我正在尝试在 gae 上安装 weasyprint 我知道我们可以通过将运行时从 python 更改为 app yaml 中的自定义来将其传递到 Dockerfile 中来安装外部库 我在为 weasyprint 库创建 Dockerfil
  • Flink:是否有另一种方法来计算平均值和状态变量而不是使用 RichAggregateFunction?

    我不确定必须使用哪个流 Flink 转换来计算某个流的平均值并在 5 秒的窗口内更新状态 假设它是我的状态的整数数组 如果我使用RichFlatMapFunction我可以计算平均值并更新我的数组状态 但是 我必须打电话 streamSou