我正在尝试创建一个动态工作流程,但想要更改气流自动生成的任务名称并将其分配给列表中的任务。我尝试访问上下文并手动更改taskid
但这在 UI 中的管道渲染期间也不起作用。
My Code
def get_the_route(router_ip, taskid):
dev1 = junos_ops()
dev1.open_fabric_connection()
result = dev1.dev_handler.rpc.get_route_information(destination="10.0.0.3", normalize=True)
logger.info("result is: {}".format(pformat(result)))
dev1.close_fabric_connection()
# <--do-some-logic-->
return {"result": result}
for dev in dev_list:
get_the_route_dev_list.append(get_the_route(router_ip=dev, taskid=dev))
start >> hello_task >> get_the_route_dev_list >> bye_task >> end
生成的图
无论如何,是否可以为动态任务指定不同的名称?我知道这可以使用PythonOperator
。但我正在尝试使用TaskFlow API
反而。
Thanks
您可以这样做(它也适用于动态 DAG):
@task()
def foo():
pass
with DAG(
'test_foo',
start_date=days_ago(1),
schedule_interval=None,
) as dag:
for name in ["a", "b", "c"]:
foo.override(task_id=name)()
你可以在这里阅读更多:https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#reusing-a-decorated-task https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#reusing-a-decorated-task
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)