我有一个 CSV 文件,其中每行都包含新的回车符 (\n)。
使用 Apache Beam 的 TextIO.read 函数从云存储读取 CSV 文件时,它会将 \n 视为新记录。我怎样才能克服这个问题。
我尝试过扩展 filebasedsource,但当我们应用 pTransorms 时,它仅读取 CSV 文件的第一行。
帮助将不胜感激
提前致谢
TextIO
不能这样做 - 它总是根据回车符分割输入,并且不知道其中一些回车符的 CSV 特定引用。
但是,Beam 2.2 包含一个转换,使您可以非常轻松地自己编写特定于 CSV 的(或任何其他文件格式特定的读取)代码:FileIO
。做这样的事情:
p.apply(FileIO.match().filepattern("gs://..."))
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, TableRow>() {
@ProcessElement
public void process(ProcessContext c) throws IOException {
try (InputStream is = Channels.newInputStream(c.element().open())) {
// ... Use your favorite Java CSV library ...
... c.output(next csv record) ...
}
}
}))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)