Airflow 在同一 dag 中的不同时间运行任务?

2023-11-27

我在一个 dag 中有 30 个单独的任务,它们之间没有依赖关系。这些任务运行相同的代码。唯一的区别是数据量,有些任务会在几秒钟内完成,有些任务需要 2 小时或更长时间。

问题是在追赶期间,在几秒钟内完成的任务会被需要几个小时才能完成才能进入下一个执行日期的任务所阻止。

我可以将它们分成单独的 dag,但这似乎很愚蠢,而且 30 个任务将来会增加到更多。

有没有办法在不同的执行时间在同一个 dag 中运行任务?就像任务一完成,就开始下一个执行日期,而不管其他任务的执行情况如何。

添加图片进行说明。基本上,我希望在第一排看到另外两个实心绿色方框,而第三排仍然落后。

airflow_dag_ideal

Edit:

y2k-shubham 之后解释,我尝试去实现它。但它仍然不起作用。快速任务开始于2019-01-30 00,一秒完成,并且不开始2019-01-30 01因为缓慢的任务仍在运行。如果可以的话,跑步是理想的选择2019-01-30 01, 2019-01-30 02, 2019-01-30 03...如果可能的话并行

添加代码示例

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    return 1


def slow(**kwargs):
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,
    pool='fast_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

fast_task >> slow_task # not working

事实证明,可以设置两个变量,这将很容易解决我的问题。

concurrency and max_active_runs

在下面的示例中,您可以运行 4 个 dag,每个 dag 可以同时运行 4 个任务。其他组合也是可能的。

dag = DAG(
    dag_id='sample_dag',
    default_args=default_args,
    schedule_interval='@daily',
    # this will allow up to 16 tasks to be run at the same time
    concurrency=16,
    # this will allow up to 4 dags to be run at the same time
    max_active_runs=4,
)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow 在同一 dag 中的不同时间运行任务? 的相关文章

随机推荐

  • 遍历 DOM 树

    由于大多数 全部 执行 HTML 清理的 PHP 库 例如 HTML Purifier 严重依赖于正则表达式 因此我认为尝试编写一个使用 DOMDocument 和相关类的 HTML 清理程序将是一个值得尝试的实验 虽然我还处于这个项目的早
  • 使用 GCDAsyncSocket 通过套接字进行 Telnet

    我正在尝试从目标 c 通过 telnet 连接到 Cisco C40 编解码器 在我的计算机上使用终端时 我得到 密码 然而 在进行套接字连接时 需要进行 telnet 协商 我就是这样 但由于某种原因我无法到达上面的 密码 提示 void
  • MySQL会自动优化子查询吗?

    我想运行以下查询 Main Query SELECT COUNT FROM table name WHERE device id IN SELECT DISTINCT device id FROM table name WHERE NAME
  • 覆盖默认的 Android 主题

    我已经能够覆盖任何名称前面带有 android 的主题 但 Android theme xml 还定义了似乎无法覆盖的属性 例如
  • 成员名称不能与分部类的封闭类型相同

    我定义了一个具有如下属性的分部类 public partial class Item public string this string key get if Fields null return null if Fields Contai
  • 在 sagemaker 中进行预测之前如何预处理输入数据?

    我正在使用 java Sagemaker SDK 调用 Sagemaker 端点 我发送的数据在模型可以使用它进行预测之前几乎不需要清理 我怎样才能在 Sagemaker 中做到这一点 我在 Jupyter 笔记本实例中有一个预处理功能 它
  • 相邻兄弟姐妹的边际崩溃

    我正在阅读有关保证金崩溃的文章 我发现了这一点 margin 相邻兄弟姐妹 相邻兄弟姐妹的边距折叠 除非后一个兄弟需要清除过去的浮动 我不明白最后一句话 除非后一个兄弟需要清除过去的浮动 有人可以举个例子吗 谢谢 首先 下面的示例仅适用于基
  • 输出两个单词之间的文本

    我想使用 PHP 输入某个文本 输出应该是两个单词之间的文本 澄清 Input Lorem ipsum dolor sit amet Output dolor sit str Lorem ipsum dolor sit amet word1
  • 调用仅在运行时已知的函数

    我想通过输入验证服务器来首次尝试 Rust 应用程序 该服务器可以验证 AJAX 请求中的值 这意味着我需要一种方法来使用 JSON 配置文件来根据输入值的名称以及可能在运行时在 HTTP 请求中传入的表单名称来指定使用哪些验证函数 我怎样
  • 无法在 Chrome 扩展程序中使用 jQuery 触发点击

    我试图用一行 jQuery 代码创建一个 Chrome 扩展 但它不起作用 我正在尝试触发对某个元素的点击 chrome的控制台根本没有显示任何错误 当我只将 jQuery 代码放入控制台时 它工作正常 My code 内容 js docu
  • Hibernate 5 更改为不使用仅获取第一行

    我正在使用带有 oracle 11 的 Hibernate 5 2 它不支持仅获取第一行 我需要回到旧样式的 hibernate 有没有休眠配置可以做到这一点 您可以强制 Hibernate 使用 Oracle 10g 方言 这样您将使用
  • 为什么更改总和顺序会返回不同的结果?

    为什么更改总和顺序会返回不同的结果 23 53 5 88 17 64 47 05 23 53 17 64 5 88 47 050000000000004 Both Java and JavaScript返回相同的结果 据我了解 由于浮点数以
  • eclipse中Android插件安装问题

    我正在尝试安装 ADT 插件 我转到 Help install new software in eclipse 然后选择开发工具后出现此错误 Cannot complete the install because one or more r
  • Django:通过外键字段的id进行查询的正确方法是什么?

    我有两个models class Organization models Model title models CharField max length 100 class Folder models Model organization
  • 有没有办法同时使用 GET 和 POST?

    我需要使用这两种方法一起传递一些数据 GET 和 POST 我写了这个方法 但不知道是否安全
  • firebase2.default.firestore 不是一个函数 - React Firebase

    我正在尝试使用火库在我的 Reactjs 应用程序中 当我从 firebase 配置对象调用 firestore 时 It says firebase2 default firestore 不是一个函数 这是代码 import fireba
  • 如何使匿名函数中的多个参数隐式?

    如果我们有一个接受匿名函数的方法A gt B作为参数 我们可以使A隐含在我们的调用中 def impl a Int f Int gt Int Int f a impl a implicit z gt 但是我们可以使用具有多个参数的匿名函数来
  • EclipseLink JPA“此上下文中的表无效”与@OneToMany Map

    我希望我只是在这里做了一些愚蠢的事情 我正在尝试为一个设置 JPA 注释Map
  • OpenCV 不规则形状的质心

    如何使用 OpenCV 获得不规则形状的质心 我建议看看简历 时刻 C 或 cvMoments C 函数 这个 StackOverflow 线程提供了一些与您的问题非常相似的示例代码 这个帖子讨论一些与寻找物体中心点相关的理论
  • Airflow 在同一 dag 中的不同时间运行任务?

    我在一个 dag 中有 30 个单独的任务 它们之间没有依赖关系 这些任务运行相同的代码 唯一的区别是数据量 有些任务会在几秒钟内完成 有些任务需要 2 小时或更长时间 问题是在追赶期间 在几秒钟内完成的任务会被需要几个小时才能完成才能进入