以下是一些解决方案:
1. 覆盖BashOperator
向上下文添加一些值
class NextExecutionDateAwareBashOperator(BashOperator):
def render_template(self, attr, content, context):
dag = context['dag']
execution_date = context['execution_date']
context['next_execution_date'] = dag.following_schedule(execution_date)
return super().render_templates(attr, content, context)
# or in python 2:
# return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)
这种方法的好处是:您可以在自定义运算符中捕获一些重复的代码。
不好的部分:在渲染模板化字段之前,您必须编写一个自定义运算符来将值添加到上下文中。
2. 在用户定义的宏中进行计算
Macros https://airflow.apache.org/code.html#macros不一定是价值观。它们可以是函数。
在你的达格中:
def compute_next_execution_date(dag, execution_date):
return dag.following_schedule(execution_date)
dag = DAG(
'simple',
schedule_interval='0 21 * * *',
user_defined_macros={
'next_execution_date': compute_next_execution_date,
},
)
task = BashOperator(
task_id='bash_op',
bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
dag=dag,
)
好的部分是:您可以定义可重用的函数来处理运行时可用的值(XCom价值观 https://airflow.apache.org/concepts.html#xcoms、作业实例属性、任务实例属性等),并使您的函数结果可用于渲染模板。
不好的部分(但不是那么烦人):您必须在每个需要的地方导入这样的函数作为用户定义的宏。
3. 直接在模板中调用语句
这个解决方案是最简单的(如阿丹的回答 https://stackoverflow.com/a/45941551/3216318),并且可能是您的情况中最好的一个。
BashOperator(
task_id='bash_op',
bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
dag=dag,
)
非常适合像这样的简单呼叫。它们是其他一些直接可用的对象macros https://airflow.apache.org/code.html#macros (like task
, task_instance
, ETC...);甚至一些标准模块也可用(例如macros.time
, ...).