不确定“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件进程位于 dag 文件主体中,则 dag 将刷新以从 yaml 文件申请新的参数。所以实际上,你不需要 XCOM 来获取参数。
只需创建一个 params 字典,然后传递给 default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
但如果你想要动态的 dags 而不是参数,你可能需要其他策略,比如this https://www.astronomer.io/guides/dynamically-generating-dags/.
所以你可能需要弄清楚基本的想法:
动态处于哪个级别?任务级别? DAG 级别?
或者您可以创建自己的 Operator 来完成工作并获取参数。