我正在尝试从 GCS 存储桶加载数据并将内容发布到 pubsub 和 bigquery。这些是我的管道选项:
options = PipelineOptions(
project = project,
temp_location = "gs://dataflow-example-bucket6721/temp21/",
region = 'us-east1',
job_name = "dataflow2-pubsub-09072021",
machine_type = 'e2-standard-2',
)
这是我的管道
data = p | 'CreateData' >> beam.Create(sum([fileName()], []))
jsonFile = data | "filterJson" >> beam.Filter(filterJsonfile)
JsonData = jsonFile | "JsonData" >> beam.Map(readFromJson)
split_data = JsonData | 'Split Data' >> ParDo(CheckForValidData()).with_outputs("ValidData", "InvalidData")
ValidData = split_data.ValidData
InvalidData = split_data.InvalidData
data_ = split_data[None]
publish_data = ValidData | "Publish msg" >> ParDo(publishMsg())
ToBQ = ValidData | "To BQ" >> beam.io.WriteToBigQuery(
table_spec,
#schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
数据在 InteractiveRunner 中流动良好,但在 DataflowRunner 中显示错误,例如
ValueError:无效的 GCS 位置:无。
使用 FILE_LOADS 方法写入 BigQuery 需要提供 GCS 位置来写入要加载到 BigQuery 中的文件。请通过 WriteToBigQuery 构造函数中的 custom_gcs_temp_location 或后备选项 --temp_location 提供 GCS 存储桶,或将 method="STREAMING_INSERTS" 传递给 WriteToBigQuery。 [运行“[15]时:至 BQ/BigQueryBatchFileLoads/GenerateFilePrefix”]
显示 GCS 位置错误,建议添加 temp_location。但我已经添加了 temp_location。