airflow TriggerDagRunOperator 如何更改执行日期

2023-11-23

我注意到,对于计划任务,执行日期是根据过去设置的

Airflow 是作为 ETL 需求的解决方案而开发的。在 ETL 世界中, 您通常会汇总数据。所以,如果我想总结数据 2016-02-19,我会在格林威治标准时间2016-02-20午夜进行,这将是 2016 年 2 月 19 日的所有数据均可用后。

但是,当一个 dag 触发另一个 dag 时,执行时间将设置为 now()。

有没有办法让触发的 dags 与触发 dag 的执行时间相同?当然,我可以重写模板并使用昨天_ds,但是,这是一个棘手的解决方案。


下面的课程扩展了TriggerDagRunOperator允许将执行日期作为字符串传递,然后将其转换回日期时间。这有点老套,但这是我发现完成工作的唯一方法。

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicity set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}"
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        python_callable=lambda: random.getrandbits(1),
        params={},
        dag=my_controller_dag
    )
    """
    template_fields = ('execution_date',)

    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
        ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
           *args, **kwargs
       )

    def execute(self, context):
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=self.execution_date,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")

使用此功能而不设置时可能会遇到问题execution_date=now():如果您尝试使用相同的数据启动 dag,您的操作员将抛出 mysql 错误execution_date两次。这是因为execution_date and dag_id用于创建行索引,无法插入具有相同索引的行。

我想不出你想要用相同的东西运行两个相同的 dags 的原因execution_date无论如何,在生产中,但这是我在测试时遇到的事情,你不应该对此感到惊慌。只需清除旧作业或使用不同的日期时间即可。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

airflow TriggerDagRunOperator 如何更改执行日期 的相关文章

随机推荐