Apache Beam:具有无限源的批处理管道

2024-05-04

我目前正在使用 Apache Beam 和 Google Dataflow 来处理实时数据。数据来自Google PubSub,它是无限制的,所以目前我正在使用流媒体管道。然而,事实证明,拥有一个 24/7 运行的流管道是相当昂贵的。为了降低成本,我正在考虑切换到以固定时间间隔(例如每 30 分钟)运行的批处理管道,因为对于用户来说实时处理并不重要。

我想知道是否可以使用 PubSub 订阅作为有限源?我的想法是,每次运行作业时,都会在触发之前积累1分钟的数据。到目前为止,这似乎不可能,但我遇到过一个名为BoundedReadFromUnboundedSource https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.html(我不知道如何使用),所以也许有办法吗?

下面是源的大致样子:

PCollection<MyData> data = pipeline
            .apply("ReadData", PubsubIO
                    .readMessagesWithAttributes()
                    .fromSubscription(options.getInput()))
            .apply("ParseData", ParDo.of(new ParseMyDataFn()))
            .apply("Window", Window
                    .<MyData>into(new GlobalWindows())
                    .triggering(Repeatedly
                            .forever(AfterProcessingTime
                                    .pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(5))
                            )
                    )
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
            );

我尝试执行以下操作,但作业仍然以流模式运行:

PCollection<MyData> data = pipeline
            .apply("ReadData", PubsubIO
                    .readMessagesWithAttributes()
                    .fromSubscription(options.getInput()))
            .apply("ParseData", ParDo.of(new ParseMyDataFn()))

            // Is there a way to make the window trigger once and turning it into a bounded source?
            .apply("Window", Window
                    .<MyData>into(new GlobalWindows())
                    .triggering(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))
                    )
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
            );

这在中没有明确支持PubsubIO目前,您可以尝试定期启动流作业并在几分钟后以编程方式调用 Drain。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Beam:具有无限源的批处理管道 的相关文章

