It seems like your code has a few problems. First, there is some badly formatted data regarding null/None (which you already fixed) and ints/floats (pointed out in the comments). Finally, the WriteToAvro transform can't write unbounded PCollections. There is a workaround in which you define a new sink and use it with the WriteToFiles transform, which is able to write unbounded PCollections.
Note that, as of this writing (2020-06-18), this approach does not work with the Apache Beam Python SDK when using Avro (BEAM-6522), which forces the solution to use FastAvro instead. You can use Avro if you manually upgrade dill to >= 0.3.1.1 and Avro to >= 1.9.0, but be careful, as this is currently untested.
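Regarding the ints/floats issue: each record has to match the Avro schema exactly before it reaches the sink, so it may help to normalize elements earlier in the pipeline. A minimal sketch, assuming the records are dicts and that stray float values should become ints (the normalize helper is hypothetical; adapt it to your actual schema):

import apache_beam as beam

def normalize(record):
    # Hypothetical cleanup: coerce float values to ints so the record
    # matches the 'int' fields declared in the Avro schema.
    return {key: int(value) if isinstance(value, float) else value
            for key, value in record.items()}

# Applied somewhere before the write step:
# lines = lines | beam.Map(normalize)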
The workaround for the unbounded write looks like this:
from apache_beam.io.fileio import FileSink
from apache_beam.io.fileio import WriteToFiles
import fastavro


class AvroFileSink(FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = fastavro.write.Writer(fh, self._schema, self._codec)

    def write(self, record):
        # This is called on every element.
        self.writer.write(record)

    def flush(self):
        self.writer.flush()
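The sink is just a thin adapter around fastavro: WriteToFiles opens a fresh file handle for each bundle and passes it to open(), calls write() once per element, and calls flush() when the file is about to be finalized.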
The new sink is used like this:
import apache_beam as beam

# Replace the following with your schema.
schema = fastavro.schema.parse_schema({
    'name': 'row',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'a', 'type': 'int'},
    ],
})

# Create the sink. This will be used by the WriteToFiles transform to write
# individual elements to the Avro file.
sink = AvroFileSink(schema=schema)

with beam.Pipeline(...) as p:
    lines = p | beam.io.ReadFromPubSub(...)
    lines = ...
    # This is where your new sink gets used. The WriteToFiles transform takes
    # the sink and uses it to write to a directory defined by the path
    # argument.
    lines | WriteToFiles(path=job_options.outputLocation, sink=sink)
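One more caveat: for an unbounded PCollection, WriteToFiles generally needs windowed input so it knows when to finalize files. If your pipeline is still in the global window, a fixed-window step before the write, along the lines of this sketch, should help (the 60-second window size is an arbitrary example value):

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

# Assign elements to 60-second fixed windows so WriteToFiles can close
# and emit a set of files per window. Tune the size to your latency and
# file-size needs.
lines = lines | beam.WindowInto(FixedWindows(60))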