我有两个独立的管道,分别为“P1”和“P2”。根据我的要求,我只需要在 P1 完全完成执行后才运行 P2。我需要通过一个模板完成整个操作。
基本上,模板在找到 run() 方式(即 p1.run())时就被创建。
所以我可以看到,我需要使用两个不同的模板处理两个不同的管道,但这不能满足我严格的基于顺序的管道执行要求。
我可以想到的另一种打电话方式p1.run()在 ParDo 内p2.run()并保持 p2 的 run() 等待,直到 p1 的 run() 完成。我尝试了这种方法,但陷入了下面给出的 IllegalArgumentException 。
java.io.NotSerializedException:PipelineOptions 对象不可序列化,不应嵌入到转换中(您是否在字段或匿名类中捕获了 PipelineOptions 对象?)。相反,如果您使用 DoFn,请在运行时通过 ProcessContext/StartBundleContext/FinishBundleContext.getPipelineOptions() 访问 PipelineOptions,或者在管道构建时从 PipelineOptions 中预先提取必要的字段。
是否根本不可能在任何转换(例如另一个管道的“Pardo”)内调用管道的 run() ?
如果是这种情况,那么如何通过创建单个模板来满足我按顺序调用两个不同管道的要求?
一个模板只能包含一个管道。为了按顺序执行两个单独的管道(每个管道都是一个模板),您需要在外部安排它们,例如通过一些工作流程管理系统(例如 Anuj 提到的,或 Airflow,或其他东西 - 你可能会从中汲取一些灵感这个帖子 https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions例如)。
我们意识到需要在单个管道中对 Beam 中的原语进行更好的排序,但还没有具体的设计。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)