究竟是什么在管理光束中的水印?

2024-04-08

Beam 的强大功能来自于它先进的窗口功能,但它也有点令人困惑。

在本地测试中看到一些奇怪的情况(我使用rabbitmq作为输入源),其中消息并不总是得到ackd,并修复了不总是关闭的窗口,我开始挖掘 StackOverflow 和 Beam 代码库。

似乎对于何时设置确切的水印存在特定于源的问题:

  • RabbitMQ 水印不前进:Apache Beam:RabbitMqIO 水印不前进 https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance
  • 对于低容量,PubSub 水印不会前进:https://issues.apache.org/jira/browse/BEAM-7322 https://issues.apache.org/jira/browse/BEAM-7322
  • SQS IO 在没有新传入消息的一段时间内不会提前水印 -https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsIO。 java#L44 https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsIO.java#L44

(和别的)。此外,似乎还有独立的概念Checkpoints (CheckpointMarks) 与Watermarks.

所以我认为这是一个由多部分组成的问题:

  1. 什么代码负责移动水印?它似乎是 Source 和 Runner 的某种组合,但我实际上似乎无法find为了更好地理解它(或根据我们的用例调整它)。这对我来说是一个特殊的问题,因为在流量较低的时期,水印永远不会前进,消息也不会前进ackd
  2. 我没有看到太多关于检查点/检查点标记概念的文档(非代码 Beam 文档没有讨论它)。 CheckpointMark 如何与 Watermark 交互(如果有的话)?

  1. 每个 PCollection 都有自己的水印。水印表明完成程度那个特定的PCollection是。源对其生成的 PCollection 的水印负责。水印到下游 PCollection 的传播是自动的,无需额外的近似;可以粗略地理解为“输入PCollections和缓冲状态的最小值”。所以在你的情况下,它是RabbitMqIO查看水印问题。我不熟悉这个特定的 IO 连接器,但如果您还没有这样做的话,向用户列表发送错误报告或电子邮件会很好。
  2. 检查点是特定于源的数据片段,只要运行程序持久保留该检查点,就可以恢复读取而不会丢失消息。消息 ACK 往往发生在检查点终结中,因为运行程序在知道消息永远不需要重新读取时调用此方法。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

