如果我没理解错的话,你的条件是:
- 继续跑DAG_A daily
- Run DAG_B n一天几次
- 每次 DAG_B 运行时都会等待DAG_A__任务_1要完成的
我认为您可以通过指导轻松调整当前的设计ExternalTaskSensor
等待所需的执行日期DAG_A.
来自外部任务传感器运算符定义:
等待不同的 DAG 或不同 DAG 中的任务在特定的execution_date 内完成
That execution_date
可以使用定义execution_date_fn
范围:
execution_date_fn(可选[Callable]) – 接收当前执行日期作为第一个位置参数和上下文字典中可用的任意数量的关键字参数(可选)的函数,并返回要查询的所需执行日期。 execution_delta 或execution_date_fn 可以传递给ExternalTaskSensor,但不能同时传递给两者。
您可以这样定义传感器:
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_task_id="external_task_1",
external_dag_id='dag_a_id',
allowed_states=['success', 'failed'],
execution_date_fn=_get_execution_date_of_dag_a,
poke_interval=30
)
Where _get_execution_date_of_dag_a
使用以下命令对数据库执行查询get_last_dagrun
让你得到最后的execution_date
of DAG_A.
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'dag_a_id', session)
return dag_a_last_run.execution_date
我希望这个方法可以帮助你。您可以在中找到一个工作示例这个答案.