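I am currently building a PoC Apache Beam pipeline on GCP Dataflow. In this case, I want to create a streaming pipeline with the main input from PubSub and a side input from BigQuery, and then store the processed data back into BigQuery.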
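Side pipeline code:

```python
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

# Re-read the BigQuery side table once per hour (fire_interval is in seconds)
side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >> beam.Map(
        lambda x: beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)
```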
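To make the timing of the side pipeline concrete, here is a plain-Python sketch (no Beam involved) of the impulse schedule it should produce. It assumes that `PeriodicImpulse` emits one element at `start`, `start + fire_interval`, and so on, and that `apply_windowing=True` places each impulse into an epoch-aligned fixed window of `fire_interval` seconds. `impulse_schedule` and `FIRE_INTERVAL` are hypothetical names used only for illustration.

```python
from datetime import datetime, timezone
from typing import List, Tuple

FIRE_INTERVAL = 3600  # seconds, mirrors fire_interval=3600 in the side pipeline


def impulse_schedule(start: float, count: int) -> List[Tuple[float, float, float]]:
    """Return (impulse_ts, window_start, window_end) for the first `count` impulses.

    Assumption: impulses fire at start, start + FIRE_INTERVAL, ... and each one
    lands in a fixed window of FIRE_INTERVAL seconds aligned to the Unix epoch.
    """
    schedule = []
    for k in range(count):
        ts = start + k * FIRE_INTERVAL
        window_start = ts - ts % FIRE_INTERVAL  # epoch-aligned fixed window
        schedule.append((ts, window_start, window_start + FIRE_INTERVAL))
    return schedule


def hhmm(ts: float) -> str:
    """Format a Unix timestamp as HH:MM in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%H:%M")


if __name__ == "__main__":
    # Illustrative start time: a job launched at 10:17 UTC
    start = datetime(2024, 1, 1, 10, 17, tzinfo=timezone.utc).timestamp()
    for ts, ws, we in impulse_schedule(start, 3):
        print(f"impulse at {hhmm(ts)} -> window [{hhmm(ws)}, {hhmm(we)})")
```

Under these assumptions, a job started at 10:17 would re-read the table at 10:17, 11:17, 12:17, and each read would belong to the hourly window [10:00, 11:00), [11:00, 12:00), [12:00, 13:00) respectively.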
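Function that uses the side input:

```python
def enrich_payload(payload, equipments):
    # `equipments` is the whole side input (rows read from BigQuery)
    payload_id = payload["id"]
    for equipment in equipments:
        if payload_id == equipment["id"]:
            # Copy the equipment attributes onto the payload, first match only
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break
    return payload
```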
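For comparison, this is a minimal sketch of the same lookup done against a dict keyed by `id`, so each element costs one lookup instead of a scan over all side-input rows. It assumes, as `enrich_payload` implies, that every row is a dict with `id`, `type`, `brand` and `year`. `build_equipment_index`, `enrich_with_index` and `ENRICH_FIELDS` are hypothetical names; in a pipeline, a keyed side input could presumably be supplied with `beam.pvalue.AsDict` over `(id, row)` pairs, but that variant is not shown or tested here.

```python
from typing import Any, Dict, Iterable

ENRICH_FIELDS = ("type", "brand", "year")  # the fields enrich_payload copies


def build_equipment_index(equipments: Iterable[Dict[str, Any]]) -> Dict[Any, Dict[str, Any]]:
    """Key side-input rows by id. The first row wins on duplicate ids,
    matching the `break` in enrich_payload."""
    index: Dict[Any, Dict[str, Any]] = {}
    for equipment in equipments:
        index.setdefault(equipment["id"], equipment)
    return index


def enrich_with_index(payload: Dict[str, Any], index: Dict[Any, Dict[str, Any]]) -> Dict[str, Any]:
    """Same result as enrich_payload, but with one dict lookup, and it returns
    a new dict instead of mutating the input element."""
    equipment = index.get(payload["id"])
    if equipment is None:
        return payload  # no match: pass the element through unchanged
    enriched = dict(payload)
    for field in ENRICH_FIELDS:
        enriched[field] = equipment[field]
    return enriched


if __name__ == "__main__":
    rows = [{"id": 1, "type": "pump", "brand": "Acme", "year": 2019}]  # made-up sample row
    print(enrich_with_index({"id": 1, "value": 3.5}, build_equipment_index(rows)))
```

Returning a copy rather than editing `payload` in place follows the general Beam guidance that a transform should not modify its input elements.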
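Main pipeline code:

```python
import datetime as dt
import json

import apache_beam as beam
from apache_beam.transforms import window

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    # Use the event time carried in the payload as the element timestamp
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    # 30-second fixed windows on the main input
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)
```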
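One detail of the `timestamping` step: `datetime.fromisoformat` returns a naive datetime when the string has no UTC offset, and `.timestamp()` on a naive datetime is interpreted in the local time zone of whatever machine runs it. Also, before Python 3.11, `fromisoformat` does not accept a trailing `Z`. The sketch below parses the field with an explicit fallback; treating offset-less strings as UTC is an assumption about the payload format, and `event_time_seconds` is a hypothetical helper name.

```python
import datetime as dt


def event_time_seconds(iso_string: str) -> float:
    """Parse the payload's "timestamp" field into Unix seconds.

    Assumption: a string without a UTC offset is meant to be UTC, rather than
    the worker's local time zone.
    """
    parsed = dt.datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.timestamp()


if __name__ == "__main__":
    # The same instant written three ways yields the same Unix timestamp
    for s in ("2024-01-01T10:17:45", "2024-01-01T10:17:45+00:00", "2024-01-01T12:17:45+02:00"):
        print(s, event_time_seconds(s))
```

It would slot into the existing step as `window.TimestampedValue(src, event_time_seconds(src["timestamp"]))`.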
```python
final_pipeline = (
    main_pipeline
    # Pass the whole side input to enrich_payload as an iterable of rows
    | "enrich data" >> beam.Map(
        enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()
```
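Because the main input uses 30-second windows and the side input uses hourly windows, each main window has to be matched to one side-input window. The plain-Python sketch below reproduces what I understand to be the Python SDK's default side-input window mapping: take the main window's max timestamp (its end minus one microsecond) and assign it into the side input's windowing function, assumed here to be the hourly fixed windows from `apply_windowing=True`. All constants and function names are hypothetical, and no Beam code is called.

```python
from typing import Tuple

MICROS = 1_000_000
MAIN_WINDOW_S = 30    # FixedWindows(30) on the main input
SIDE_WINDOW_S = 3600  # fire_interval of PeriodicImpulse with apply_windowing=True

Window = Tuple[int, int]  # [start, end) in microseconds since the Unix epoch


def fixed_window(ts_micros: int, size_s: int) -> Window:
    """Epoch-aligned fixed window of size_s seconds that contains ts_micros."""
    size = size_s * MICROS
    start = ts_micros - ts_micros % size
    return start, start + size


def side_window_for(main_window: Window) -> Window:
    """Map a main-input window to a side-input window via its max timestamp
    (end minus one microsecond), assigned into the side input's fixed windows."""
    _, end = main_window
    return fixed_window(end - 1, SIDE_WINDOW_S)


if __name__ == "__main__":
    main_window = fixed_window(1_700_000_000 * MICROS, MAIN_WINDOW_S)
    print("main window:", main_window)
    print("side window:", side_window_for(main_window))
```

Under these assumptions, every 30-second window inside a given hour, including the last one ending exactly on the hour, reads the side input from that same hour.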
After deploying it to Dataflow, everything looked fine and there were no errors. But then I noticed that the `enrich data` step has two nodes instead of one.
![Dataflow Graph](https://i.stack.imgur.com/FOl6w.png)
Also, the side input seems to be stuck: as you can see, `Elements Added` shows a count of 21 under Input Collections, but only `-` under Output Collections.
![Enrich data stuck](https://i.stack.imgur.com/gCB9A.png)
You can find the full pipeline code here.
I have followed all the instructions in these docs:
- https://beam.apache.org/documentation/patterns/side-inputs/
- https://beam.apache.org/releases/pydoc/2.35.0/apache_beam.io.gcp.bigquery.html
But I still run into this problem. Please help me. Thanks!