动态改变任务重试次数

2024-01-06

重试任务可能毫无意义。例如,如果任务是传感器,并且由于凭据无效而失败,那么以后的任何重试都将不可避免地失败。如何定义可以决定重试是否合理的操作员?

在 Airflow 1.10.6 中,决定任务是否应该重试的逻辑位于airflow.models.taskinstance.TaskInstance.handle_failure,使得无法定义操作员的行为,因为这是任务的责任,而不是操作员的责任。

理想的情况是如果handle_failure方法是在 Operator 端定义的,因此我们可以根据需要重新定义它。

我发现的唯一解决方法是使用PythonBranchingOperator“测试”任务是否可以运行。例如,在上述传感器的情况下,检查登录凭据是否有效,然后才将 DAG 流传送到传感器。否则,失败(或分支到另一个任务)。

我的分析是handle_failure正确的?有更好的解决方法吗?


通过修改来回答我自己的问题self.retries实例变量,可用于所有运算符,在execute方法我们可以动态地强制不再重试。

在以下示例中:

  1. 传感器 0:第一次尝试就会成功
  2. 传感器 1:4 次尝试后将失败(最多重试 1 + 3 次)
  3. 传感器 2:尝试 1 次后将失败(动态强制不再重试)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator


class PseudoSensor(BaseOperator):
    def __init__(
            self,
            s3_status_code_mock,
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.s3_status_code_mock = s3_status_code_mock

    def execute(self, context):
        # Try to read S3, Redshift, blah blah
        pass
        # The query returned a status code, that we mock when the Sensor is initialized
        if self.s3_status_code_mock == 0:
            # Success
            return 0
        elif self.s3_status_code_mock == 1:
            # Error but should retry if I can still can
            raise Exception("Retryable error. Won't change retries of operator.")
        elif self.s3_status_code_mock == 2:
            # Unrecoverable error. Should fail without future retries.
            self.retries = 0
            raise Exception("Unrecoverable error. Will set retries to 0.")


# A separate function so we don't make the globals dirty
def createDAG():
    # Default (but overridable) arguments for Operators instantiations
    default_args = {
        'owner': 'Supay',
        'depends_on_past': False,
        'start_date': datetime(2019, 11, 28),
        'retry_delay': timedelta(seconds=1),
        'retries': 3,
    }

    with DAG("dynamic_retries_dag", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False) as dag :
        # Sensor 0: should succeed in first try
        sensor_0 = PseudoSensor(
            task_id="sensor_0",
            provide_context=True,
            s3_status_code_mock=0,
        )

        # Sensor 1: should fail after 3 tries
        sensor_1 = PseudoSensor(
            task_id="sensor_1",
            provide_context=True,
            s3_status_code_mock=1
        )

        # Sensor 1: should fail after 1 try
        sensor_2 = PseudoSensor(
            task_id="sensor_2",
            provide_context=True,
            s3_status_code_mock=2
        )

        dag >> sensor_0
        dag >> sensor_1
        dag >> sensor_2

        globals()[dag.dag_id] = dag


# Run everything
createDAG()

Gantt showing the tries per task enter image description here

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

动态改变任务重试次数 的相关文章

随机推荐

  • 加快 Firebase 存储下载速度

    我正在尝试从 Firebase 存储中提取视频并将其放入我的 Android 应用程序上的幻灯片中 但视频需要很长时间才能加载 有谁有任何替代方案或方法来加快数据下载速度 您可以将文件存储在区域存储中 例如 us east1 请参阅http
  • 如何仅显示一页以上的报表的第 x 页(共 y 页)

    我正在使用 jasper reports 4 5 0 我正在使用这个碧玉生成不同格式的报告 我想在我的报告中显示第 X 页 共 Y 页 所以我使用 iReport 提供的页码文本字段 该字段在调色板中可用 它显示所有报告中的页码 甚至是单页
  • 从更新站点下载 eclipse 插件的工具

    我需要在未连接到 Internet 的计算机上安装 eclipse 插件 但找不到用于本地安装的 dist 是否有一个工具可以从更新站点下载插件并创建本地安装存档 或本地更新站点 有传言说你可以用 eclipse 来做到这一点 但我找不到任
  • 如何更改 UITextField 上清除按钮的色调颜色

    我的 UITextfield 上有一个自动生成的清除按钮 具有默认的蓝色色调 我无法将色调颜色更改为白色 我尝试修改故事板和代码但没有成功 并且我不想使用自定义图像 如何在不使用自定义图像的情况下更改默认的透明按钮色调颜色 干得好 Tint
  • HTML.fromHTML - Android 中的 TagHandler

    我有一个 TextView 我想将 HTML 设置为 HTML fromHTML 但我想过滤掉所有 img 带有标签处理程序的标签 我想将所有链接 src 保存在列表数组中 那可能吗 Thanks 是的 这是可能的 您可以使用jsoup h
  • 如何确定电子邮件地址是 Microsoft“工作或学校”帐户还是 Microsoft 帐户

    我想在 Azure 多租户环境中对 Microsoft 帐户和 工作或学校 帐户进行身份验证 每种身份验证类型需要不同的请求 如果我尝试针对 工作或学校 请求以 Microsoft 帐户身份登录 则登录将在 Microsoft 登录时失败
  • 在 api 27、28、29 中混淆应用程序时,工作管理器不会运行

    我有一个每 15 分钟运行一次的定期任务 当混淆应用程序时 如果应用程序从后台被终止 工作管理器将不起作用 测试设备 一加7T 诺基亚5 Google Pixel 2模拟器 仅当应用程序位于前台或后台时 工作管理器才会执行 禁用 progu
  • 如何在混合(C#/C++)调试中设置数据断点?

    我用 C 启动程序 然后调用一些非托管 C 当我在非托管 C 中中断一行时 新数据断点 菜单项呈灰色 有没有办法解决 所以要做到这一点我必须 将非托管dll设置为启动项目 将托管程序设置为启动命令 将调试模式设置为Native 中断执行 或
  • 使用有关 WooCommerce 用户创建的生成密码发送电子邮件通知

    在 WooCommerce 中 使用下面的代码我创建新的 WP User 其中随机密码并将用户角色设置为 客户 我想在购买时自动创建帐户 然后我用WC Emails将登录详细信息发送给买家 在这种情况下 我需要纯密码 但我真的不知道为什么附
  • 如何锁定 SVN 主干(除了来自分支的合并)?

    我想阻止开发人员直接在主干上工作 我的目标是强制所有开发人员离开主干并在自己的分支上工作 直到 CI 测试通过 然后 他们必须从主干合并到分支 以获取最新更改 运行并通过测试 然后再合并回主干 这种 SVN 使用方式有什么规则吗 限制主干提
  • “grep”命令的退出状态代码

    The grep http linux die net man 1 grep手动在退出状态部分报告 EXIT STATUS The exit status is 0 if selected lines are found and 1 if
  • CTE 的意外结果

    我创建了一个使用多个 CTE 的复杂流程 主要用于递归分层工作 在小样本数据集上 一切都按预期进行 但是当我将代码应用于大数据集时 我收到了意外 且错误 的结果 我想我已经将范围缩小到了 CTE 递归 CTE 是在几个早期 CTE 中处理的
  • 在 Datalab 中查询 Hive 表时出现问题

    我已经创建了一个 dataproc 集群 其中包含更新的 init 操作来安装 datalab 一切正常 除了当我从 Datalab 笔记本查询 Hive 表时 我遇到了 hc sql select from invoices limit
  • Chrome 扩展:点击编辑当前网址,然后重定向到编辑后的网址

    我是一名心理学学生 我经常阅读论文 大学图书馆提供数据库的访问 但我每次都需要使用图书馆搜索引擎并登录 很烦人 我找到了一种避免跳转页面的方法 方法如下 我在Google Scholar中找到一篇论文后 在目标数据库地址末尾添加 ezp l
  • Symfony2 SonataAdminBundle 密码字段加密

    我有 FOSUserBundle 来管理我的用户 SonataAdminBundle 来管理我的网站 我有一个问题 每当我尝试更改 添加任何用户的密码时 密码都不会编码到sha512 但是当用户在 fosuserbundle 注册页面中注册
  • SQLite查询:获取一行的所有列(android)?

    这是架构 SQL查询是 从unjdat中选择 其中col 1 myWord 即 我想显示 col 1 为的行的所有列myWord int i String temp words new ArrayList
  • 如何使用正则表达式将缩写与其含义相匹配?

    我正在寻找与以下字符串匹配的正则表达式模式 一些示例文本 SET 演示了我正在寻找的内容 能源系统模型 ESM 用于寻找特定的最佳值 SCO 有人说计算机系统 CUST 很酷 夏天应该首选户外比赛 OUTS 我的目标是匹配以下内容 Some
  • 使用函数触发 chrome.browserAction.onClicked

    我想触发点击 以下代码正在侦听 chrome browserAction onClicked addListener function tab 原因是我有一个工作扩展 它正在后台脚本 上面的 addListener 中监听并在单击时执行一些
  • JavaScript 数组迭代返回多个值

    这太简单了 我感到困惑 我有以下内容 var x shrimp var stypes new Array shrimp crabs oysters fin fish crawfish alligator for t in stypes if
  • 动态改变任务重试次数

    重试任务可能毫无意义 例如 如果任务是传感器 并且由于凭据无效而失败 那么以后的任何重试都将不可避免地失败 如何定义可以决定重试是否合理的操作员 在 Airflow 1 10 6 中 决定任务是否应该重试的逻辑位于airflow model