使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?

2023-11-23

我们正在尝试在 Apache Beam 管道上使用固定窗口(使用DirectRunner)。我们的流程如下:

  1. 从发布/订阅中提取数据
  2. 将 JSON 反序列化为 Java 对象
  3. 带有 5 秒固定窗口的窗口事件
  4. 使用自定义CombineFn,合并每个窗口Event变成一个List<Event>
  5. 为了测试方便,直接输出结果List<Event>

管道代码:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn())
                );

我们看到的结果是最后一步永远不会运行,因为我们没有得到任何日志记录。

另外,我们还添加了System.out.println("***")在我们自定义的每个方法中CombineFn类,以便跟踪它们何时运行,但似乎它们也没有运行。

这里的窗口设置不正确吗?我们遵循了一个例子https://beam.apache.org/documentation/programming-guide/#windowing这看起来相当简单,但显然缺少一些基本的东西。

如有任何见解,我们将不胜感激 - 提前致谢!


看起来主要问题确实是缺少触发器 - 窗口正在打开,但没有任何东西告诉它何时发出结果。我们想简单地根据处理时间(而不是事件时间)来设置窗口,因此执行了以下操作:

.apply("Window", Window
    .<Event>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))
        )
    )
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

本质上,这会创建一个全局窗口,在处理第一个元素后 5 秒触发该窗口发出事件。每次关闭窗口时,一旦收到元素,就会打开另一个窗口。当我们没有时,Beam 抱怨道withAllowedLateness片段 - 据我所知,这只是告诉它忽略任何最新的数据。

我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题? 的相关文章

  • 如何在日期选择器中设置不在当前月份的单元格的样式

    我目前正在为我的 JavaFX 应用程序制作注册表 问题是 当日期选择器中的单元格不在页面的月份上时 我想让该单元格变灰 让我们看看我当前的日期选择器 我的日期选择器 正如您所看到的 我希望下个月的日期 27 日 28 日 30 日以及 1
  • 热重载在docker中运行的java程序

    我开发了一个java程序 应该在docker中运行 然而 我在调试docker中运行的java程序时遇到了很多痛苦 我在网上搜索 一些教程提出了像 spring dev tools 这样的工具 因为我的java程序是基于spring boo
  • 如何在一行中将字符串数组转换为双精度数组

    我有一个字符串数组 String guaranteedOutput Arrays copyOf values values length String class 所有字符串值都是数字 数据应转换为Double QuestionJava 中
  • 线程自动利用多个CPU核心?

    假设我的应用程序运行 2 个线程 例如渲染线程和游戏更新线程 如果它在具有多核 CPU 当今典型 的移动设备上运行 我是否可以期望线程在可能的情况下自动分配给不同的核心 我知道底层操作系统内核 Android linux内核 决定调度 我的
  • CXF Swagger2功能添加安全定义

    我想使用 org apache cxf jaxrs swagger Swagger2Feature 将安全定义添加到我的其余服务中 但是我看不到任何相关方法或任何有关如何执行此操作的资源 下面是我想使用 swagger2feature 生成
  • 在数据流模板中调用 waitUntilFinish() 后可以运行代码吗?

    我有一个批处理 Apache Beam 作业 它从 GCS 获取文件作为输入 我的目标是根据执行后管道的状态将文件移动到两个 GCS 存储桶之一 如果管道执行成功 则将文件移动到存储桶 A 否则 如果管道在执行过程中出现任何未处理的异常 则
  • Convert.FromBase64String 方法的 Java 等效项

    Java 中是否有相当于Convert FromBase64String http msdn microsoft com en us library system convert frombase64string aspx which 将指
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • 当分配给变量时,我可以以某种方式重用 Gremlin GraphTraversals 代码吗?

    我有看起来像这样的 GraphTraversals attrGroup GraphTraversal
  • jdbc4.MySQLSyntaxErrorException:数据库中不存在表

    我正在使用 SpringBoot 开发一个网络应用程序 这是我的application properties文件来指定访问数据库的凭据 spring datasource driverClassName com mysql jdbc Dri
  • 使用替换字符串中多个单词的最有效方法[重复]

    这个问题在这里已经有答案了 此刻我正在做 Example line replaceAll replaceAll cat dog replaceAll football rugby 我觉得那很丑 不确定有更好的方法吗 也许循环遍历哈希图 ED
  • 请求位置更新参数

    这就是 requestLocationUpdates 的样子 我使用它的方式 requestLocationUpdates String provider long minTime float minDistance LocationLis
  • 序列化对象以进行单元测试

    假设在单元测试中我需要一个对象 其中所有 50 个字段都设置了一些值 我不想手动设置所有这些字段 因为这需要时间而且很烦人 不知何故 我需要获得一个实例 其中所有字段都由一些非空值初始化 我有一个想法 如果我要调试一些代码 在某个时候我会得
  • Spring Data 与 Spring Data JPA 与 JdbcTemplate

    我有信心Spring Data and Spring Data JPA指的是相同的 但后来我在 youtube 上观看了一个关于他正在使用JdbcTemplate在那篇教程中 所以我在那里感到困惑 我想澄清一下两者之间有什么区别Spring
  • 归并排序中的递归:两次递归调用

    private void mergesort int low int high line 1 if low lt high line 2 int middle low high 2 line 3 mergesort low middle l
  • 制作java包

    我的 Java 类组织变得有点混乱 所以我要回顾一下我在 Java 学习中跳过的东西 类路径 我无法安静地将心爱的类编译到我为它们创建的包中 这是我的文件夹层次结构 com david Greet java greeter SayHello
  • 使用 Flyway 和 Hibernate 的 hbm2ddl 在应用程序的生命周期中管理数据库模式

    我正在开发 Spring Hibernate MySql 应用程序 该应用程序尚未投入生产 我目前使用 Hibernatehbm2ddl该功能对于管理域上的更改非常方便 我也打算用Flyway用于数据库迁移 在未来的某个时候 该应用程序将首
  • 当单元格内的 JComboBox 中有 ItemEvent 时,如何获取 CellRow

    我有一个 JTable 其中有一列包含 JComboBox 我有一个附加到 JComboBox 的 ItemListener 它会根据任何更改进行操作 但是 ItemListener 没有获取更改的 ComboBox 所在行的方法 当组合框
  • Android JNI C 简单追加函数

    我想制作一个简单的函数 返回两个字符串的值 基本上 java public native String getAppendedString String name c jstring Java com example hellojni He
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User

