具有不同调度程序间隔的气流ExternalTask​​Sensor

2023-11-29

目前我有两个 DAG:DAG_A 和 DAG_B。两者都运行schedule_interval=timedelta(days=1)

DAG_A 有一个 Task1,通常需要 7 小时才能运行。而DAG_B只需要3个小时。

DAG_B 有一个ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1")但还使用每小时生成的一些其他信息 X。

增加 DAG_B 频率使其每天至少运行 4 次的最佳方法是什么?据我所知,两个 DAG 必须具有相同的 Schedule_interval。但是,我想尽可能更新 DAG_B 上的 X。


一种可能性是创建另一个具有 DAG_B 的ExternalTask​​Sensor 的 DAG。但我不认为这是最好的方法。


如果我没理解错的话,你的条件是:

  • 继续跑DAG_A daily
  • Run DAG_B n一天几次
  • 每次 DAG_B 运行时都会等待DAG_A__任务_1要完成的

我认为您可以通过指导轻松调整当前的设计ExternalTaskSensor等待所需的执行日期DAG_A.

来自外部任务传感器运算符定义:

等待不同的 DAG 或不同 DAG 中的任务在特定的execution_date 内完成

That execution_date可以使用定义execution_date_fn范围:

execution_date_fn(可选[Callable]) – 接收当前执行日期作为第一个位置参数和上下文字典中可用的任意数量的关键字参数(可选)的函数,并返回要查询的所需执行日期。 execution_delta 或execution_date_fn 可以传递给ExternalTask​​Sensor,但不能同时传递给两者。

您可以这样定义传感器:

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_task_id="external_task_1",
        external_dag_id='dag_a_id',
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of_dag_a,
        poke_interval=30
    )

Where _get_execution_date_of_dag_a使用以下命令对数据库执行查询get_last_dagrun让你得到最后的execution_date of DAG_A.

from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun

@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None,  **kwargs):
    dag_a_last_run = get_last_dagrun(
        'dag_a_id', session)
    return dag_a_last_run.execution_date

我希望这个方法可以帮助你。您可以在中找到一个工作示例这个答案.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有不同调度程序间隔的气流ExternalTask​​Sensor 的相关文章

  • 从外部触发 Perfect 工作流程

    我目前有一个在 EC2 实例上本地运行的 Prefect 工作流程 我可以通过 UI 在 localhost 8080 上触发我的工作流程 有没有办法通过 REST API 或其他方式在外部触发 Prefect 工作流程 例如 AWS La
  • 如何定义 Airflow 上 STFP Operator 的操作?

    class SFTPOperation object PUT put GET get operation SFTPOperation GET NameError name SFTPOperation is not defined 我在这里定
  • Airflow DAG动态结构

    我正在寻找一个可以决定 dag 结构的解决方案当 dag 被触发时因为我不确定我必须运行的操作员数量 请参阅下面我计划创建的执行顺序 Task B 1 Task C 1 Task B 2 Task C 2 Task A Task B 3 g
  • Airflow:ValueError:无法配置处理程序“处理器” - wasb 记录器

    我正在尝试使用 Azure blob 配置远程日志记录 Airflow version 1 10 2 Python 3 6 5 Ubuntu 18 04 以下是我所做的步骤 在 AIRFLOW HOME config log config
  • 删除 Airflow Scheduler 日志

    我正在使用 Docker Apache Airflow 版本 1 9 0 2 https github com puckel docker airflow https github com puckel docker airflow 调度程
  • 气流池使用的插槽大于插槽限制

    有三个传感器任务并使用相同的池 池 limit sensor 设置为1 但池限制不起作用 三个池一起运行 sensor wait SqlSensor task id sensor wait dag dag conn id dest data
  • 如何在 Airflow 中安装软件包?

    我在 Airflow 在 GCP 上 部署了一个 dag 但收到错误 没有名为 scipy 的模块 如何在 Airflow 中安装软件包 我尝试添加一个单独的 DAG 来运行 def pip install package subproce
  • AWS Lambda 和 Apache Airflow 集成

    想知道是否有人可以阐明这个问题 我正在尝试找到 Airflow REST API URL 以启动 DAG 以从 AWS Lambda 函数运行 到目前为止 除了查看 Apache 孵化器站点提供的所有相关文档之外 解决该问题的唯一指导是在
  • 还有一个“此 DAG 在网络服务器 DagBag 对象中不可用”

    这似乎是一个相当普遍的问题 我有一个 DAG 我不仅可以手动触发它airflow trigger dag 但它甚至按照其时间表执行 但拒绝显示在 UI 中 我已经多次重新启动网络服务器和调度程序 按 刷新 十亿次 然后运行它airflow
  • 在 Airflow 中编写和导入自定义插件

    这实际上是两个问题合二为一 My AIRFLOW HOME结构如下 airflow dags plugins init py hooks init py my hook py another hook py operators init p
  • 如何向正在运行的气流服务添加新的 dag?

    我有一个气流服务 当前作为网络服务器和调度程序的单独 Docker 容器运行 两者都由 postgres 数据库支持 我在两个实例之间同步了 dags 并且在服务启动时正确加载了 dags 但是 如果我在服务运行时将新的 dag 添加到 d
  • 如何检查何时为特定 dag 安排了下一次 Airflow DAG 运行?

    我已设置气流并运行一些 DAG 安排每天一次 0 0 我想检查下次计划运行特定 dag 的时间 但我看不到可以在管理员中执行此操作的位置 如果你愿意 你可以使用Airflow s CLI 有next execution option htt
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的
  • 我怎样才能得到dag中的execution_date?运算符的外部?

    我怎样才能获得execution date参数在 dag 之外 execution min execution date strftime M if execution min 00 logging info YES It s 00 fin
  • BigQuery with Airflow - 缺少projectId

    尝试下面的例子 https cloud google com blog big data 2017 07 how to aggregate data for bigquery using apache airflow https cloud
  • 使用 Airflow BigqueryOperator 向 BigQuery 表添加标签

    我必须向 bigquery 表添加标签 我知道可以通过 BigQuery UI 来完成此操作 但如何通过气流运算符来完成此操作 Use case 用于计费和搜索目的 由于多个团队在同一项目和数据集下工作 我们需要将各个团队创建的所有表组合在
  • 气流获取重试次数

    在我的 Airflow DAG 中 我有一个任务需要知道它是第一次运行还是重试运行 如果是重试尝试 我需要调整任务中的逻辑 我对如何存储任务的重试次数有一些想法 但我不确定其中是否有合法的 或者是否有更简单的内置方法可以在任务中获取此信息
  • 使用DockerOperator时如何同时使用xcom_push=True和auto_remove=True?

    Problem 跑步时DockerOperator with xcom push True xcom all True and auto remove True 任务会引发错误 就好像容器在读取其内容之前被删除一样STDOUT Exampl
  • Airflow 默认连接数过多

    我打开气流并检查连接 发现其后面运行的连接太多 关于如何杀死那些我不使用的任何想法 或者我很想知道运行它的最小 conn id 建筑学 LocalExecutor 与其他经纪人不同 Postgres 作为元数据库 但它列出了 17 个连接
  • 无法设置气流,在“启动气流数据库”时出现错误

    无法设置气流 在 启动气流数据库 时出现错误 我收到以下错误 File Library Frameworks Python framework Versions 3 8 bin airflow line 26 in

随机推荐