当我尝试扩展气流 API 中提供的 SubDagOperator 时,气流网络服务器 GUI 无法将其识别为 SubDagOperator,从而使我无法放大 subdag。
如何扩展 SubDagOperator,同时保留将其放大为 subdag 的能力?我错过了什么吗?
请参阅下面的示例,了解如何扩展 SubDagOperator。您的情况的关键是覆盖 task_type 函数
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.decorators import apply_defaults
class ExampleSubdagSubclassOperator(SubDagOperator):
template_fields = ()
template_ext = ()
@apply_defaults
def __init__(self, *args, **kwargs):
dag = kwargs.get('dag')
task_id = kwargs.get('task_id')
subdag = DAG(
'{}.{}'.format(dag.dag_id, task_id),
schedule_interval=dag.schedule_interval,
start_date=dag.start_date
)
# Replace the following 3 lines with code to automatically generate the desired tasks in the subdag
t1 = DummyOperator(dag=subdag, task_id='t1')
t2 = DummyOperator(dag=subdag, task_id='t2')
t3 = DummyOperator(dag=subdag, task_id='t3')
super(ExampleSubdagSubclassOperator, self).__init__(subdag=subdag, *args, **kwargs)
# This property needs to be overridden so that the airflow UI recognises the task as a subdag and enables
# the "Zoom into Sub Dag" button
@property
def task_type(self):
return 'SubDagOperator'
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)