是否可以更新/覆盖 Airflow [‘dag_run’].conf?

2024-01-08

我们通常使用以下方式启动 Airflow DAG:trigger_dagCLI 命令。例如:

airflow trigger_dag my_dag --conf '{"field1": 1, "field2": 2}'

我们在操作员中使用以下命令访问此配置context[‘dag_run’].conf

有时,当 DAG 在某些任务中中断时,我们希望“更新”conf 并使用此新conf 重新启动中断的任务(以及下游依赖项)。例如:

新的会议-->{"field1": 3, "field2": 4}

是否可以使用这样的新 json 字符串“更新” dag_run conf?

有兴趣听取对此的想法、其他解决方案或避免这种情况的潜在方法。

使用 Apache Airflow v1.10.3

预先非常感谢您。


创建 dag run 后更新 conf 并不像从 conf 中读取那么直接,因为在创建 dag run 后每次使用时都会从 dag_run 元数据表中读取 conf。虽然变量具有写入和读取元数据表的方法,但 dag 运行只允许您读取。

我同意变量是一个有用的工具,但是当您有 k=v 对只想用于单次运行时,它会变得复杂而混乱。

下面是一个运算符,可让您在实例化后更新 dag_run 的conf(在 v1.10.10 中测试):

#! /usr/bin/env python3
"""Operator to overwrite a dag run's conf after creation."""


import os

from airflow.models import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.operator_helpers import context_to_airflow_vars


