一个数据流作业内的并行管道

2023-12-23

我想在 GCP 上的一个数据流作业中运行两个并行管道。我已经创建了一个管道,它工作得很好,但我想要另一个管道而不创建另一份工作。

我已经搜索了很多答案,但找不到任何代码示例:(

如果我这样运行它就不起作用:

pipe1.run();
pipe2.run();

它给我“已经有一个活动的作业名称...如果您想提交第二个作业,请再次尝试使用设置不同的名称--jobName"


您可以将其他输入应用到管道,这将在一项作业中产生一个单独的管道。例如。:

public class ExamplePipeline {

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
    PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline one:" + c.element());
            c.output(c.element() + " extra message.");
        }

    }));
    linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));

    PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
    linesForPipelineTwo.apply("Pipeline 2 transoform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline two:" + c.element());
        }

    }));

    pipeline.run();
}

正如您所看到的,您也可以将两个(或更多)单独的 PBegin 应用于具有多个 PDone/Sink 的管道。在这个例子中"pipeline 1"转储并将输出写入文件"pipeline 2"仅将其转储到屏幕上。

如果你运行这个DataflowRunner在 GCP 上,GUI 将向您显示 2 个未连接的“管道”。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一个数据流作业内的并行管道 的相关文章

  • 哪个类调用了我的静态方法?

    假设我有一个带有静态方法的 Java 类 如下所示 class A static void foo Which class invoked me 进一步假设 A 类有任意数量的子类 class B extends A class C ext
  • 在 MongoDB Java 驱动程序中如何使用 $filter

    我有一个适用于 MQL 的查询 我需要将其翻译成Java MQL 中的查询如下所示 db
  • 按下按钮时清除编辑文本焦点并隐藏键盘

    我正在制作一个带有编辑文本和按钮的应用程序 当我在 edittext 中输入内容然后单击按钮时 我希望键盘和焦点在 edittext 上消失 但我似乎无法做到这一点 我在 XML 中插入了这两行代码 android focusable tr
  • 如何将列表转换为地图?

    最近我和一位同事讨论了转换的最佳方式是什么List to Map在 Java 中 这样做是否有任何具体的好处 我想知道最佳的转换方法 如果有人可以指导我 我将非常感激 这是个好方法吗 List
  • 使用 java 的 RAR 档案 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 将 JSON Map 传递到 Spring MVC 控制器

    我正在尝试将 Map 的 JSON 表示形式作为 POST 参数发送到我的控制器中 RequestMapping value search do method RequestMethod GET consumes application j
  • 在 Java 中填充布尔数组

    作为一名相当新手的 Java 程序员 我给自己设定了一个艰巨的挑战 尝试编写一个简单的文本冒险 不出所料 我已经遇到了困难 我试图为我的 Location 类提供一个属性来存储它包含的退出 我为此使用了一个布尔数组 本质上保存代表每个出口的
  • DateTimeFormatter.parseLocalDate 抛出 UnsupportedOperationException

    该API用于解析本地日期 http joda time sourceforge net apidocs org joda time format DateTimeFormatter html parseLocalDate 28java la
  • 我需要显式关闭连接吗?

    我持有一个实例MongoClient and DB在我的应用程序中 每次我想执行某些操作时 我都会调用getCollection 我想知道是否需要显式关闭连接 就像connection close 在 JDBC 中 强调一下 我只有一个Mo
  • 根据哈希值确认文件内容

    我需要 检查完整性 content文件数量 文件将写入 CD DVD 可能会被复制多次 这个想法是识别正确复制的副本 在从 Nero 等中删除它们之后 我对此很陌生 但快速搜索表明Arrays hashCode byte http down
  • Jodatime 日期格式

    是否可以格式化 JodaTime 日期 这是代码 private static LocalDate priorDay LocalDate date1 do date1 date1 plusDays 1 while date1 getDayO
  • 将 EditText 聚焦在设备上运行的 PopupWindow 中时出现异常

    我正在为 Android 开发一个弹出窗口 它正在工作 我在上面添加了一个 EditText 和一个按钮 当在 ADV 上运行时 它可以正常工作 而在设备上运行时 当我专注于 EditText 时 这会抛出一个奇怪的异常 android v
  • 如何在 BigQuery 中将列数据拆分为每 6 个字符并形成行

    我需要将列数据拆分为每 6 个字符 Input col1 col2 d1 X11 F11 1000KG123456 d2 X22 F22 3500Kabcdefgh Expecting col1 col2 d1 X11 d1 F11 d1
  • 在 Java 中使用 Inflater 解压缩 gzip 数据

    我正在尝试使用以下方法解压缩 gzip 数据Inflater 根据文档 如果参数 nowrap 为 true 则 ZLIB 标头和校验和 字段将不会被使用 这提供了与 GZIP 和 PKZIP 使用的压缩格式 注意 使用 nowrap 选项
  • java Runtime.getRunTime().exec 和通配符?

    我正在尝试使用删除垃圾文件 Process p Runtime getRuntime exec 只要我不使用通配符 它 就可以正常工作 即 Process p Runtime getRuntime exec bin rm f specifi
  • Java .split("|") 不工作

    我刚刚遇到了一个问题分割法 http docs oracle com javase 6 docs api java lang String html split 28java lang String 29for 字符串不适用于字符 作为一个
  • 云函数 onUpdate:无法读取未定义的属性“forEach”

    现在我正在尝试更新我的项目中的图片 我可以更新云火商店中的图片网址 但我也想使用 firebase 云功能从云存储中删除上一张图片 我想要实现的是 当我上传新图片时 从云存储中删除以前的图片 This is my data structur
  • 计算移动的球与移动的线/多边形碰撞的时间(2D)

    我有一个多边形 里面有一个移动的球 如果球撞到边界 它应该反弹回来 My current solution I split the polygon in lines and calculate when the ball hits the
  • Drools:为什么是无状态会话?

    Drools 使用会话来存储运行时数据 为此 有两种会话 无状态和有状态 与无状态会话相比 有状态会话允许迭代调用 并且似乎比无状态会话具有所有优势 那么为什么会有无状态会话呢 他们服务的目的是什么 与有状态会话相比 它们的优势是什么 谢谢
  • 如何在 Servlet 中打开弹出窗口,然后重定向页面

    我想在调用 servlet 时打开一个弹出窗口 然后想将 servlet 重定向到某个 jsp page 这就是我所做的 protected void doGet HttpServletRequest request HttpServlet

