我正在关注并回答这个帖子 https://stackoverflow.com/questions/48519834/how-to-write-to-a-file-name-defined-at-runtime/48531741#48531741 and 文档 https://beam.apache.org/documentation/sdks/javadoc/2.4.0/index.html?org/apache/beam/sdk/io/FileIO.html为了在管道末尾对我的数据执行动态窗口写入。这是我到目前为止所拥有的:
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(
FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withNaming(key -> defaultNaming(key, ".json")));
}
但 NetBeans 警告我最后一行有语法错误:
FileNaming is not public in Write; cannot be accessed outside package
我该如何制作defaultNaming
可用于我的管道,以便我可以将其用于动态写入。或者,如果这是不可能的,我应该做什么?
发布我的想法,以防其他人遇到这个问题。
我尝试使用的方式存在三个问题writeDynamic()
before.
- 之前我一直使用 Beam 2.3.0 版本,它确实描述了
FileNaming
作为一个内部类FileIO.Write
。 Beam 2.4.0 定义FileNaming
as a public static interface
使其可供外部使用。
- 完全解析/导入
defaultNaming
。而不是打电话defaultNaming
直接 - 正如示例文档中所调用的 - 必须将其调用为FileIO.Write.defaultNaming
since FileIO
是我实际导入的包。
- Adding
withDestinationCoder
还需要执行动态写入。
最终的解决方案看起来像这样。
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}
Where Event::getKey
是在同一包中定义的静态函数,其签名为public static String getKey(String event)
.
这将执行窗口写入,每个窗口写入一个文件(由.withNumShards(1)
方法)。这假设窗口已在上一步中定义。 AGroupByKey
在写入之前不需要,因为只要显式定义分片数量,它就在写入过程中完成。请参阅文件IO文档 https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html有关更多详细信息,请参见“写入文件 -> 每个窗格生成多少个分片”。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)