class UpdateConfOperator(BaseOperator):
    """Updates an existing DagRun's conf with `given_conf`.

    Args:
        given_conf: A dictionary of k:v values to update a DagRun's conf with. Templated.
        replace: Whether or not `given_conf` should replace conf (True)
                 or be used to update the existing conf (False).
                 Defaults to True.

    """

    template_fields = ("given_conf",)
    ui_color = "#ffefeb"

    @apply_defaults
    def __init__(self, given_conf: Dict, replace: bool = True, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.given_conf = given_conf
        self.replace = replace

    @staticmethod
    def update_conf(given_conf: Dict, replace: bool = True, **context) -> None:
        @provide_session
        def save_to_db(dag_run, session):
            session.add(dag_run)
            session.commit()
            dag_run.refresh_from_db()

        dag_run = context["dag_run"]
        # When there's no conf provided,
        # conf will be None if scheduled or {} if manually triggered
        if replace or not dag_run.conf:
            dag_run.conf = given_conf
        elif dag_run.conf:
            # Note: dag_run.conf.update(given_conf) doesn't work
            dag_run.conf = {**dag_run.conf, **given_conf}

        save_to_db(dag_run)

    def execute(self, context):
        # Export context to make it available for callables to use.
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.debug(
            "Exporting the following env vars:\n%s",
            "\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
        )
        os.environ.update(airflow_context_vars)

        self.update_conf(given_conf=self.given_conf, replace=self.replace, **context)

用法示例:

CONF = {"field1": 3, "field2": 4}
with DAG(
    "some_dag",
    # schedule_interval="*/1 * * * *",
    schedule_interval=None,
    max_active_runs=1,
    catchup=False,
) as dag:
    t_update_conf = UpdateConfOperator(
        task_id="update_conf", given_conf=CONF,
    )
    t_print_conf = BashOperator(
        task_id="print_conf",
        bash_command="echo {{ dag_run['conf'] }}",
    )
    t_update_conf >> t_print_conf
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以更新/覆盖 Airflow [‘dag_run’].conf? 的相关文章

  • sklearn 估计器管道的参数无效

    我正在实现 O Reilly 书中的一个示例 Python 机器学习简介 使用 Python 2 7 和 sklearn 0 16 我正在使用的代码 pipe make pipeline TfidfVectorizer LogisticRe
  • str.translate 与 str.replace - 何时使用哪一个?

    何时以及为什么使用前者而不是后者 反之亦然 目前尚不完全清楚为什么有些人使用前者以及为什么有些人使用后者 它们有不同的目的 translate只能用任意字符串替换单个字符 但一次调用可以执行多次替换 它的参数是一个特殊的表 它将单个字符映射
  • scipy.optimize on pandas dataframe

    我试图搜索它 但结果很差 有人可以向我解释一下如何在 Pandas DataFrame 上执行 optimize minimize 以便最小化 DataFrame 中的类别和结果列之间的错误 考虑这个例子 import pandas as
  • Flask 中“缺少 CSRF 令牌”,但它在模板中呈现

    问题 当我尝试登录 使用 Flask login 时 我得到Bad Request The CSRF session token is missing但令牌正在呈现 在模板中 secret key 已设置 并且我在本地运行localhost
  • Keras model.predict 函数给出输入形状错误

    我已经在 Tensorflow 中实现了通用句子编码器 现在我正在尝试预测句子的类概率 我也将字符串转换为数组 Code if model model type universal classifier basic class probs
  • 如何计算数据框中按另一列的列值分组的一列的连续字符串值?

    我有以下数据框 Levels Labels Confidence 0 Hands 0 8 0 Leg 0 7 0 Eye 0 9 1 Ear 0 9 1 Eye 0 8 2 Hands 0 9 2 Eye 0 8 3 Eye 0 8 我想检
  • 使用 NumPy 编写一个函数来计算具有特定公差的积分

    我想编写一个自定义函数来以特定容差对表达式 python 或 lambda 函数 进行数字积分 我知道与scipy integrate quad人们可以简单地改变epsabs但我想使用 numpy 自己编写该函数 From 这篇博文 htt
  • django 模板 - 如何动态访问变量?

    假设我有一个具有以下上下文的 django 模板 data1 this is data1 data2 this is data2 data name data2 现在我知道了data name 假设它是 data2 是否可以用它来访问变量d
  • 在linux上安装python ssl模块,无需重新编译

    是否可以在已经安装了 OpenSSL 的 Linux 机器上安装 python 的 SSL 模块 而无需重新编译 python 我希望它就像复制几个文件并将它们包含在库路径中一样简单 Python版本是2 4 3 谢谢 是否可以在已经安装了
  • Django 多对多关系(类别)

    我的目标是向我的 Post 模型添加类别 我希望以后能够按不同类别 有时是多个类别 查询所有帖子 模型 py class Category models Model categories 1 red 2 blue 3 black title
  • 高级描述熊猫

    有没有像 pandas 那样更高级的功能 通常我会继续这样 r pd DataFrame np random randn 1000 columns A r describe 我会得到一份很好的总结 就像这样 A count 1000 000
  • 类型错误:此 COM 对象无法自动执行 makepy 过程 - 请为此对象手动运行 makepy

    这是什么错误 回溯错误 C Users DELL PycharmProjects MyNew venv Scripts python exe C Users DELL PycharmProjects MyNew agaaaaain py T
  • 如何用正则表达式替换多个匹配/组?

    通常我们会编写以下内容来替换一场比赛 namesRegex re compile r is life re I replaced namesRegex sub r butter There is no life in the void pr
  • 更改 Matplotlib 投影轴的背景颜色

    我正在尝试使用 Cartopy 创建一个图形 该图形需要在未投影的轴上绘制投影轴 这是一个尽可能简单的代码版本 它将轴上的内容替换为背景颜色 import matplotlib pyplot as plt import cartopy cr
  • 从 wxPython 事件处理程序中调用函数

    我正在努力寻找一种在 wxPython 事件处理函数中使用函数的方法 假设我有一个按钮 单击该按钮时 它会使用事件处理程序运行一个名为 OnRun 的函数 但是 用户忘记单击 OnRun 按钮之前的 RadionButton 我想弹出一个
  • 从迭代器外部将 StopIteration 发送到 for 循环

    有几种方法可以打破一些嵌套循环 他们是 1 使用中断 继续 for x in xrange 10 for y in xrange 10 print x y if x y gt 50 break else continue only exec
  • 如何创建用于霍夫曼编码和解码的树?

    对于我的作业 我将对霍夫曼树进行编码和解码 我在创建树时遇到问题 并且陷入困境 不要介意打印语句 它们只是让我测试并查看函数运行时的输出是什么 对于第一个 for 循环 我从主块中用于测试的文本文件中获取了所有值和索引 在第二个 for 循
  • 如何使用 enumerate 来倒数?

    letters a b c 假设这是我的清单 在哪里for i letter in enumerate letters 将会 0 a 1 b 2 c 我怎样才能让它向后枚举 如 2 a 1 b 0 c 这是一个很好的解决方案并且工作完美 i
  • Python 相当于 Scala 案例类

    Python 中是否有与 Scala 的 Case Class 等效的东西 就像自动生成分配给字段而无需编写样板的构造函数一样 当前执行此操作的现代方法 从 Python 3 7 开始 是使用数据类 https www python org
  • 为boost python编译的.so找不到模块

    我正在尝试将 C 代码包装到 python 中 只需一个类即可导出两个函数 我编译为map so 当我尝试时import map得到像噪音一样的错误 Traceback most recent call last File

随机推荐

  • ios 8.4.1 webview黑屏

    我需要在 ios 中在一个简单的 webview 中创建一个应用程序 我用example https github com vandadnp iOS 8 Swift Programming Cookbook blob master chap
  • 在日志传送的辅助服务器上创建用户

    我有一个生产服务器说ServerA我已设置日志传送到ServerB其处于只读模式 此日志传送的目的是降低生产服务器上某些昂贵的查询 痛苦的报告 的负载 现在 如果我必须使用我们的域帐户创建一些登录名 我无法执行此操作 因为辅助数据库位于st
  • 如何从 Rust 中的 Vec 中提取两个可变元素[重复]

    这个问题在这里已经有答案了 我试图从 Vec 中提取两个元素 它始终包含至少两个元素 这两个元素需要可变地提取 因为我需要能够在单个操作中更改这两个元素的值 示例代码 struct Piece x u32 y u32 name static
  • 使用 CSS 将按钮放置在另一个按钮之上

    我在这里需要一些高级 CSS 帮助 我有一个登录按钮和一个注册按钮 我只想一次显示一个 如果用户未登录 注册按钮应出现在登录按钮的顶部 我们有一个复杂而疯狂的后端 如果服务器认为用户未登录 它将生成注册按钮的代码 但是两者都会由服务器输出
  • Windows Python:使用区域设置模块更改编码

    使用Python 2 7 我正在编写一个抽象的网络抓取工具 在显示 打印 某些字符时遇到问题 我收到回溯错误 UnicodeEncodeError ascii codec can t encode character u u2606 in
  • 什么是无符号字符?

    在 C C 中 什么是unsigned char是用来 和普通的有什么不同char 在C 中 有以下三种distinct字符类型 char signed char unsigned char 1 char 如果您使用字符类型text 使用不
  • 如何从组件模板将数组作为 Input() 传递?

    我需要使用绑定将值数组传递给组件 例如 Component selector my component template div div export class MyComponent Input data any 然而 Angular
  • Prolog 中的算术,使用 2 的幂表示数字

    我有两个数字 让我们命名它们N and K 我想写N using K2 的幂 例如如果N 9 and K 4 then N可能N 1 2 2 4 2 0 2 1 2 1 2 2 我的程序应该输出类似的内容N 1 2 2 4 我习惯了C 我在
  • WPF 应用程序的单元测试失败,并出现 NotSupportedException“无法识别 Uri 前缀”

    我目前正在编写单元测试 在这个位置测试失败 并出现 NotSupportedException 无法识别 URI 前缀 经过小型研究 我已经注册了 pack Uri 方案 但它没有帮助 return WaitImageThumbnail W
  • 如何将工作负载项与已提交的更改链接起来?

    我正在使用 Git for Visual Studio Online 我添加了一个产品待办事项列表项 我添加了一些文件并提交更改 由于某种原因没有链接 我对该项目做了更多的提交并同步 我的第一个更改未与待办日志项链接 它仍然在任务下显示为待
  • 如何获取 grep 命令的输出(Python)

    我有一个输入文件 test txt 为 host dc2000 host 192 168 178 2 我想通过使用以下方式获取这些机器的所有地址 grep host root test txt 依此类推 我通过python获得命令输出 im
  • 从字符串Python中获取列表[重复]

    这个问题在这里已经有答案了 例如 我有一个字符串 1 2 3 我怎样才能让她翻一张清单 1 2 3 引号字符串 如果有其他字符 则保留字符串 我怎样才能做到这一点 例子 input output 1 2 3 1 2 3 input outp
  • 使用 DbSet 和 IQueryable 以及 NSubstitute 操作对象会返回错误

    我想用N替补 http nsubstitute github io 通过模拟对 Entity Framework 6 x 进行单元测试DbSet http msdn microsoft com en us library gg696460
  • 如何创建可加载 Lua(适用于 Windows)的自定义 .NET dll?

    我们正在 NET 框架中做一个项目 并希望稍后可以将其大部分功能用于 Lua 脚本 我想我可以在 LuaInterface 的帮助下编译一个 dll 并将其加载到 Lua 脚本中 但不知怎的 它不起作用 所做的工作如下 require lu
  • 使用Web浏览器控件通过类名获取div的内容?

    我有一个表格webBrowser1用于加载 HTML 部分包含以下行的页面的控件 div class cls Hello World div 我需要得到innerText of the div元素 我尝试了以下方法 string resul
  • Git 从index.lock 重命名为index 失败

    使用 GitHub Windows 客户端我做了一个sync将远程更改拉取到本地计算机 但在完成同步之前 我用完了磁盘空间 同步失败 现在我似乎有一堆本地更改 这些更改实际上是从原点拉取的更改 我尝试运行 git pull 但得到 C Us
  • Angular ui-grid 外部导出按钮

    我是 Angular UI GRID 的新手 我需要为导出功能创建外部按钮 例如PDF导出 and CSV 导出相似的到这张图片 https i stack imgur com kNkQH png 你知道我该怎么做吗 我还需要一个 打印 按
  • 错误:[Errno 32] 管道损坏 django

    有时 当我查看终端时 我会看到以下错误 任何人都可以让我知道它正在显示以及如何避免它 Exception happened during processing of request from 127 0 0 1 39444 Tracebac
  • 在 ASP.NET 中绑定 jqGrid 时出现问题

    我是使用 jqGrid 和 jquery 的新手 我无法将从 webmethod 检索到的 json 数据绑定到 jqGrid 上 我还使用 firebug 进行交叉验证 并且我正在从中接收数据 关于这方面的一些帮助将会很棒 我还想知道是否
  • 是否可以更新/覆盖 Airflow [‘dag_run’].conf?

    我们通常使用以下方式启动 Airflow DAG trigger dagCLI 命令 例如 airflow trigger dag my dag conf field1 1 field2 2 我们在操作员中使用以下命令访问此配置contex