随机推荐

  • 位置无关可执行文件的正确 Xcode 设置是什么

    最近刚刚开始收到一封应用商店提交后的电子邮件 其中包含以下建议 请确保您的构建设置已配置为创建 PIE 可执行文件 然而 XCode 中的设置看起来是正确的 在链接部分我发现 不创建位置无关的可执行文件 设置为 否 双重否定YUK 您收到此
  • Android排序数组

    我如何按日期或名称对该数组进行排序 String datetable new String 21 2 datetable 0 0 2011 01 01 datetable 0 1 Name1 datetable 1 0 2011 01 03
  • 为什么宽度/高度不适用于非定位伪元素?

    我想设置一个width of before伪元素达到80 如果我使用定位 那么一切都会正常 但如果我不使用它 那么一切都会失败 你能解释一下为什么百分比宽度在没有定位的情况下不起作用吗 如果可以的话 请添加一些对规范的引用 position
  • jQuery 方法链接是流畅编程的一个例子吗?

    我对 JavaScript jQuery 有点陌生 但是当我看到方法链接的示例时 我立即感到熟悉 其他接口 如 LINQ 执行类似的操作 其中一组方法的返回类型与它们所操作的类型相同 TweetSharp 执行的操作非常类似 这是流畅编程的
  • 从 CSV 文件批量插入 - 跳过重复项

    更新 最终使用了 Johnny Bubriski 创建的这个方法 然后对其进行了一些修改以跳过重复项 效果就像一个魅力 而且速度显然相当快 关联 http johnnycode com 2013 08 19 using c sharp sq
  • 向 Django FlatPages 添加功能,无需更改原始 Django 应用程序

    我想向 Django FlatPage 数据库模型添加一个字段 但我真的不知道如何在不编辑原始应用程序的情况下扩展它 我想要做的是将以下字段添加到模型中 from django db import models from django co
  • 在 nginx 上找不到 Laravel 路由

    当我尝试访问我的测试应用程序时 只有索引路由有效 malte italoborg es http malte italoborg es 如果我尝试访问另一条路线 例如 malte italoborg es admin http malte
  • 我可以使用 jQuery 检查是否至少有一个复选框被选中吗?

    我有以下 HTML 表单 其中可以有许多复选框 单击提交按钮时 我希望用户收到一个 javascript 警报 以检查至少一个复选框 如果未选中 有没有一种简单的方法可以使用 jQuery 来做到这一点
  • Android:API 级别低于 19 的远程 Webview 调试?

    据我所知 远程调试通过chrome inspect已在 API 级别 19 中添加用于 Web 视图 不过 我正在开发一个支持 17 设备的应用程序 只是在 API 19 上 我得到了02 28 00 31 16 569 12332 123
  • 在 R 中将 LASSO 与分类变量结合使用

    我有一个包含 1000 个观察值和 76 个变量的数据集 其中大约 20 个是分类变量 我想对整个数据集使用 LASSO 我知道通过 lars 或 glmnet 在 LASSO 中使用因子变量并不能真正起作用 但是变量太多 并且它们可以采用
  • 半六角形,只有一个元件

    我试图复制以下形状但没有成功 我想我需要一些 before and after伪元素以及以下 css pentagon position relative width 78px height 50px background 3a93d0 使
  • 当我在 Haskell 中编写“show”和“read”时发生了什么?

    以下是 GHCi 的简短文字记录 Prelude gt t read read Read a gt String gt a Prelude gt t show show Show a gt a gt String Prelude gt t
  • 使用计时器显示文本 3 秒?

    是否可以使用计时器在标签中显示文本 3 秒左右 F E 当您保存某些内容并且成功时 您会收到一条短信 成功 3秒后返回原页面 有人知道如何使用标签或消息框来做到这一点吗 是的 有可能 您可以在将标签文本设置为 成功 的位置启动计时器 并将其
  • elasticsearch 5.5突出显示字段不起作用

    我测试了弹性搜索突出显示字段功能 它工作正常 我用了弹性搜索2 4 4 and spring data elasticsearch 2 0 0 RELEASE 示例代码在下面的帖子中 如何使用 Spring data elasticsear
  • 如果该集合不可在进程之间整除,则使用 MPI_Scatter

    我有一个使用 MPI Scatter 和 MPI Gather 的程序 该程序将整数 N 作为输入 并返回从 2 到 N 的质数 我创建一个包含从 2 到 N 的数字的数组 并使用 MPI Scatter 将数组拆分为 N procs 数量
  • GCC 转储预处理器定义

    gcc g 有没有办法从命令行转储其默认预处理器定义 我的意思是像 GNUC STDC 等等 是的 使用 E dM选项而不是 c 示例 将它们输出到标准输出 echo gcc dM E echo clang dM E For C echo
  • Microsoft 整数文字扩展 - 在哪里记录?

    我在 Windows 安装的标准 stdint h 头文件中遇到了一些整数文字 文字具有以下形式的后缀 i8 ui8 i16 ui16 i32 ui32 i64 ui64 我以前遇到过 i64 形式的后缀 但从未遇到过任何其他形式的后缀 我
  • 尝试改进 Encode::decode 警告消息:$SIG{__WARN__} 处理程序中的段错误

    我正在尝试改进发出的警告消息Encode decode https metacpan org pod Encode FB WARN 我希望它打印正在读取的文件的名称以及在该文件中找到格式错误的数据的行号 而不是打印模块的名称和模块中的行号
  • Oracle STANDARD_HASH 在 PLSQL 中不可用?

    我正在尝试在 PL SQL 中使用 STANDARD HASH Oracle 12c 函数 但似乎不可用 SQL gt exec dbms output put line STANDARD HASH test BEGIN dbms outp
  • 一个数据流作业内的并行管道

    我想在 GCP 上的一个数据流作业中运行两个并行管道 我已经创建了一个管道 它工作得很好 但我想要另一个管道而不创建另一份工作 我已经搜索了很多答案 但找不到任何代码示例 如果我这样运行它就不起作用 pipe1 run pipe2 run