Is it possible to have both Pub/Sub and BigQuery as inputs in Google Dataflow?

2024-04-03

In my project, I want to use a streaming pipeline in Google Dataflow to process Pub/Sub messages. While cleaning the input data, I also want a side input from BigQuery. This creates a problem that causes one of the two inputs not to work.

I set streaming=True in the pipeline options, which allows the Pub/Sub input to be handled correctly. However, BigQuery is not compatible with streaming pipelines (see the link below):

https://cloud.google.com/dataflow/docs/resources/faq#what_are_the_current_limitations_of_streaming_mode

I get this error: "ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines." Given the limitations, that is understandable.

But I only want to use BigQuery as a side input, in order to map data onto the incoming Pub/Sub stream. It runs fine locally, but as soon as I try to run it on Dataflow it returns the error.

Has anyone found a good workaround for this?

Edit: adding the skeleton of my pipeline below for reference:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pvalue import AsList

# Set all options needed to properly run the pipeline
options = PipelineOptions(streaming=True,
                          runner='DataflowRunner',
                          project=project_id)

p = beam.Pipeline(options = options)

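# Bounded side-input read from BigQuery -- this is the read that is not
# supported together with streaming mode on Dataflow (see the question above).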
n_tbl_src = (p
         | 'Nickname Table Read' >> beam.io.Read(beam.io.BigQuerySource(
            table = nickname_spec
        )))

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=AsList(n_tbl_src))
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()

@Pablo's comment above was the correct answer. For anyone who runs into the same situation, below are the changes to my script that worked.

import logging

import apache_beam as beam
from google.cloud import bigquery

# This opens the Beam pipeline to run Dataflow
p = beam.Pipeline(options = options)
logging.info('Created Dataflow pipeline.')

# This will pull in all of the recorded nicknames to compare to the incoming PubSubMessages.

client = bigquery.Client()
query_job = client.query("""
    select * from `{0}.{1}.{2}`""".format(project_id, dataset_id, nickname_table_id))
nickname_tbl = query_job.result()
nickname_tbl = [dict(row.items()) for row in nickname_tbl]

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=nickname_tbl)
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()