随机推荐

  • 从 SharedPreferences 设置和获取 StringSet?

    我正在构建一个 Android 应用程序 我想在首选项中存储一组字符串 以便根据登录信息跟踪谁使用了该应用程序 我不想使用数据库 所以我知道我应该使用 SharedPreferences 来存储登录人员的列表 我希望能够重置此列表 以便将个
  • 使用中位数和分组依据以及谷歌表格进行查询

    我需要获得分组中位数 我已经对表单的数据进行了分组 From type Weight A person person 4 A person person 3 A person organization 11 A person person
  • 在 QToolTip 中使用图片或图像

    有没有办法在 QToolTip 中显示图片 图像 我想显示键盘按钮的小图像 以向用户解释他可以在该特定小部件上使用哪些按钮 快捷方式 您可以使用以下 html 代码轻松显示图像 QToolTip showText QCursor pos i
  • AngularJS 和谷歌云端点:需要演练

    我是 AngularJS 的新手 但我真的很喜欢 AngularJS 的工作方式 因此我想将其部署为我的 Google 云端点后端的客户端 然后我立即遇到两个问题 1 放在哪里我的回调 那么它能够在 ANGularJs 控制器中工作吗 2
  • 在字符串中包含常量而不连接

    PHP 中有没有一种方法可以在字符串中包含常量而无需连接 define MY CONSTANT 42 echo This is my constant MY CONSTANT No 对于字符串 PHP 无法区分字符串数据和常量标识符 这适用
  • 使用元素求幂加速嵌套 for 循环

    我正在编写一个大型代码 我发现自己需要加速其中的特定部分 我创建了一个MWE如下图所示 import numpy as np import time def random data N Generate some random data r
  • 如何以编程方式打开 Safari 扩展 ToolbarItem 弹出窗口

    我想以编程方式触发 Safari 扩展工具栏项目上的 单击 事件 以便在网页上发生某些情况后出现我的自定义弹出窗口 我正在使用新的 Xcode 扩展 IDE 并使用界面生成器构建了我的弹出窗口 目前 StackOverflow 上的所有答案
  • 使用 .bat 文件运行 php 脚本

    我需要每天晚上在我的服务器上运行一个 php 脚本 在 Linux 系统上我设置了一个 cron 作业 但我被困在 Windows 系统上 我知道我必须使用 Windows 任务计划程序设置一个任务 并且该任务需要运行一个 bat 文件 该
  • 向 DataTable 添加多行

    我知道有两种方法将带有数据的新行添加到DataTable string arr2 one two three dtDeptDtl Columns Add Dept Cd for int a 0 a lt arr2 Length a Data
  • 关于 C 中的 ## 预处理器

    Given define cat x y x y 电话cat a 1 回报a1 but cat cat 1 2 3 未定义 但是如果我也定义 define xcat x y cat x y 那么结果是xcat xcat 1 2 3 就是现在
  • RequireJS:根据环境加载不同的文件

    是否有根据当前项目环境 例如开发或生产 加载不同文件的功能 我的意思是 它可以帮助我透明地加载缩小或完整的文件 我读到有关多版本加载的内容 但多版本意味着我需要指定文件的版本 例如 我的模块中有 module js 文件 在这个文件中我需要
  • CSS 媒体查询 - 顺序很重要吗?

    现在我经常使用 CSS 媒体查询 我想知道最好按什么顺序使用它们 Method 1 media only screen and min width 800px content sidebar media only screen and ma
  • 获取文件的 QuickLook 预览图像

    有什么方法可以快速查看文件的预览图像吗 我正在寻找这样的东西 NSImage image QuickLookPreviewer quickLookPreviewForFile path See QLThumbnailRequest在文档中
  • Flutter 中的水平步进器

    我想创建一个水平步进器 我知道这很容易 但是这一次 步数应该很大 举个例子 这就是我在垂直领域所做的事情 import package flutter material dart void main gt runApp new MyApp
  • 在 C++ 中如何实现多个 COM 接口?

    我试图理解这个示例代码关于浏览器帮助程序对象 在内部 作者实现了一个公开多个接口 IObjectWithSite IDispatch 的类 他的 QueryInterface 函数执行以下操作 if riid IID IUnknown pp
  • 如何验证 jar 内 MANIFEST.MF 的顺序?

    我遇到了一个有趣的问题 这对我来说绝对是新的 正如我突然发现的 Jar 规范说 被包含在内 META INF and MANIFEST MF必须是第一个和第二个条目 jar包而不仅仅是存档中的目录和文件 我正在使用 Java 框架 非常注意
  • 如何在 C++/CLI 中使用 boost::bind 绑定托管类的成员

    我在本机 C 类中使用 boost signal 现在我正在 C CLI 中编写 NET 包装器 以便可以将本机 C 回调公开为 NET 事件 当我尝试使用 boost bind 获取托管类的成员函数的地址时 出现编译器错误 3374 指出
  • Python CSV 到 SQLite

    我正在 转换 一个大的 1 6GB CSV 文件并将CSV 的特定字段插入到SQLite 数据库中 基本上我的代码如下所示 import csv sqlite3 conn sqlite3 connect path to file db co
  • 使用 Apache POI 将部分单元格内容设置为下划线?

    我正在开发一个程序 其中我必须在 Excel 电子表格中设置单元格值 例如 这是一下划线 text 它可以是任何粗体 斜体或下划线 我正在使用 Apache POI 3 9 请尝试以下操作 public static void differ
  • 使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?

    我们正在尝试在 Apache Beam 管道上使用固定窗口 使用DirectRunner 我们的流程如下 从发布 订阅中提取数据 将 JSON 反序列化为 Java 对象 带有 5 秒固定窗口的窗口事件 使用自定义CombineFn 合并每