如何根据上游任务返回的列表动态生成任务。
我已经尝试过以下方法:
使用外部文件从列表中写入和读取 - 此选项有效,但我正在寻找更优雅的解决方案。
Xcom 拉进了一家 subdag 工厂。这是行不通的。
我能够将列表从上游任务传递到子dag,但 xcom 只能在子dag 的任务内部访问,不能用于循环/迭代返回的列表并生成任务。
例如subdag工厂方法。
def subdag1(parent_dag_name, child_dag_name, default_args,**kwargs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=default_args,
schedule_interval="@once",
)
list_files='{{ task_instance.xcom_pull( dag_id="qqq",task_ids="push")}}'
for newtask in list_files:
BashOperator(
task_id='%s-task-%s' % (child_dag_name, 'a'),
default_args=default_args,
bash_command= 'echo '+ list_files + newtask,
dag=dag_subdag,
)
return dag_subdag
None
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)