当子文件夹具有相同名称时,Airflow Packaged Dags(压缩)会发生冲突

2023-12-19

我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中协调他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和类(还有 Operator 子类)。

每个团队都会将自己的 DAG 与包中的函数和类一起打包在 ZIP 文件中。例如第一个 ZIP 文件将包含

ZIP1:

main_dag_teamA.py

子文件夹1:package1-with-generic-functions +init.py

子文件夹2:package2-with-generic-operators +init.py

另一个 ZIP 文件将包含

ZIP2:

main_dag_teamB.py

子文件夹1:package1-with-generic-functions +init.py

子文件夹2:package2-with-generic-operators +init.py

请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着具有相同功能和类的完全相同的文件。 但随着时间的推移,当新版本的软件包可用时,DAG 软件包中的软件包内容将开始出现偏差。

通过这种设置,我们遇到了以下问题:当包/子文件夹的内容在 ZIP 中开始出现偏差时,Airflow 似乎不能很好地处理同名包。 因为当我运行“airflow list_dags”时,它显示如下错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,位于 > from subfolder1.functions1 import function1 导入错误:没有名为“subfolder1.functions1”的模块

可以使用以下代码重现问题,其中两个小 DAG 与包 my_functions 一起位于其 ZIP 文件中,该包具有相同的名称,但内容不同。

DAG 包 ZIP 1:

程序1.py

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions/functions1.py:

def function1():
    print('function1')

DAG 包 ZIP 2:

程序2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions/functions2.py:

def function2():
    print('function2')

使用这两个 ZIP 文件,当我运行“airflow list_dags”时,它显示错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,位于 from subfolder1.functions1 import function1 ImportError:没有名为“subfolder1.functions1”的模块

当 ZIP 中子文件夹的内容相同时,不会发生错误。

我的问题:如何防止 ZIP 中的子文件夹发生冲突?我真的很希望拥有完全代码独立的 DAG,以及它们自己的软件包版本。


通过在 DAG(program1.py 和 program2.py)顶部执行以下操作来解决

from my_functions.functions1 import function1

and

from my_functions.functions2 import function2

Code:

import sys

# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
    if mod.startswith("function"):
        cleanup_mods.append(mod)
for mod in cleanup_mods:
    del sys.modules[mod]

这可确保每次解析 DAG 时,导入的库都会被清理。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

当子文件夹具有相同名称时,Airflow Packaged Dags(压缩)会发生冲突 的相关文章