究竟是什么在管理光束中的水印? 的相关文章

  • 如何使用 Beam 读取大型 CSV?

    我正在尝试弄清楚如何使用 Apache Beam 读取大型 CSV 文件 我所说的 大 是指几 GB 因此将整个 CSV 一次性读入内存是不切实际的 到目前为止 我已经尝试了以下选项 使用 TextIO read 这不好 因为带引号的 CS
  • Apache Beam 每用户会话窗口未合并

    我们有一个有用户的应用程序 每个用户每次使用我们的应用程序大约 10 40 分钟 我想根据发生的特定事件 例如 该用户已转换 该用户上次会话出现问题 该用户上次会话成功 在此之后 我想计算每天这些更高级别的事件 但这是一个单独的问题 为此
  • 无法通过在 Apache Beam 中创建模板来按所需顺序运行多个管道

    我有两个独立的管道 分别为 P1 和 P2 根据我的要求 我只需要在 P1 完全完成执行后才运行 P2 我需要通过一个模板完成整个操作 基本上 模板在找到 run 方式 即 p1 run 时就被创建 所以我可以看到 我需要使用两个不同的模板
  • 优化内存密集型数据流管道的 GCP 成本

    我们希望降低在 GCP Dataflow 中运行特定 Apache Beam 管道 Python SDK 的成本 我们构建了一个内存密集型 Apache Beam 管道 每个执行器上运行需要大约 8 5 GB RAM 当前正在加载一个大型机
  • 如何获取当前滑动窗口的最大时间戳

    我正在使用 X 大小和 Y 周期的滑动时间窗口 为了标记每个窗口的输出 我想获取PCollection当前窗口的时间戳 PCollection
  • Dataflow 作业完成时通知 Google PubSub

    有没有办法在 Google Dataflow 作业完成后将消息发布到 Google Pubsub 上 我们需要通知依赖系统传入数据的处理已完成 将数据写入到接收器后 Dataflow 如何发布 EDIT 我们希望在管道完成写入 GCS 后发
  • Cloud Dataflow 中的作业失败:启用 Dataflow API

    我目前正在尝试将 Dataflow 与 Pub Sub 结合使用 但收到此错误 工作流程失败 原因 6e74e8516c0638ca 刷新您的凭据时出现问题 请检查 1 为您的项目启用Dataflow API 2 您的项目有一个机器人服务帐
  • apache beam.io.BigQuerySource use_standard_sql 作为数据流运行程序运行时不起作用

    我有一个数据流作业 我将首先从 bigquery 查询中读取 在标准 sql 中 它在直接运行模式下完美运行 但是 我尝试在数据流运行程序模式下运行此数据流并遇到此错误 响应 content 显然 use standard sql 参数在数
  • 如何从数据流中的PCollection读取bigQuery

    我有一个从 pubsub 获得的对象 PCollection 比如说 PCollection
  • Cloud SQL 增量到 BigQuery

    我需要针对我正在研究的用例之一提供一些建议 使用案例 我们在 Cloud SQL 中拥有大约 5 10 个表的数据 其中一些被视为查找表 另一些则被视为事务性表 我们需要将其发送到 BigQuery 以生成 3 4 个表 扁平化 嵌套或非规
  • 数据流:将 Top 模块与 Python SDK 结合使用:单元素 PCollection

    我正在查看 incubator beam 存储库上的 word counting py 示例 从数据流文档链接 我想修改它以获得n 出现次数最多的 这是我的管道 counts lines split gt gt beam ParDo Wor
  • Apache Beam:DoFn 与 PTransform

    Both DoFn and PTransform是一种定义操作的方法PCollection 我们如何知道何时使用哪个 理解它的一个简单方法是类比map f 对于列表 高阶函数map将函数应用于列表的每个元素 返回结果的新列表 您可以将其称为
  • 从 Dataflow 写入 BigQuery - 作业完成时不会删除 JSON 文件

    我们的 Dataflow 作业之一将其输出写入 BigQuery 我对其幕后实现方式的理解是 Dataflow 实际上将 JSON 格式的结果 分片 写入 GCS 然后启动 BigQuery 加载作业以导入该数据 但是 我们注意到 无论作业
  • Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案?

    根据 dataflow 2 X 的发行说明 IntraBundleParallelization 已被删除 有没有办法控制 增加数据流 2 1 0 上 DoFns 的并行度 当我在 1 9 0 版本的数据流上使用 IntrabundlePa
  • 在 Apache Beam 中监视与文件模式匹配的新文件

    我在 GCS 或其他受支持的文件系统上有一个目录 外部进程正在向该目录写入新文件 我想编写一个 Apache Beam 流式传输管道 它可以连续监视此目录中的新文件 并在每个新文件到达时读取和处理它 这可能吗 从 Apache Beam 2
  • 写入 BigQuery 时处理卡住

    我正在使用云数据流将数据从 Pub Sub 消息导入到 BigQuery 表 我正在使用 DynamicDestinations 因为这些消息可以放入不同的表中 我最近注意到该进程开始消耗所有资源 并且消息表明该进程被卡住开始显示 Proc
  • 在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道

    我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道 但我一直遇到此错误Failed to construct instance from factory method Da
  • 压缩保存在Google云存储中的文件

    是否可以压缩已保存在 Google 云存储中的文件 这些文件由 Google 数据流代码创建和填充 数据流无法写入压缩文件 但我的要求是将其保存为压缩格式 标准 TextIO Sink 不支持写入压缩文件 因为从压缩文件中读取的可扩展性较差
  • 计算一次 GroupBy,然后将其传递给 Google DataFlow (Python SDK) 中的多个转换

    我正在使用适用于 Apache Beam 的 Python SDK 在 Google DataFlow 上运行特征提取管道 我需要运行多个转换 所有这些转换都希望项目按键分组 基于这个答案question https stackoverfl
  • Dataflow SQL (GCP) 不支持使用 STRUCT 的嵌套行

    使用 Dataflow SQL 我想读取 Pub Sub 主题 丰富消息并将消息写入 Pub Sub 主题 哪个 Dataflow SQL 查询将创建我想要的输出消息 Pub Sub input消息 event timestamp 1619

随机推荐