Apache Beam 云数据流流卡住侧输入

2023-11-24

我目前正在 GCP Dataflow 中构建 PoC Apache Beam 管道。在本例中,我想使用来自 PubSub 的主输入和来自 BigQuery 的侧输入创建流式传输管道,并将处理后的数据存储回 BigQuery。

侧管线代码

side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x:beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)

侧面输入代码功能

def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]

            break

    return payload

主管道代码

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()

将其部署到 Dataflow 后,一切看起来都很好,没有错误。但后来我注意到enrich data步骤有两个节点而不是一个。

Dataflow Graph

And also, the side input stuck as you can see it has Elements Added with 21 counts in Input Collections and - value in Elements Added in Output Collections. Enrich data stuck

您可以找到完整的管道代码here

我已经遵循这些文档中的所有说明:

  • https://beam.apache.org/documentation/patterns/side-inputs/
  • https://beam.apache.org/releases/pydoc/2.35.0/apache_beam.io.gcp.bigquery.html

但还是发现了这个错误。请帮我。谢谢!


这里有一个工作示例:

mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"

def to_bqrequest(e, sql):
    from apache_beam.io import ReadFromBigQueryRequest
    yield ReadFromBigQueryRequest(query=sql)
     

def merge(e, side):
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)

side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
               | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
                                           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
               | "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
               | ReadAllFromBigQuery()
            )

final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
                | Map(logging.info)
        )                    
    
p.run()

请注意,这使用了GlobalWindow(以便两个输入具有相同的窗口)。我使用了处理时间触发器,以便该窗格包含多行。5是任意选择的,使用1也会起作用的。

请注意,侧输入和主输入之间的数据匹配是不确定性,您可能会看到旧的触发窗格中的值波动。

理论上,使用FixedWindows应该解决这个问题,但我无法得到FixedWindows上班。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Beam 云数据流流卡住侧输入 的相关文章

随机推荐

  • 如何将计算列添加到我的 EF4 模型?

    给定 MS SQL 2008 中的 用户 表和 登录 表 CREATE TABLE dbo User User UserID int IDENTITY 1000 1 NOT NULL UserName varchar 63 NOT NULL
  • 如何解决读取问候语数据包时出现错误?

    我正在尝试连接到 NetBeans 中的服务器 我写的代码如下 运行此代码会返回此错误 wlecome Warning mysqli connect MySQL server has gone away in C xampp htdocs
  • C 和 C++ 中的 static 和 extern 全局变量

    我制作了 2 个项目 第一个项目使用 C 语言 第二个项目使用 C 语言 两者都具有相同的行为 C项目 header h int varGlobal 7 main c include
  • 在 C++ 中,如何在运行时获取给定元素的模板类型?

    我正在设计一个简单的Array类能够保存任何类型的对象 就像一个向量可以在一个对象中保存多种类型的数据 这是为了学习目的 我有一个名为的空基类Container class Container 还有一个名为的模板化子类Object temp
  • Flex 项目在 Chrome 和 IE11 中重叠

    我正在尝试创建一个固定高度的 Flexbox 布局 当内部内容太大时 它会滚动内部内容 另外 如果内容不会导致滚动 我想修复一个带有按钮的 div 到容器底部 我有一个在 Firefox 中完美运行的布局 但在 Chrome 中 当底部按钮
  • 替换单列值

    如何替换数据框单列中的值 例如 dataz 列中的所有 0 值均变为 1 datay dataz 1 0 100 2 2 101 3 3 102 4 4 103 5 10 0 6 11 0 7 0 0 8 0 0 9 0 0 10 12 1
  • 检查函数参数的最佳方法? [关闭]

    Closed 这个问题是基于意见的 目前不接受答案 Locked 这个问题及其答案是locked因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动 我正在寻找一种有效的方法来检查 Python 函数的变量 例如 我想检查参数类
  • TaskCancellationException 如何避免成功控制流上的异常?

    在我们的应用程序中 我们大量使用异步 等待和任务 因此 它确实大量使用 Task Run 有时使用内置的取消支持CancellationToken public Task DoSomethingAsync CancellationToken
  • 使用二叉索引树进行 RMQ 扩展

    The RMQ问题可以这样扩展 给定的是一个数组n整数A 查询 x y 给定两个整数 1 x y n 找到最小值A x A x 1 A y 更新 x v 给定一个整数v且 1 x n do A x v 这个问题可以解决O log n 对于这
  • 当我为 Android RatingBar 使用自定义星星时,对于低于 0.5 的小数值始终显示半星

    我查了很多帖子 例如Android RatingBar更改星星颜色 更改评级栏中星星的颜色 其中评级栏是在android中动态创建的 如何设置评分栏的星星颜色 以更改评级栏中星星的颜色 我关注了这些帖子 并且能够更改自定义评级栏的星星 但在
  • HTML5 视频上一个 - 下一个和自动播放

    我是这个网站的新手 也是 HTML5 和 Javascript 的新手 并不是说我是初学者 当我看到它时 我有点了解 HTML5 和 Javascript 只是我自己无法正确编写它 我有很多视频 都是 mp4 大小相同 都在服务器上的同一个
  • 我应该如何使用区域获取 aws 区域名称

    您好 我想使用区域手段获取亚马逊网络服务 aws 区域名称 region is us east 1 region name is US East N Virginia region is us west 2 region name is U
  • Spring-Data-Elasticsearch 在底层使用什么 Elasticsearch 客户端?

    我想在我的项目中使用 Spring Data Elasticsearch 我看到了这个 The well known TransportClient is deprecated as of Elasticsearch 7 0 0 and i
  • 新 Ember 路由器的访问实例

    如何访问新 Ember 路由器的实例 API 文档似乎是指旧路由器或不正确 http emberjs com api classes Ember Router html RouterV2 不容易通过全局常量访问 这使得以 错误 方式做事变得
  • 使用 Base64UrlEncode 语句[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 目前不接受答案 我正在尝试通过代码发送电子邮件 但遇到了障碍 我当时正在工作this当 Base64UrlEncode 显示为红色时 我的代码中有相同的 using 语句 using Sys
  • 电话号码国家代码列表[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 目前不接受答案 Locked 这个问题及其答案是locked因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动 On this 维基百科条目我发现国际
  • 回形针附件文件大小

    如何获取回形针附件每种样式的文件大小 user attachment file size似乎不起作用 user attachment style size 给出与实际文件大小无关的数字 我没有找到如何获取文件大小对于给定的风格 除了原来的
  • 在boost中定义斐波那契堆的比较函数

    我需要在我的项目中使用斐波那契堆 并且我正在尝试从 boost 库使用它 但我无法弄清楚如何为任意数据类型设置用户定义的比较函数 我需要为结构节点构造一个最小堆 定义如下 struct node int id int weight stru
  • 标识符为“”的应用程序 ID 不可用。请输入不同的字符串

    我正在尝试添加新的 APP ID 来准备 App Store 提交 但在我提供的捆绑包 ID 下出现以下错误 带有标识符的应用程序 ID com domainName AppName 不可用 请输入不同的字符串 这是什么意思 我正在尝试添加
  • Apache Beam 云数据流流卡住侧输入

    我目前正在 GCP Dataflow 中构建 PoC Apache Beam 管道 在本例中 我想使用来自 PubSub 的主输入和来自 BigQuery 的侧输入创建流式传输管道 并将处理后的数据存储回 BigQuery 侧管线代码 si