随机推荐

  • 如何访问 Knockout 组件中的自定义元素?

    看看这个场景 ko components register hello viewModel function template h1 hello wrold h1 如果我使用
  • 卡夫卡生产者批量大小

    我有一个相关问题batch size生产者配置 当发生什么batch size已经达到并且生产者应用程序线程发送更多数据 线程是否会阻塞 直到包含批处理的缓冲区中的空间可用为止 batch size以总字节数而不是消息数来衡量批量大小 它控
  • 如何使用 RSpec 在 Rails 测试环境中加载 Seed.rb?

    我有以下 seeds rb 文件 State create name gt Alabama abbreviation gt AL name gt Alaska abbreviation gt AK name gt Arizona abbre
  • CSS ::before ::after 类的伪元素不起作用

    我正在尝试添加一个 before and after菜单标题的伪元素 伪元素对于菜单外的常规链接效果很好 但是 当我尝试将它们应用到菜单项时 background属性已设置 但 before and after属性不是 这是相关的CSS c
  • Rails 资产用于开发而非生产

    古老的 我不懂链轮 问题 我在 app assets javascript jquery fancybox js 中有一个资产 我使用 来访问 加载它 这在开发中有效 但在生产中无效 我正在查看日志 但那里肯定发生了一些事情 因为我看不到它
  • Azure B2C 自定义重置密码策略

    我在 B2C 中有一些正在发挥作用的自定义策略 但我并没有尝试让重置密码发挥作用 我遇到的问题之一是 我调用 Restful API 来检查提供的电子邮件地址是否是本地用户 或者我们是否从 Microsoft AAD 登录它们 这工作正常
  • 如何在shiny中加载csv文件的文件夹

    我有一个 CSV 文件的文件夹 我想将它们作为闪亮的文件列表上传和访问 我尝试使用以下代码来上传文件 server output sourced lt renderDataTable inFile lt input file1 if is
  • ember.js + 把手:渲染 vs 出口 vs 部分 vs 视图 vs 控制

    每个周围都有分散的解释 但我仍然不是 100 清楚它们的差异和用法 有人可以给我一个横向比较吗 outlet outlet NAME render partial view control Note 这个帖子 https stackover
  • Grails WAR 热部署导致的“call 'refresh'”错误

    当我的 Grails WAR 热部署到 Tomcat 并刷新页面时 出现以下错误 2010 年 2 月 1 日 7 00 51 PM org apache catalina core ApplicationDispatcher 调用 严重
  • 具有多个条件的 if 的执行顺序

    在具有多个条件的 if 语句中 如果第一个条件的结果明确 是否会执行第二个条件 example if i gt 0 array i 0 如果我交换条件 i 的负值可能会发生段错误 但这样就不会发生段错误 我可以确定这总是有效还是必须使用嵌套
  • groovy.json.JsonException:期待“}”或“,”但获得当前字符

    我正在尝试让一段代码为我工作 但运气不佳 所以我把代码分解成这个让我悲伤的小片段 任何人都可以帮助确定为什么会发生这个错误 import groovy json JsonSlurper String index accessCode d20
  • Firestore - 创建集合的副本

    所以我有一个名为 草稿 的集合 其中包含多个文档 每个文档都有一个自动 ID 每个文档包含字段 名称 和 详细信息 每个文档都显示在 nameLabel 和 detailsLabel 下的 tableViewCell 中 我想做的是 当用户
  • 如何更改android中EditText提示的字体?

    正如我在问题中提到的 我正在尝试更改 EditText 中提示的字体 但我似乎无法实现它 这是我的 EditText 获取用户名的代码
  • 重新计算风格:为什么这么口吃?

    假设我们有一段代码将一系列相似的元素注入到 DOM 中 像这样的东西 var COUNT 10000 elements Object keys Array COUNT join split var d document root d get
  • AS3 最大文本字段宽度

    如何设置Textfield的最大宽度 我需要自动调整宽度 直到达到最大宽度 因此长文本会断行 var maxWidth Number 200 textField multiline false textField wordWrap fals
  • devicePixelRatio可以小于1吗

    由于某种原因 任何小于 1 的像素比值在我的渲染算法中都不起作用 值 0 没有意义 但 0 5 则有意义 有没有可能devicePixelRatio返回 0 到 1 之间的值 如果您将浏览器缩放设置为小于 100 则 DPR 可能小于 1
  • 制作(从源代码安装)python 而不运行测试

    我从源码 tar 编译 python 一切正常 但测试运行了 2 小时和两次 如何绕过这些测试 0 16 20 178 405 test inspect 0 16 26 179 405 test int 0 16 27 180 405 te
  • 如何在 tvOS 中打开 GameCenter

    如何在 tvOS 中打开游戏中心排行榜 我已将此代码用于我的 iPhone 游戏 排行榜标识符 在 tvOS 上不可用 我计划在 AppleTV 上使用相同的排行榜 这将是同一个游戏 非常感谢您的帮助 斯特凡 IBAction func h
  • xsl 在 xml 中定义

    我在 movie xml 中的前几行如下
  • 当子文件夹具有相同名称时,Airflow Packaged Dags(压缩)会发生冲突

    我们正在建立一个 Airflow 框架 多个数据科学家团队可以在其中协调他们的数据处理管道 我们开发了一个 Python 代码库来帮助他们实现 DAG 其中包括各种包和模块中的函数和类 还有 Operator 子类 每个团队都会将自己的 D