如何管理气流 dag 之间的 python 包?

2024-02-24

如果我有多个气流 dags 以及一些重叠的 python 包依赖项,我如何保留每个项目 deps。脱钩?例如。如果我在同一台服务器上有项目 A 和 B,我会用类似的东西运行它们。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

基本上,想在相同的情况下运行 dags(例如,每个 dag 使用可能有重叠的包依赖的 python 脚本。我想单独开发(即,当想要更新时不必使用包更新所有代码)该软件包仅适用于一个项目))。请注意,我一直在使用BashOperator运行 python 任务,例如...

do_stuff = BashOperator(
        task_id='my_task',
        bash_command='python /path/to/script.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

有办法让它发挥作用吗?气流是否还有其他最佳实践方法可以帮助人们解决(或避免)此类问题?


根据 apache-airflow 邮件列表的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate

会翻译成类似的东西

do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
do_stuff_b = BashOperator(
        task_id='my_task_b',
        bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

在气流中。


关于将参数传递给任务的问题,这取决于您想要传入的参数的性质。在我的例子中,某些参数取决于数据表在 dag 运行当天的样子(例如表中的最高时间戳记录等) .)。为了将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在配置 dag 中,有一个任务将“真实”dag 的参数生成为 python 字典并转换为 pickle 文件。然后“config” dag 有一个任务,它是TriggerDagRunOperator激活“真实”dag,它具有从“config”dag 生成的 pickle 文件中读取的初始逻辑(在我的例子中,作为Dict)我把它读进去bash_command串状bash_command=f"python script.py {configs['arg1']}".

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何管理气流 dag 之间的 python 包? 的相关文章

  • AssertionError:内部:未指定默认项目

    气流新手 尝试运行 SQL 并将结果存储在 BigQuery 表中 出现以下错误 不确定在哪里设置default rpoject id 请帮我 Error Traceback most recent call last File usr l
  • 无法从flower.command导入名称FlowerCommand

    我已经安装了 apache airflow 版本 2 1 2 但是当我启动工作程序时 它说它无法从flower command 导入名称 FlowerCommand 操作系统centos 7 python版本是3 7 6 apahce ai
  • 如何从气流传感器中提取 xcom 值?

    主要问题 我正在尝试创建一个 BigQuery 表 如果不存在 方法 使用 BigQueryTableSensor 检查表是否存在 并根据返回值 使用 BigQueryCreateEmptyTableOperator 创建或不创建新表 Pr
  • 气流中任务的粒度

    对于一项任务 有许多辅助任务 从文件 数据库获取 保存属性 验证 审核 这些辅助方法并不耗时 一个样本 DAG 流 fetch data gt gt actual processing gt gt validation gt gt save
  • 有什么方法可以监控 Airflow DAG 的执行时间吗?

    我想将 Airflow 与 Statsd 和 DataDog 一起使用来监控 DAG 是否需要例如是之前执行的两倍 所以 我需要某种用于 DAG 的实时计时器 或者operator 我知道 Airflow 支持一些指标 https airf
  • 在 TriggerDagRunOperator 中提供上下文

    我有一个 dag 它被另一个 dag 触发 我已经通过这个 dag 传递了一些配置变量DagRunOrder payload字典以同样的方式官方示例 https github com apache incubator airflow blo
  • Airflow BigQueryOperator:如何将查询结果保存在分区表中?

    我有一个简单的 DAG from airflow import DAG from airflow contrib operators bigquery operator import BigQueryOperator with DAG da
  • 清除后气流强制重新运行上游任务,即使下游任务标记为成功

    我在 Airflow 中有任务 A gt B gt C 当我运行 DAG 并全部成功完成时 我希望能够单独清除 B 同时将 C 标记为成功 B 清除并进入 no status 状态 但当我尝试重新运行 B 时 什么也没有发生 我尝试过 ig
  • 气流动态 dag 创建

    有人请告诉我气流中的 DAG 是否只是一个图表 如占位符 没有任何与其关联的实际数据 如参数 或者 DAG 是否像一个实例 对于固定参数 我想要一个系统 其中要执行的操作集 给定一组参数 是固定的 但每次运行这组操作时 该输入都会不同 简单
  • 我无法通过 BashOperator xcom_push 参数

    我是 Airflow 的 xcom 功能的新手 我用 PythonOperator 尝试了它 它工作得很好 即 我可以从上下文中推送和提取值 但是当我在 BashOperator 上尝试它时 它不起作用 但是 我可以通过在任务创建期间添加
  • 气流池使用的插槽大于插槽限制

    有三个传感器任务并使用相同的池 池 limit sensor 设置为1 但池限制不起作用 三个池一起运行 sensor wait SqlSensor task id sensor wait dag dag conn id dest data
  • 我可以通过编程方式确定 Airflow DAG 是计划的还是手动触发的?

    我想创建一个片段 根据 DAG 是计划的还是手动触发的来传递正确的日期 DAG 每月运行一次 DAG 根据上个月的数据生成报告 SQL 查询 如果我运行预定的 DAG 我可以使用以下 jinja 片段获取上个月的数据 execution d
  • Airflow - 处理 DAG 回调的正确方法

    我有一个DAG然后每当它成功或失败时 我希望它触发一个发布到 Slack 的方法 My DAG args就像下面这样 default args on failure callback slack slack message sad mess
  • AWS Lambda 和 Apache Airflow 集成

    想知道是否有人可以阐明这个问题 我正在尝试找到 Airflow REST API URL 以启动 DAG 以从 AWS Lambda 函数运行 到目前为止 除了查看 Apache 孵化器站点提供的所有相关文档之外 解决该问题的唯一指导是在
  • 没有这样的文件或目录 /airflow/xcom/return.json

    创建了一个图像包含 airflow xcom return json在所有子目录上使用 chmod x 由于日志显示找不到文件或目录 尝试过 chmod x strtpodbefore KubernetesPodOperator names
  • 如何向正在运行的气流服务添加新的 dag?

    我有一个气流服务 当前作为网络服务器和调度程序的单独 Docker 容器运行 两者都由 postgres 数据库支持 我在两个实例之间同步了 dags 并且在服务启动时正确加载了 dags 但是 如果我在服务运行时将新的 dag 添加到 d
  • 气流中的execution_date:需要作为变量访问

    我真的是这个论坛的新手 但有一段时间 我一直在为我们公司玩气流 抱歉 如果这个问题听起来很愚蠢 我正在使用一堆 BashOperators 编写一个管道 基本上 对于每个任务 我想简单地使用 curl 调用 REST api 这就是我的管道
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的
  • 气流,在 dag 运行之前标记任务成功或跳过它

    我们有一个巨大的 DAG 其中有许多小而快速的任务和一些大而耗时的任务 我们只想运行 DAG 的一部分 我们发现最简单的方法是不添加我们不想运行的任务 问题是我们的 DAG 有很多相互依赖关系 因此当我们想要跳过某些任务时 不破坏 DAG
  • 使用 Airflow BigqueryOperator 向 BigQuery 表添加标签

    我必须向 bigquery 表添加标签 我知道可以通过 BigQuery UI 来完成此操作 但如何通过气流运算符来完成此操作 Use case 用于计费和搜索目的 由于多个团队在同一项目和数据集下工作 我们需要将各个团队创建的所有表组合在

随机推荐

  • Spring Boot REST API 的指标收集

    我正在尝试收集我的 Spring Boot 2 1 0 RELEASE 应用程序的指标 具体来说我想知道 调用各个 REST 端点的次数 每个端点处理请求所花费的时间 我的请求被处理 出错的平均速率 执行器 actuator metrics
  • 测量缠绕的绳子

    我正在尝试创建一个控件 它基本上允许我在彼此下面绘制不同的字符串 但是 字符串的宽度不能大于控件的宽度 为了解决这个问题 我正在考虑将 RectangleF 对象传递给 Graphics DrawString 方法 这将包装比传递的矩形宽度
  • array_walk_recursive 与数组?

    我有一个菜单数组 它是一个多维数组 我想对每个项目做一些事情 所以我尝试了 array walk recursive 这是菜单 menu array array name gt a url gt b array name gt c url
  • X 没有实现 Y(...方法有一个指针接收器)

    已经有几个关于此的问答 X 没有实现 Y 方法有一个指针接收器 的事情 但对我来说 他们似乎在谈论不同的事情 并不适用于我的具体情况 因此 我没有将问题变得非常具体 而是将其变得广泛和抽象 似乎有几种不同的情况可能会导致此错误发生 有人可以
  • 从 Chrome 打包应用程序读取和写入本地 sqlite 数据库

    是否可以从 chrome 打包应用程序读取和写入本地 sqlite 文件 我目前已经读取并写入了一个 json 文件 其中包含本地存储在硬盘上的应用程序数据 但我也希望能够使用 sqlite 数据库来执行此操作 我需要它在本地而不是在驱动器
  • 从概念上讲,重玩在游戏中是如何运作的?

    我有点好奇如何在游戏中实现重播 最初 我认为游戏中只会有一个包含每个玩家 人工智能操作的命令列表 然后它会 重新玩 游戏并让引擎照常渲染 然而 我查看了 FPS RTS 游戏的重播 经过仔细检查 甚至像粒子和图形 声音故障之类的东西都是一致
  • 如何编辑嵌入不和谐中的图像?

    是否可以更改嵌入内的图像 我正在尝试重新创建一个我在 Reddit 上看到的 蚀刻草图 机器人 并且想知道它是如何完成的 到目前为止 这是我尝试过的 这是在制作图像的函数内部 code that draws the etch a sketc
  • Next.js 路由器对某些页面上的浏览器后退按钮没有反应

    当浏览器的后退按钮打开时 我遇到了难以调查的错误https gart gallery 如果你去https gart gallery artworks 然后是任何艺术品 例如https gart gallery artworks my pla
  • 使用 jquery 创建会话?

    是否可以使用 jquery 或 javascript 创建会话变量 或者我是否必须使用 ajax 来调用执行此操作的 php 您需要使用服务器请求 Javascript仅在客户端运行 会话数据存储在服务器上 example of passi
  • 将 CSV 文件转换为 TF 记录

    我已经运行我的脚本超过 5 个小时了 我有 258 个 CSV 文件想要转换为 TF Records 我编写了以下脚本 正如我所说 我已经运行它超过 5 个小时了 import argparse import os import sys i
  • JAX-WS 返回复杂对象?

    我对 Java Web 服务还很陌生 但我在任何地方都找不到很好的解释 我在 NetBeans 中有 2 个 Java Web 项目 一种作为 Web 服务 另一种作为该 Web 服务的客户端 我还创建了自己的类 名为 Person 其中包
  • 如何将 updateview 与foreignkey/onetoonefield一起使用

    class ModTool models Model issue models OneToOneField Issue priority models CharField max length 1 choices PRIORITY blan
  • 使用 formControlName 作为反应式形式的自定义输入组件

    有一个自定义输入组件 它以带有验证的反应形式使用 Component moduleId module id toString selector custom select templateUrl custom select componen
  • 根据另一个文件中的顺序对一个文件中的行进行排序

    给定一个文件1 13 a b c d 5 f a c d 7 d c g a 14 a v s d 和一个文件2 7 x 5 c 14 a 13 i 我想考虑 file2 中第一列的相同顺序对 file1 进行排序 以便输出应为 7 d c
  • Matlab mex“缺少依赖共享库”

    我在 Matlab 2017a 中创建了几个 mex 文件 当我使用 Visual Studio C 2017 编译它们时 它们在我的计算机上运行良好 但是 当我尝试在另一台计算机上使用它们时 我收到一条错误消息 Error using m
  • 如何在 SQLAlchemy 中查询关联表?

    我正在尝试将 SQL 查询转换为 SQLAlchemy 查询 以供用户在 get API 内使用 问题是我无法从关联表中查询任何内容 我确信我不知道该方法 ORM roles users db Table roles users db Co
  • 如何从XLS(Excel)文件读取数据[Java,Android]

    我搜索过 stackoverflow 但没有找到明确的答案 如何将 XLS 文件的特定行和列的数据读取到我的 Android 应用程序 如何读取 XLS 文件 我不想将其转换为 CSV 因为当我尝试转换它们时出现错误 也许我可以用这个htt
  • iOS 长宽比限制在 iOS 7 上被打破,在 iOS 8 上工作

    在我的应用程序中 我设置了 UIView 的约束 其高度是使用宽高比约束从其宽度计算出来的 它在各种设备屏幕尺寸 3 5 4 4 7 5 5 的 iOS8 上完美运行 但它会导致应用程序在每个 iOS7 设备上崩溃 我认为这是纵横比约束 因
  • 使用两个不同类型的 Guava ListenableFutures 的结果

    我有两个 ListenableFutures 它们在其他线程上完成 每个未来都有不同的类型 我希望在它们都完成时使用它们的结果 有没有一种优雅的方法来使用番石榴来处理这个问题 如果您想要某种类型安全 您可以执行以下操作 class Comp
  • 如何管理气流 dag 之间的 python 包?

    如果我有多个气流 dags 以及一些重叠的 python 包依赖项 我如何保留每个项目 deps 脱钩 例如 如果我在同一台服务器上有项目 A 和 B 我会用类似的东西运行它们 source path to virtualenv a act