随机推荐

  • 3D 数组如何在 C 中存储?

    据我所知 C 中的数组是按行优先顺序分配的 因此 对于 2 x 3 数组 0 1 2 3 4 5 存储在内存中为 0 1 2 3 4 5 但是 如果我有一个 2 x 3 x 2 数组怎么办 0 1 2 3 4 5 and 6 7 8 9 1
  • 从左到右显示 SVG 动画

    我有两个 SVG 图像 我想将它们设置为动画 如下所示 首先显示Full Screen文本从左到右 然后用第二个 SVG 覆盖该单词Screen显示整个第二个 SVG 因此 最后我将得到第二个 svg 中的单词 Full 黑色空间 以及单词
  • CSS位置绝对和全宽问题

    我想改变 dl 下面占据全屏宽度而不更改换行和包含它的标题元素 当我尝试定位 dl 元素 参见 problematic code 部分 下面 导航获取最大宽度为 1003px 的包装器的 100 我希望它在不改变换行和标题 div 的情况下
  • Unity-3d-5 将 16:9 的图像缩放至其他分辨率

    所以我创建了一个蛇游戏 其边框是用 2d 精灵创建的 我将游戏窗口设置为 16 9 在此分辨率下图像看起来不错 然而 扩展到其他任何东西开始让游戏看起来很奇怪 我想要调整游戏窗口的大小 如何让我的精灵根据当前分辨率拉伸和收缩 我已经尝试创建
  • VS Code 和 Flutter/Dart,小部件自动完成功能不起作用

    我正在使用一些教程来学习 Flutter 而我现在正在学习的教程则使用 VSCode 事实上 与 Android Studio 相比 我更喜欢它 因为它不会使我的 Macbook 过热 到目前为止 我唯一感到沮丧的是 有很多次自动完成功能停
  • 使用 Java 中的 JTextfield (Netbeans) 使用点分隔符使输入字段自动格式化数字

    我是 Java 新手 在我的第一个 Java 程序 使用 Netbeans 中 我想添加带有点 的输入字段自动格式编号 使用 JTextfield 分隔符 这是我的简短代码 private void PayTransKeyReleased
  • 私有子网中的EKS,公共子网中的负载均衡器

    我在私有子网中运行 EKS 因此无法创建面向互联网的负载均衡器 但能够创建内部负载均衡器 有什么方法可以在公共子网中创建负载均衡器 可能是手动 并指向私有子网中 EKS 中运行的 Pod 我正在考虑创建负载均衡器链 其中外部负载均衡器将指向
  • 是否可以从父线程访问/更新子线程的资源?

    我正在用 C 语言进行套接字编程 并且对多线程完全陌生 这是我的场景 我需要一个父线程从套接字读取数据 可以说 并将其排队到其子线程的队列中 这里的问题是 如何更新子线程的队列 具体来自父线程 支持多个生产者和消费者的线程安全队列 MtQu
  • mongodb 安装 - 要求?

    有人知道在 mongo 上安装标准 ubuntu 需要多少磁盘空间和内存吗 试图找出我的 VPS 需求 没有最低要求 但我不建议在与网络服务器相同的机器上运行 Mongo MongoDB 自动使用机器上的所有空闲内存作为其缓存 http d
  • RuntimeException 内容具有 id 属性“android.R.id.list_container”的视图,该视图不是 ViewGroup 类

    我仅在使用 Android 5 0 1 的华为设备上遇到此异常 Fatal Exception java lang RuntimeException Unable to start activity ComponentInfo net ex
  • 将复数名词转换为单数名词

    如何使用 R 将复数名词转换为单数名词 我使用 tagPOS 函数来标记每个文本 然后提取所有标记为 NNS 的复数名词 但是如果我想将这些复数名词转换为单数该怎么办 library openNLP library tm acq o lt
  • 跳过痛苦的 Core Data 迁移并迁移到新的数据模型

    当我什至不关心旧数据时 我花费了大量时间将核心数据整理到新的迁移中 有没有一种方法可以删除所有现有数据并跳转到新的数据模型 而不是每次更改数据模型时都处理映射模型的麻烦 是的 只需删除商店文件并重新创建即可 我经常 至少在开发过程中 让我的
  • PrimeFaces 扩展 - 空值属性

    在我的页面上 我有
  • 在发生自调整大小之前,如何准确地为 UICollectionViewLayout 提供矩形中的元素?

    我在用着UICollectionView构建可以在网格或垂直列表布局中显示元素的 UI UICollectionViewFlowLayout不能很好地适应全角列表布局 所以我正在编写自己的UICollectionViewLayout子类 并
  • 如何在 Mac 上的 Safari 中删除所选元素的光泽?

    在 Mac 和 iOS 设备上的 Safari 中
  • 如何使用 ClickOnce 安装 COM

    我已经安装了使用 TeeChart ActiveX COM 使用 ClickOnce 绘制图表的组件 如果我使用 regsvr32 teechart8 ocx 手动注册 TeeChart 我的 应用程序工作正常 但我想要并且需要使用安装应用
  • 节点 --experimental-modules,请求的模块不提供名为的导出

    我已经安装了 Node 8 9 1 v10 5 0 中也出现同样的问题 我正在尝试在文件中使用来自 npm 包的命名导入 mjs import throttle from lodash I run node experimental mod
  • 数组中的唯一条目

    我有以下内容将前 10 个 URL 存储到会话中 function curPageURL pageURL http if SERVER HTTPS on pageURL s pageURL if SERVER SERVER PORT 80
  • 字节码相对于本机代码有哪些优点? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Apache Beam:具有无限源的批处理管道

    我目前正在使用 Apache Beam 和 Google Dataflow 来处理实时数据 数据来自Google PubSub 它是无限制的 所以目前我正在使用流媒体管道 然而 事实证明 拥有一个 24 7 运行的流管道是相当昂贵的 为了降