如何使用 Apache Beam 从 Google Pub/Sub 访问消息 ID?

2024-04-27

我一直在 Python 2.7.16 上使用 2.13.0 SDK 测试 Apache Beam,以流模式从 Google Pub/Sub 订阅中提取简单消息,并写入 Google Big Query 表。作为此操作的一部分,我尝试使用 Pub/Sub 消息 ID 进行重复数据删除,但我似乎根本无法将其取出。

The ReadFromPubSub 方法的文档 https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub and 发布订阅消息类型 https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.PubsubMessage建议服务生成的 KV(例如 id_label)应作为 attribute 属性的一部分返回,但它们似乎并未返回。

请注意,仅在使用数据流运行器时才支持 id_label 参数。

发送消息的代码

import time
import json
from datetime import datetime

from google.cloud import pubsub_v1

project_id = "[YOUR PROJECT]"
topic_name = "test-apache-beam"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
    if message_future.exception(timeout=30):
        print ('Publishing message {} threw an Exception {}.'.format(topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1,11):
    data = {'rownumber':n}
    jsondata = json.dumps(data)
    message_future = publisher.publish(topic_path, data=jsondata, source='python', timestamp=datetime.now().strftime("%Y-%b-%d (%H:%M:%S:%f)"))
    message_future.add_done_callback(callback)

print('Published message IDs:')

Beam 管道代码:-

from __future__ import absolute_import

import argparse
import logging
import re
import json
import time
import datetime
import base64
import pprint

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import ReadFromPubSub
from apache_beam.io import ReadStringsFromPubSub
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.trigger import AfterProcessingTime
from apache_beam.transforms.trigger import AccumulationMode

def format_message_element(message, timestamp=beam.DoFn.TimestampParam):

    data = json.loads(message.data)
    attribs = message.attributes

    fullmessage = {'data' : data,
                   'attributes' : attribs,
                   'attribstring' : str(message.attributes)}

    return fullmessage

def run(argv=None):

    parser = argparse.ArgumentParser()
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
                        '--input_subscription',
                        dest='input_subscription',
                        help=('Input PubSub subscription of the form '
                        '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
    input_group.add_argument(
                        '--test_input',
                        action="store_true",
                        default=False
    )
    group = parser.add_mutually_exclusive_group(required=True) 
    group.add_argument(
      '--output_table',
      dest='output_table',
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
    group.add_argument(
        '--output_file',
        dest='output_file',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True

    if known_args.input_subscription:
        options.view_as(StandardOptions).streaming=True

    with beam.Pipeline(options=options) as p:

        from apache_beam.io.gcp.internal.clients import bigquery

        table_schema = bigquery.TableSchema()

        attribfield = bigquery.TableFieldSchema()
        attribfield.name = 'attributes'
        attribfield.type = 'record'
        attribfield.mode = 'nullable'

        attribsource = bigquery.TableFieldSchema()
        attribsource.name = 'source'
        attribsource.type = 'string'
        attribsource.mode = 'nullable'

        attribtimestamp = bigquery.TableFieldSchema()
        attribtimestamp.name = 'timestamp'
        attribtimestamp.type = 'string'
        attribtimestamp.mode = 'nullable'

        attribfield.fields.append(attribsource)
        attribfield.fields.append(attribtimestamp)
        table_schema.fields.append(attribfield)

        datafield = bigquery.TableFieldSchema()
        datafield.name = 'data'
        datafield.type = 'record'
        datafield.mode = 'nullable'

        datanumberfield = bigquery.TableFieldSchema()
        datanumberfield.name = 'rownumber'
        datanumberfield.type = 'integer'
        datanumberfield.mode = 'nullable'
        datafield.fields.append(datanumberfield)
        table_schema.fields.append(datafield)

        attribstringfield = bigquery.TableFieldSchema()
        attribstringfield.name = 'attribstring'
        attribstringfield.type = 'string'
        attribstringfield.mode = 'nullable'
        table_schema.fields.append(attribstringfield)

        if known_args.input_subscription:
            messages = (p
            | 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription,with_attributes=True,id_label='message_id')
            | 'Format Message' >> beam.Map(format_message_element)
            )

            output = (messages | 'write' >> beam.io.WriteToBigQuery(
                        known_args.output_table,
                        schema=table_schema,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                    )

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

以及运行 python 脚本的代码:-

python PythonTestMessageId.py --runner DataflowRunner --project [YOURPROJECT] --input_subscription projects/[YOURPROJECT]/subscriptions/test-apache-beam.subscription --output_table [YOURPROJECT]:test.newtest --temp_location gs://[YOURPROJECT]/tmp --job_name test-job

在提供的代码中,我只是将 Attributes 属性的字典转换为字符串,然后插入到 BigQuery 表中。表中返回的数据如下所示:-

正如您所看到的,attributes 字段中的两个属性只是我传入的属性,并且 PubSub 消息 id 不可用。

有没有办法可以退回这个?


这是一个已知的问题。 A错误报告 https://issues.apache.org/jira/browse/BEAM-3489已在 JIRA 中归档,用于在 PubsubMessage 中公开 message_id。请投票支持此错误报告。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Apache Beam 从 Google Pub/Sub 访问消息 ID? 的相关文章

随机推荐

  • RestKit 对象与外键的映射关系

    RestKit 是否可以在不将外键存储为属性的情况下连接关系 即直接从 JSON 中的键路径存储 特别是 我有一个 Job has many Rooms 关系 房间的 JSON 不包含作业 而是分别加载 job id 1 name John
  • 如何防止已删除的软件包在 Julia 中更新?

    该问题的标题乍一看可能令人困惑 但它是有效的 我安装了Makie jl不久前打包 然后使用成功删除它pkg gt rm Makie 今天我尝试使用以下命令更新所有软件包 如果有的话 pkg gt up 但我得到了一个令人兴奋的日志 Inst
  • fopen 或 file_get_contents 更快?

    我正在运行多个流量较高的网站 根据要求 所有图像均通过下载image php id IMAGE ID HERE 如果您以前曾经这样做过 您就会知道该文件将读取文件图像并使用特殊标头将其回显到浏览器 我的问题是 服务器上的负载非常高 150
  • 如何显示由 setTimeout/setInterval 生成的每个正在运行的线程的列表

    我想通过纯 javascript 或浏览器中的任何类型的控制台或其他方式来完成此操作 是否可以 Thanks 进一步说明 我想调试一个执行动画的库 我想知道如果有多个对象被动画化 是否会创建多个计时器 注意setTimeout 不会产生新线
  • Git:确定分支是否处于合并冲突状态

    我正在编写一个 bash 脚本来进行一些自动化操作 该脚本的一部分涉及导航到本地存储库 切换到本地 master 分支 然后拉取远程 master 以使用最新代码更新本地 master 分支 有谁知道是否有一种方法可以以编程方式确定拉取是否
  • 哪个运算符更快:!= 或 >

    哪个运算符更快 gt or 示例 我想针对 1 测试一个值 可以为正值或 1 if time gt 1 or if time 1 时间的类型为 int 标准没说 因此 这取决于给定编译器在给定版本中生成哪些操作码 以及给定 CPU 执行它们
  • dplyr 在动物园对象中发生变异

    我试图应用dplyr mutate in zoo目的 但是 它产生了一个错误 Error in UseMethod mutate no applicable method for mutate applied to an object of
  • 如果设备关闭,尝试在 IOS 应用程序中检索之前配对的蓝牙设备将不会响应失败

    很抱歉标题很长 但我们在使用 iOS 版 corebluetooth 时遇到了一个非常有趣的问题 我们正在 CBCentralManager 中发出对retrievePeripherals 的调用 并且能够找到之前配对的设备 不管设备是打开
  • Firebase 重置密码 Swift [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我想知道你们是否可以向我展示如何在 Swift 中设置重置密码 我目前正在使用 Firebase 作为我的后端服务 我只需要代码 答案
  • 用于列表和映射的 C++ 容器

    我们有一个键和值对的集合 我们需要一个容器 它可以帮助我们检索值 o 1 但也可以记住插入顺序 以便当我们进行迭代时 我们可以像插入顺序一样进行迭代 由于键是一个字符串 我们将无法使用集合或类似的结构 目前我们已经定义了自己的集合类 其中包
  • 链表、数组和硬件内存缓存

    虽然之前有人问过关于链表与数组的问题 但答案大多归结为我们大多数人在某些时候可能已经学到的东西 列表擅长插入和删除 数组擅长随机访问 现在 像 Bjarne Stroustrup 这样受人尊敬的人已经argued https www you
  • Flutter IOS 使用连接或 wifi 插件读取 wifi 名称

    这个问题是类似的这个问题 https stackoverflow com questions 52498906 how to get the wifi namessid of the currently connected wifi in
  • 是否可以有效地计算 lambda 演算项?

    我最近用 lambda 演算编写了很多程序 我希望能够实时运行其中一些程序 然而 尽管趋势函数范式基于 lambda 演算和 B 约简规则 但我找不到一个不是玩具 不以效率为目的的评估器 函数式语言应该很快 但我所知道的那些语言实际上并不提
  • Android 6.0如何隐藏导航栏?

    我有以下代码 getWindow getDecorView setSystemUiVisibility View SYSTEM UI FLAG LAYOUT STABLE View SYSTEM UI FLAG LAYOUT HIDE NA
  • 定制 NET MAUI 启动画面

    我正在尝试制作闪屏 Net毛伊岛包含渐变背景和动画徽标 我看过有关如何制作简单的飞溅的教程
  • 如何在代码中设置控件模板?

    我在 XAML 中有这个
  • 在自定义 Flex 组件中绘制叠加层

    如何在 Flex 中创建一个自定义 MXML 组件 该组件基于现有组件 但在某些情况下会在该现有组件上绘制覆盖层 理想情况下 新组件应该基于 派生自 现有组件 以便现有组件的出现可以用新组件替换 我尝试在新组件中重写 updateDispl
  • Selenium + Python如何监听元素的变化

    这个想法是创建一个机器人来读取聊天消息 所有消息都在一个ul gt li 不必写消息 例如 ul class message list li class message Hello There li li class message Hel
  • C++ 中的映射的多个键

    我有一个表 其中的条目是这样的 Row Column1 Column2 Column3 Column4 1 0X0A 1 2 A 2 0X0B 2 2 B 3 0x0C 3 2 C 现在我想使用映射 以便我可以使用第 1 列或第 2 列作为
  • 如何使用 Apache Beam 从 Google Pub/Sub 访问消息 ID?

    我一直在 Python 2 7 16 上使用 2 13 0 SDK 测试 Apache Beam 以流模式从 Google Pub Sub 订阅中提取简单消息 并写入 Google Big Query 表 作为此操作的一部分 我尝试使用 P