Airflow:为每个文件运行 DAG 的正确方法

2023-11-24

我有以下任务需要解决:

文件通过端点不定期发送并存储在本地。我需要为每个文件触发 DAG 运行。对于每个文件,将执行相同的任务

总体流程如下:对于每个文件,运行任务 A->B->C->D

正在批量处理文件。虽然这项任务对我来说似乎微不足道,但我找到了几种方法来完成此任务,并且我对哪一种是“正确的”方法(如果有的话)感到困惑。

第一种模式:使用实验性 REST API 触发 dag。

也就是说,公开一个 Web 服务,该服务接收请求和文件,将其存储到文件夹中,并使用实验性 REST API通过将 file_id 作为 conf 传递来触发 DAG

Cons: REST api 仍然实验性的,不确定 Airflow 如何处理同时出现许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)

第二种模式:2 个 dags。一种通过 TriggerDagOperator 进行感知和触发,一种进行处理。

始终使用与前面描述的相同的 ws,但这次它只存储文件。然后我们有:

  • 第一个 dag:使用 FileSensor 和 TriggerDagOperator 来触发给定 N 个文件的 N 个 dags
  • 第二个目标:任务 A->B->C

Cons:需要避免将相同的文件发送到两个不同的 DAG 运行。 例子:

文件夹 x.json 中的文件 传感器找到 x,触发 DAG (1)

传感器返回并重新安排。如果 DAG (1) 未处理/移动文件,传感器 DAG 可能会重新安排使用同一文件运行的新 DAG。这是不需要的。

第三种模式:对于文件中的文件,任务 A->B->C

正如所见这个问题.

Cons:这可行,但我不喜欢的是 UI 可能会变得混乱,因为每次 DAG 运行看起来都不一样,但它会随着正在处理的文件数量而变化。此外,如果有 1000 个文件需要处理,则运行可能会很难读取

第四种模式:使用子标签

我还不确定它们是如何完全工作的,正如我所看到的他们没有受到鼓励(最后),但是应该可以为每个文件生成一个子dag并让它运行。如同这个问题.

Cons:似乎 subdags 只能与顺序执行器一起使用。


我是否遗漏了一些东西并且过度思考了一些(在我看来)应该非常简单的东西?谢谢


我知道我迟到了,但我会选择第二种模式:“2 个 dags。一个用 TriggerDagOperator 感知和触发,一个处理”,因为:

  • 每个文件都可以并行执行
  • 第一个 DAG 可以选择要处理的文件,对其进行重命名(添加后缀“_processing”或将其移动到处理文件夹)
  • 如果我是贵公司的新开发人员,我打开工作流,我想了解工作流在做什么的逻辑,而不是上次动态构建时处理了哪些文件
  • 如果 dag 2 发现文件有问题,则会重命名该文件(使用“_error”后缀或将其移动到错误文件夹)
  • 这是处理文件的标准方法,无需创建任何额外的运算符
  • 它使 de DAG 幂等且更易于测试。更多信息在此article

重命名和/或移动文件是在每个 ETL 中处理文件的一种非常标准的方法。

