所以我在airflow中有一个非常好的DAG,它基本上在二进制文件上运行几个分析步骤(作为airflow插件实现)。 DAG 由 ftp 传感器触发,该传感器仅检查 ftp 服务器上是否有新文件,然后启动整个工作流程。
所以目前的工作流程是这样的:DAG 按照定义触发 -> 传感器等待 ftp 上的新文件 -> 执行分析步骤 -> 工作流程结束。
我想要的是这样的:DAG 是触发器 -> 传感器等待 ftp 上的新文件 -> 对于 ftp 上的每个文件,分析步骤单独执行 -> 每个工作流程单独结束。
如何让分析工作流程对 ftp 服务器上的每个文件执行,如果服务器上没有文件,则只有一个传感器应该等待新文件?
例如,我不想每秒启动一个 DAG,因为那时我有许多传感器正在等待新文件。
使用 2 个 DAG 将传感步骤与分析步骤分开。
DAG 1:
传感器等待 ftp 上的新文件 -> 一旦新文件到达,使用 TriggerDagRunOperator 触发 DAG 1 本身 -> 使用 TriggerDagRunOperator 触发 DAG 2
DAG 2:
对文件进行分析步骤
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)