如何传递动态参数 Airflow 运算符?

2024-03-27

我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要

  • 创建集群(用户提供的YAML参数)
  • Spark 作业列表(作业参数也由每个作业 YAML 提供)

借助 Airflow API,我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑到DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

还有一些其他参数被标记为模板化。

如果我想将其他参数作为模板传递(目前不是这样)怎么办? - 喜欢image_version, num_workers, worker_machine_type etc?

有什么解决方法吗?


不确定“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件进程位于 dag 文件主体中,则 dag 将刷新以从 yaml 文件申请新的参数。所以实际上,你不需要 XCOM 来获取参数。 只需创建一个 params 字典,然后传递给 default_args:

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'mage_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needs params here.
}

DAG = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTEVAL,
    default_args=default_args, # pass the params to DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=DAG
)

但如果你想要动态的 dags 而不是参数,你可能需要其他策略,比如this https://www.astronomer.io/guides/dynamically-generating-dags/.

所以你可能需要弄清楚基本的想法: 动态处于哪个级别?任务级别? DAG 级别?

或者您可以创建自己的 Operator 来完成工作并获取参数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何传递动态参数 Airflow 运算符? 的相关文章

随机推荐

  • bash 中的间接变量赋值

    似乎在 bash 中进行间接变量设置的推荐方法是使用eval var x val foo eval var val echo x gt foo 问题是常见的eval var x val 1 n pwd eval var val bad ou
  • C语言中的*和&有什么区别?

    我正在学习 C 但我仍然不确定我是否理解两者之间的区别 and yet 请允许我尝试解释一下 int a Declares a variable int b Declares a pointer int c Not possible a 1
  • JWT 令牌的最大大小是多少?

    我需要知道的最大长度 JSON Web 令牌 JWT 规格中没有相关信息 难道说 长度没有限制吗 我也一直在努力寻找这个 我会说 尝试并确保它是7kb 以下 虽然 JWT 在规范中没有定义上限 http www rfc editor org
  • Rmarkdown:在选项卡集下添加标题

    在 Rmarkdown 中我使用 tabset 将块拆分为选项卡 Tabset 1 tabset A Text under tab A B Text under tab B 我想在一些选项卡下添加一个大标题 Tabset 1 tabset
  • Java 中的 ArrayList 与 String

    我正在实现LZW算法 我已经成功地针对字符串和文本文件实现了它 并且当前正在修改我的代码以处理二进制文件 例如图像或可执行文件 因为我无法将这些文件作为字符串读取 我已经更换了String输入我的代码ArrayList
  • 如何在不使用 try-catch 的情况下检查路径是否有效?

    我想检查文件夹是否存在 如果不存在则创建它 但我不知道提供的路径是否有效 当路径无效时 会发生以下情况 string path this is an invalid path if Directory Exists path Directo
  • PayPal 的 Python 接口 - urllib.urlencode 非 ASCII 字符失败

    我正在尝试实现 PayPal IPN 功能 基本协议是这样的 客户从我的网站重定向到 PayPal 的网站以完成付款 他登录自己的帐户 授权付款 PayPal 调用我服务器上的一个页面 以 POST 形式传递详细信息 详细信息包括个人姓名
  • 上传的文件未保存到文件系统

    Context 我正在为我正在构建的 CMS 创建媒体库 基本功能包括上传文件并将其存储在文件系统中 但是 它会为保存的文件创建一个 id 目前我正在通过我构建的内容进行测试localhost db Files Add mediafile
  • Jira 插件自定义字段值如何在 .vm 模板中得到处理

    吉拉服务器 7 2 1 自定义字段插件 问题遵循此讨论 不明白 方法 getSingularObjectFromString 是做什么的 https stackoverflow com questions 17925377 cant und
  • Google表格公式中的数字增量

    在 Google Sheets 数据库中 我建立了一个公式 以便为一系列公司分配参考号 每个公司都应该有其唯一的编号 其形式为RET00XX其中 XX 代表唯一的公司编号 我希望这些数字是连续的 从 1 开始 然后继续 1 每当在数据库中插
  • Python 交换函数

    我很难用 Python 表达这一点 这是需要做什么的描述 swap cards int int 列表 gt NoneType swap cards 3 2 1 4 5 6 0 5 3 2 1 4 5 0 6 swap cards 3 2 1
  • 需要正则表达式来匹配多行,直到在公共分隔符之间找到匹配

    我正在尝试编写一个正则表达式 它将从日志文件返回多行匹配 使用下面的示例 我想匹配整个 事务 其开头和结尾与日志中所有其他事务 开始和结束 的文本相同 然而 在这些行之间有一个自定义标识符 在本例中是一个电子邮件地址 可以将一笔交易与另一笔
  • 如何将 Fluent NHibernate Automapping 与实体中相同类型的多个列表一起使用?

    看来 NHibernate 无法自动映射实体中给定类型的多个 IList 考虑以下两个实体 基于 Fluent NHibernate 源代码中包含的 Examples FirstProject 示例代码 public class Emplo
  • 如何在 LESS CSS 嵌套类上指定 html 标签?

    我有一个类用于article and a sectionHTML5 标签 在家里
  • 在创建位图之前如何从InputStream知道位图的大小?

    我需要在创建图像之前对其进行缩放 并且仅当图像超过 1024KB 例如 时才进行缩放 通过执行以下操作 我可以缩放图像 但我只需要缩放大于给定尺寸的图像 Bitmap bmImg null InputStream is url openSt
  • 如何在不等待事件侦听器运行的情况下触发事件?

    我对 NET C 中的事件有疑问 我必须为几种情况编写代码 在这些情况下 我正在运行后台任务 并且我想通知主线程或控制器类发生了某些事情 例如任务已完成或完成了文件复制 但我不这样做不希望后台任务等待主线程的委托来处理事件 我想做一些类似消
  • java 最优雅的 isNumeric() 解决方案

    我现在正在将一小段 PHP 代码移植到 java 并且我依赖于该函数is numeric x 确定是否 x是一个数字还是不是一个数字 java中似乎没有等效的函数 而且我对目前找到的解决方案并不满意 我倾向于这里找到的正则表达式解决方案 h
  • 如何选择一个CSS使用最深的类?

    如何选择一个css类的使用最深 下面的列表中 如何选择使用最深的类 active 在这种情况下是 li 包裹 span Item 1 1 1 1 span ul li class active span Item 1 span ul li
  • angularjs:在 ng-switch 中更改控制器的父范围

    因此 我可以从子控制器更改模型值 但是当子控制器处于ng switch然后就不行了 为什么呢 我创建一个例子 http plnkr co edit R7D8Xa1HBmYnwffbHQGD p preview来展示它 避免这种情况的一种方法
  • 如何传递动态参数 Airflow 运算符?

    我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业 我需要 创建集群 用户提供的YAML参数 Spark 作业列表 作业参数也由每个作业 YAML 提供 借助 Airflow API 我可以