我有以下任务需要解决:
文件通过端点不定期发送并存储在本地。我需要为每个文件触发 DAG 运行。对于每个文件,将执行相同的任务
总体流程如下:对于每个文件,运行任务 A->B->C->D
正在批量处理文件。虽然这项任务对我来说似乎微不足道,但我找到了几种方法来完成此任务,并且我对哪一种是“正确的”方法(如果有的话)感到困惑。
第一种模式:使用实验性 REST API 触发 dag。
也就是说,公开一个 Web 服务,该服务接收请求和文件,将其存储到文件夹中,并使用实验性 REST API通过将 file_id 作为 conf 传递来触发 DAG
Cons: REST api 仍然实验性的,不确定 Airflow 如何处理同时出现许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)
第二种模式:2 个 dags。一种通过 TriggerDagOperator 进行感知和触发,一种进行处理。
始终使用与前面描述的相同的 ws,但这次它只存储文件。然后我们有:
- 第一个 dag:使用 FileSensor 和 TriggerDagOperator 来触发给定 N 个文件的 N 个 dags
- 第二个目标:任务 A->B->C
Cons:需要避免将相同的文件发送到两个不同的 DAG 运行。
例子:
文件夹 x.json 中的文件
传感器找到 x,触发 DAG (1)
传感器返回并重新安排。如果 DAG (1) 未处理/移动文件,传感器 DAG 可能会重新安排使用同一文件运行的新 DAG。这是不需要的。
第三种模式:对于文件中的文件,任务 A->B->C
正如所见这个问题.
Cons:这可行,但我不喜欢的是 UI 可能会变得混乱,因为每次 DAG 运行看起来都不一样,但它会随着正在处理的文件数量而变化。此外,如果有 1000 个文件需要处理,则运行可能会很难读取
第四种模式:使用子标签
我还不确定它们是如何完全工作的,正如我所看到的他们没有受到鼓励(最后),但是应该可以为每个文件生成一个子dag并让它运行。如同这个问题.
Cons:似乎 subdags 只能与顺序执行器一起使用。
我是否遗漏了一些东西并且过度思考了一些(在我看来)应该非常简单的东西?谢谢