顺便说一下,我一直推荐这篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。事实并非如此

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow:为每个文件运行 DAG 的正确方法 的相关文章

  • Python 中的字节数组

    如何在 Python 中表示字节数组 如 Java 中的 byte 我需要用 gevent 通过网络发送它 byte key 0x13 0x00 0x00 0x00 0x08 0x00 在Python 3中 我们使用bytes对象 也称为s
  • 切片稀疏(scipy)矩阵

    我将不胜感激任何帮助 以理解从 scipy sparse 包中切片 lil matrix A 时的以下行为 实际上 我想根据行和列的任意索引列表提取子矩阵 当我使用这两行代码时 x1 A list 1 x2 x1 list 2 一切都很好
  • JavaScript 相当于 Python 的参数化 string.format() 函数

    这是 Python 示例 gt gt gt Coordinates latitude longitude format latitude 37 24N longitude 115 81W Coordinates 37 24N 115 81W
  • boto3 资源(例如 DynamoDB.Table)的类型注释

    The boto3库提供了几种返回资源的工厂方法 例如 dynamo boto3 resource dynamodb Table os environ DYNAMODB TABLE 我想注释这些资源 以便我可以获得更好的类型检查和完成 但我
  • Jupyter Notebooks 不显示进度条

    我正在尝试在 Jupyter 笔记本中显示进度条 这是一台新电脑 我通常做的事情似乎不起作用 from tqdm import tqdm notebook example iter 1 2 3 4 5 for rec in tqdm not
  • 将 numpy 数组写入文本文件的速度

    我需要将一个非常 高 的两列数组写入文本文件 而且速度非常慢 我发现如果我将数组改造成更宽的数组 写入速度会快得多 例如 import time import numpy as np dataMat1 np random rand 1000
  • 为什么我的代码不能根据字典解码加密字符串?

    我有一本字典 其中包含代表字母的键和值 例如一个简单的 DICT CODE b g n a p o x d t y 我收到了一个加密代码 并将该字符串转换为一个列表 其中每个项目都是一个单词 我需要根据字典中的项目来解决它 代码示例是 wo
  • 了解 Python 中的酸洗

    我最近接到一项作业 需要以腌制形式放置一本字典 其中每个键引用一个列表 唯一的问题是我不知道腌制形式是什么 谁能给我指出一些好的资源的正确方向来帮助我学习这个概念 pickle 模块实现了一个基本但强大的算法 用于序列化和反序列化 Pyth
  • numpy 使用 datetime64 进行数字化

    我似乎无法让 numpy digitize 与 datetime64 一起使用 date bins np array np datetime64 datetime datetime 2014 n 1 s for n in range 1 1
  • python是带有字符串的运算符行为[重复]

    这个问题在这里已经有答案了 我无法理解以下行为 我正在创建 2 个字符串 并使用 is 运算符来比较它 对于第一种情况 它的工作方式有所不同 对于第二种情况 它按预期工作 当我使用逗号或空格时 它显示是什么原因False与比较is当没有使用
  • PySide6.1 与 matplotlib 3.4 不兼容

    当我只安装PySide6时 GUI程序运行良好 但是一旦我安装了matplotlib及其依赖包 包括pyqt5 则GUI程序将无法运行并输出以下错误消息 This application failed to start because no
  • 使用 numpy 在 python 中执行最大方差旋转

    我正在研究矩阵的主成分分析 我已经找到了如下所示的组件矩阵 A np array 0 73465832 0 24819766 0 32045055 0 3728976 0 58628043 0 63433607 0 72617152 0 5
  • Pandas style.bar 颜色基于条件?

    如何渲染其中一列的 Pandas dfstyle bar color属性是根据某些条件计算的 Example df style bar subset before after color ff781c vmin 0 0 vmax 1 0 而
  • 在 Sphinx 中,有没有办法在声明参数的同时记录参数?

    我更喜欢在声明参数的同一行记录每个参数 根据需要 以便应用D R Y http en wikipedia org wiki Don t repeat yourself 如果我有这样的代码 def foo flab nickers a ser
  • Python 声音(“铃声”)

    我想让一个 python 程序在完成任务时通过发出嘟嘟声来提醒我 目前 我使用import os然后使用命令行语音程序说 进程完成 我更愿意它是一个简单的 铃 我知道有一个函数可以用于Cocoa apps NSBeep 但我认为这与此没有太
  • 检测 IDLE 的存在/如何判断 __file__ 是否未设置

    我有一个脚本需要使用 file 所以我了解到 IDLE 没有设置这个 有没有办法从我的脚本中检测到 IDLE 的存在 if file not in globals file is not set 如果你想做一些特别的事情 file 未设置
  • 如何使用 matplotlib 为圆柱体的每个单独面添加颜色

    我正在尝试为圆柱体的每个面着色 但是我不确定如何进行 我尝试了以下方法 for i in range 10 col append for i in range 10 for j in range 20 col i append plt cm
  • 导入到 SQL Server 时忽略 Excel 文件中的列

    我有多个具有相同格式的 Excel 文件 我需要将它们导入 SQL Server 我当前遇到的问题是 有两个文本列我需要完全忽略 因为它们是自由文本 并且某些行的字符长度超出了服务器允许我导入的长度 这会导致截断错误 因为我的分析不需要这些
  • 更改 Python Cmd 模块处理自动完成的方式

    我有一个 Cmd 控制台 设置为自动完成 Magic the Gathering 收藏管理系统的卡牌名称 它使用文本参数在数据库中查询卡片 并使用结果自动完成 建议卡片 然而 这些卡片名称有多个单词 Cmd 会从last到行尾的空间 例如
  • 缓存 Flask-登录 user_loader

    我有这个 login manager user loader def load user id None return User query get id 在我引入 Flask Principal 之前它运行得很好 identity loa

