我们正在尝试在 Apache Beam 管道上使用固定窗口(使用DirectRunner
)。我们的流程如下:
- 从发布/订阅中提取数据
- 将 JSON 反序列化为 Java 对象
- 带有 5 秒固定窗口的窗口事件
- 使用自定义
CombineFn
,合并每个窗口Event
变成一个List<Event>
- 为了测试方便,直接输出结果
List<Event>
管道代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
我们看到的结果是最后一步永远不会运行,因为我们没有得到任何日志记录。
另外,我们还添加了System.out.println("***")
在我们自定义的每个方法中CombineFn
类,以便跟踪它们何时运行,但似乎它们也没有运行。
这里的窗口设置不正确吗?我们遵循了一个例子https://beam.apache.org/documentation/programming-guide/#windowing这看起来相当简单,但显然缺少一些基本的东西。
如有任何见解,我们将不胜感激 - 提前致谢!
看起来主要问题确实是缺少触发器 - 窗口正在打开,但没有任何东西告诉它何时发出结果。我们想简单地根据处理时间(而不是事件时间)来设置窗口,因此执行了以下操作:
.apply("Window", Window
.<Event>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
本质上,这会创建一个全局窗口,在处理第一个元素后 5 秒触发该窗口发出事件。每次关闭窗口时,一旦收到元素,就会打开另一个窗口。当我们没有时,Beam 抱怨道withAllowedLateness
片段 - 据我所知,这只是告诉它忽略任何最新的数据。
我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)