使自定义 Airflow 宏扩展其他宏

2023-12-22

有没有办法在 Airflow 中创建一个用户定义的宏,它本身是根据其他宏计算出来的?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

这里的用例是向后移植新的 Airflow v1.8next_execution_date宏可在 Airflow v1.7 中工作。不幸的是,这个模板是在没有宏扩展的情况下渲染的:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"

以下是一些解决方案:

1. 覆盖BashOperator向上下文添加一些值

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_templates(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)

这种方法的好处是:您可以在自定义运算符中捕获一些重复的代码。

不好的部分:在渲染模板化字段之前,您必须编写一个自定义运算符来将值添加到上下文中。

2. 在用户定义的宏中进行计算

Macros https://airflow.apache.org/code.html#macros不一定是价值观。它们可以是函数。

在你的达格中:

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

好的部分是:您可以定义可重用的函数来处理运行时可用的值(XCom价值观 https://airflow.apache.org/concepts.html#xcoms、作业实例属性、任务实例属性等),并使您的函数结果可用于渲染模板。

不好的部分(但不是那么烦人):您必须在每个需要的地方导入这样的函数作为用户定义的宏。

3. 直接在模板中调用语句

这个解决方案是最简单的(如阿丹的回答 https://stackoverflow.com/a/45941551/3216318),并且可能是您的情况中最好的一个。

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

非常适合像这样的简单呼叫。它们是其他一些直接可用的对象macros https://airflow.apache.org/code.html#macros (like task, task_instance, ETC...);甚至一些标准模块也可用(例如macros.time, ...).

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使自定义 Airflow 宏扩展其他宏 的相关文章

随机推荐