随机推荐

  • ffmpeg退出状态-1094995529

    我正在开发一个应用程序 使打电话给ffprobe返回非正统的退出状态 1094995529对于 Windows 上的某些文件 这个退出状态是一致给出的 并且有一些小的讨论这个的 为什么给出这个值 它记录在哪里 我是否可以期望此状态在允许的退
  • document.getElementByID 外部还是内联?

    我一直在尝试使用document getElementByID从外部 JS 文件中提取 HTML 文件的信息 但它似乎不起作用 是否document getElementByID仅当它与 HTML 文件内联时才有效 或者它可以在外部 JS
  • 如何让 Windows 窗体设计器使用外部程序集中的资源?

    我在资源文件中有一些资源 在本例中为图像 我在 Windows 窗体项目中的控件上使用这些资源 Visual Studio 资源选择对话框没有很好地支持从资源文件中选择图像 除非它们位于特定位置 但您可以直接编辑设计器文件 这工作得很好 应
  • 如何使用 git-bundle 保持开发同步?

    我需要使我的开发树在不同的计算机上保持同步 并且它们之间没有网络连接 我们有一个中央 git 存储库 我通常在我的办公室计算机上使用我自己的克隆 有时我需要在另一台从未连接到办公网络的计算机上进行一些开发 没有一台计算机曾经连接到互联网 可
  • 如何直接更新像素——使用CGImage和直接CGDataProvider

    实际问题 有几个答案可以解决我的问题 我可以强制CGImage从直接数据提供者 使用创建的CGDataProviderCreateDirect like CGContextDrawImage做 或者有其他方法可以设置 self layer
  • 以编程方式在 WPF 中创建网格作为模板

    我想以编程方式创建一个具有样式的基本用户控件 在这种风格中我想添加一个Grid 没问题 但我无法向该网格添加列定义 我的示例代码是 ControlTemplate templ new ControlTemplate FrameworkEle
  • 如何启用 NestJs swagger 4.x 插件

    如何使用新的 swagger 插件 我的编译器选项中有它 compilerOptions plugins nestjs swagger plugin 我正在运行该应用程序nest start如上所述 https docs nestjs co
  • androidx_security_crypto_encrypted_prefs_key_keyset不存在

    我正在尝试使用更安全的方式编辑用户的共享首选项集EncryptedSharedPreferences private fun provideSecureSharedPreference context Context SharedPrefe
  • 如何更改内存地址中存储的值?

    可以说内存地址0A7F03E4存储一个值124 如何将其更改为300使用Python 是否有针对此类任务的模块 gt gt gt import ctypes gt gt gt memfield ctypes c char from addr
  • WooCommerce:在订单中保存对产品的更改时挂钩

    我已经找了好几个小时了 在编辑现有订单中的产品数量后 我无法弄清楚如何在单击 保存 时执行功能 我试过这个 add action woocommerce order edit product your function name funct
  • 使用 requireJs 的模块中的循环依赖关系

    阅读 requireJs 文档 为了修复循环依赖 建议使用exports为该模块创建一个空对象 该对象可立即供其他模块引用 我尝试了这段代码 但它似乎不起作用 怎么了 P S 阅读评论以查看输出 特别是 setTimeout 调用中的 B
  • 如何在我的网络服务器上检测移动客户端?

    当http请求到达我的服务器时 我如何检测它是否来自iphone android或其他设备 您可以获取用户代理 这表明它是什么浏览器类型 iphone chrome 即任何浏览器 来帮你 http whatsmyuseragent com
  • 使用ggplot2的stat_密度_2d仅显示高密度区域

    我想将 stat 密度2D 函数与分类变量一起使用 但将绘图限制在高密度区域 以减少重叠并提高易读性 我们以以下数据为例 plot data lt data frame X c rnorm 300 3 2 5 rnorm 150 7 2 Y
  • C++:使用向量/数组优化速度?

    我有一个嵌套的 for 循环结构 现在我在每次迭代开始时重新声明向量 void function n1 n2 bound etc for int i 0 i
  • 将 InputStream 读入 Data 对象

    在 Swift 3 x 中 我们通常使用以下方式处理二进制数据Data 从它你可以生成大多数其他重要的类型 并且它上面有有用的函数 但我如何创建一个Data从一个InputStream 有什么好的办法吗 我找不到好的方法 我们可以围绕不安全
  • 什么编程语言最像英语? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 我主要是一名Python程
  • Bootstrap 4 与远程 Modal

    我无法使用新的 Twitter Bootstrap 版本 Bootstrap 4 alpha 让 Modal 在远程模式下工作 它与 Bootstrap 3 完美配合 使用 Bootstrap 4 我收到弹出窗口 但模型主体未加载 没有进行
  • 实施 Azure 共置缓存

    使用 VS2012 我从 WebRole Properties Caching Tab 添加了缓存功能 其中 它在 web config 中生成了以下 XML
  • 复合视图模型对象导致远程验证失败

    我在这个项目中的一些表单中使用了复合视图模型的模式 它运作得很好 在本例中 我有一个 VendorAddress 视图模型 我在这个项目的几个地方使用了 Address es 因此我制作了可以重复使用的 Address 视图模型对象 然后我
  • Airflow:为每个文件运行 DAG 的正确方法

    我有以下任务需要解决 文件通过端点不定期发送并存储在本地 我需要为每个文件触发 DAG 运行 对于每个文件 将执行相同的任务 总体流程如下 对于每个文件 运行任务 A gt B gt C gt D 正在批量处理文件 虽然这项任务对我来说似乎