为每个文件运行气流 DAG

2024-04-18

所以我在airflow中有一个非常好的DAG,它基本上在二进制文件上运行几个分析步骤(作为airflow插件实现)。 DAG 由 ftp 传感器触发,该传感器仅检查 ftp 服务器上是否有新文件,然后启动整个工作流程。

所以目前的工作流程是这样的:DAG 按照定义触发 -> 传感器等待 ftp 上的新文件 -> 执行分析步骤 -> 工作流程结束。

我想要的是这样的:DAG 是触发器 -> 传感器等待 ftp 上的新文件 -> 对于 ftp 上的每个文件,分析步骤单独执行 -> 每个工作流程单独结束。

如何让分析工作流程对 ftp 服务器上的每个文件执行,如果服务器上没有文件,则只有一个传感器应该等待新文件? 例如,我不想每秒启动一个 DAG,因为那时我有许多传感器正在等待新文件。


使用 2 个 DAG 将传感步骤与分析步骤分开。

DAG 1:

传感器等待 ftp 上的新文件 -> 一旦新文件到达,使用 TriggerDagRunOperator 触发 DAG 1 本身 -> 使用 TriggerDagRunOperator 触发 DAG 2

DAG 2:

对文件进行分析步骤

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为每个文件运行气流 DAG 的相关文章

  • Python OpenCV:检测大体运动方向?

    我仍在编写一个书籍扫描脚本 现在 我所需要的只是能够自动检测翻页 这本书占据了 90 的屏幕 我使用一个粗糙的网络摄像头进行运动检测 所以当我翻页时 运动方向基本上是同一个方向 我修改了一个运动跟踪脚本 但导数却无济于事 usr bin e
  • 使用 Python 和 Boto3 列出 S3 存储桶的目录内容?

    我正在尝试使用 Python 和 Boto3 列出 S3 存储桶中的所有目录 我正在使用以下代码 s3 session resource s3 I already have a boto3 Session object bucket nam
  • 在python中乘以多维数组

    我在 3d 数组中存储了许多 2d 数组 我需要将每个数组与一个向量相乘 所以我将所有这些向量存储在一个二维数组中 就像这样 A np random random L M N B np random random L M 我需要将每个 A
  • Scipy map_coordinates 双线性插值与 interp 和 IDL 插值比较

    我正在将同事的 IDL 代码重写为 python 并发现了一些我感到困惑的差异 根据我发现的其他问题和邮件列表线程 如果您使用scipy ndimage interpolation map coordinates并指定order 1它应该进
  • 在 Idle shell 中导入模块

    我正在尝试学习 python 但在导入模块时遇到问题 我有一个 pyc 文件 我正在尝试将其导入到名为 dfa pyc 的空闲 shell 中 我将该文件放在名为 xyz 的文件夹中 我使用以下命令导航到该文件夹 os chdir User
  • Excel 工作表到 Numpy 数组

    我正在尝试做一件令人难以置信的简单事情 将 Excel 工作表的部分内容加载到 Numpy 数组中 我发现了一个有用的拼凑 但它令人尴尬地不Pythonic 假设我的工作表被加载为 ws 代码 A np zeros 37 3 for i i
  • Python @property 与方法性能 - 使用哪一个?

    我编写了一些使用对象属性的代码 class Foo def init self self bar baz myFoo Foo print myFoo bar 现在我想做一些花哨的计算来返回bar 我可以用 property使方法充当属性ba
  • 从多个 csv 文件中提取行和文件名

    我的文件夹中有多个以日期为文件名的 csv 文件 20080101 csv 到 20111031 csv csv 文件具有共同的标题 csv 文件如下所示 20080101 csv X Y Z 1 1 3 1 2 6 1 3 24 2 1
  • 使用 kdeplot 对数刻度

    我正在尝试使用 Seaborn 的 kdeplot 制作一个漂亮的自由能表面 热图 我非常接近 但无法找到改变颜色条比例的方法 颜色条比例很重要 因为它应该表示地图上不同坐标处的能量差异 我需要知道如何缩放颜色条的值 0 5961573 l
  • 将多嵌套 dict/json 加载到 pandas 中

    我正在尝试加载一个非常令人困惑的多重嵌套JSON变成熊猫 我已经在使用了json 规范化 http pandas pydata org pandas docs stable reference api pandas io json json
  • 如何将Python列表分成不等长的子列表?

    我试图将用逗号分隔的元素列表划分为长度不等的块 我该如何划分它 list1 1 2 1 list2 1 1 1 1 1 1 1 2 1 1 1 3 1 1 1 4 list1 包含的元素是我希望将 list2 分成的块的大小 你可以结合以下
  • 在 Pandas 中计算滚动回归并存储斜率

    我有一些时间序列数据 我想计算 Pandas 中最后 n 天的分组滚动回归 并将该回归的斜率存储在新列中 我搜索了较旧的问题 它们要么没有得到解答 要么使用了 Pandas OLS 我听说它已被弃用 我想我可能可以使用df rolling
  • 定制 odoo 中的会计和财务模块?

    我正在研究会计和财务模块 我想做一些修改 例如隐藏字段和隐藏税收图表 有人能帮我吗 请告诉我隐藏左侧菜单项 税表 的程序 我也想知道view id隐藏发票表中的税费和底部税费 更新 请让我知道隐藏它们的外部 ID 我无法找到它们 因为它们链
  • python中通过命令查找进程

    在我的 Python 脚本中 我想检查是否otherscript py目前正在 Linux 系统上运行 这psutil http psutil readthedocs io en latest 图书馆看起来是一个很好的解决方案 import
  • 每个 start_url 已抓取多少个项目

    我使用 scrapy 抓取 1000 个 url 并将抓取的项目存储在 mongodb 中 我想知道每个网址找到了多少个项目 从 scrapy 统计数据我可以看到 item scraped count 3500但是 我需要分别对每个 sta
  • Pandas - 使用其他列值作为列名选择列

    我有一个包含一列的数据框 我们称之为 名称 names 具有其他列的名称 我想添加一个新列 该列的每一行都有基于 名称 列中包含的列名称的值 Example 输入数据框 pd DataFrame from dict a 1 2 3 4 b
  • SymPy 无法对产品进行羔羊化

    我正在使用 SymPy 1 0 和 Python 2 7 我想计算前 100 个整数的总和 此代码运行成功 import sympy as sy from sympy tensor import IndexedBase Idx import
  • 使用 OpenCV 进行车牌识别

    我有一个项目 需要使用 OpenCV 识别汽车的车牌 我想加载数字或字母的图像 让 OpenCV 识别它并将其打印到控制台 有一个函数可以做到这一点吗 如果没有 我该怎么办 Note 我正在研究灰度级 请帮忙 我必须在一周后完成 谢谢你的快
  • Pandas ImportError:绘图需要 matplotlib

    Pandas 无法识别已安装的 matplotlib 库 这是代码 import pandas as pd import numpy as np import matplotlib pyplot as plt matplotlib inli
  • 是否可以使用 numpy 中可用的函数将二维数组修补为子数组数组?

    是否可以使用 np reshape 和 np split 函数将二维数组修补为子数组数组 import numpy as np data np arange 24 reshape 4 6 print data 0 1 2 3 4 5 6 7

随机推荐

  • 只为 IE 设置 CSS?

    我有一个简单的div具有 2px 厚的边框和绝对定位 直到其父元素悬停在其上方时才会隐藏 由于IE盒子模型 所说的位置div在 IE 中有些关闭 但在其他浏览器中则不然 我不想为 IE 添加完全独立的样式表 我只想在浏览者使用 IE 时修改
  • 如果您不打算从自适应渲染中受益,那么使用 HtmlTextWriter 有什么好处吗?

    除了从替代设备的自适应渲染中受益之外 编写所有这些代码是否有意义 writer WriteBeginTag table writer WriteBeginTag tr writer WriteBeginTag td writer Write
  • 将位串 numpy 数组转换为以 2 为基数的整数的最快方法

    我有一个由位串组成的 numpy 数组 我打算将位串转换为以 2 为基数的整数 以便执行一些异或按位运算 我可以在 python 中将字符串转换为以 2 为基数的整数 int 000011000 2 我想知道在 numpy 中是否有更快更好
  • 将两个 Map 合并为一个 MultiMap

    在 Java 中将两个 Map 组合成一个 Guava MultiMap 的最佳方法是什么 例如 Map1 包含 1 a 和 2 b Map2 包含 2 c 和 3 d 然后生成的组合多重贴图将包含 1 a 2 b c 和 3 d 这是我当
  • 如何在 Angular 中处理空值

    我正在向 Angular 发送数据 但记录的其中一个值 关闭日期 为空 我如何在 Angular 中处理这个问题 以便如果 value 为 null 它会更改为 getDetails this myService getFlowerDeta
  • Android NumberPicker with Formatter 在首次渲染时不会格式化

    我有一个 NumberPicker 它有一个格式化程序 可以在 NumberPicker 旋转或手动输入值时格式化显示的数字 这工作正常 但是当 NumberPicker 首次显示并且我用以下命令初始化它时setValue 0 0 不会被格
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 使用日期的 MongoDB 聚合 $unwind $match - 我错过了什么?

    我是 MongoDB 的新手 我正在尝试使用聚合 我部分地做了我正在寻找的事情 但我对约会有一种奇怪的行为 MongoDB 信息 版本 2 2 0 操作系统 Windows 7 客观的 获取 2012 11 22 之后创建的所有评论 让我们
  • 移动应用程序的后台数据同步

    我们正在数据库之上构建 API 和移动应用程序 该数据库的主表中有数十万条记录 我们的移动开发人员正在努力推动应用程序在本地数据库中预加载完整的表 然后提供一项服务 让手机可以将更改与数据库同步updated at column 虽然这肯定
  • 静态局部变量和静态全局变量有什么区别?

    C 入门 说 每个局部静态变量在第一次之前都会被初始化 执行通过对象的定义 本地静态数据是 函数结束时不会被销毁 当程序运行时它们被销毁 终止 局部静态变量与全局静态变量有什么不同吗 除了申报地点不同之外 还有什么不同呢 void foo
  • 如果通过 setImageURI 设置一次,我如何更新壁画 SimpleDraweeView 的图像

    我正在使用 Facebook Fresco 库和 SimpleDraweeView 来显示文件中的头像图像 Uri avaURI Uri fromFile new File getCacheDir Constants AVA FILE NA
  • AWS cli windows - 在将路径添加到环境变量后仍然收到“'aws'无法识别...”?

    找到以下说明here https stackoverflow com questions 42099986 setting up the path so aws cli works properly 和其他地方 我将 USERPROFILE
  • ORA-12170: TNS: 发生连接超时

    我尝试使用 Oracle Toad 连接到笔记本电脑中的数据库 但仍然出现此错误 ORA 12170 TNS 发生连接超时 我不断出现此错误的可能原因是什么 我昨天访问了同一个数据库并且能够访问它 收集评论里的答案 问题是Oracle服务运
  • R/RStudio、Yosemite 和 Java

    我昨天升级到 OSX Yosemite 当我跑步时library xlsx in RStudio 程序崩溃 我收到消息 要打开 RStudio 您需要安装旧版 Java SE 6 运行时 我得到相同的结果和关于打开的消息R当我跑步时libr
  • jQuery load() 函数不起作用

    我是 JavaScript 世界的新手 我正在尝试使用函数 load 插入另一个 html 文件 解释起来有点困难 代码如下 你能帮助我吗 我没有使用网络服务器 谢谢 为什么它不起作用 浏览器安全限制可能会阻止您使用 A
  • 无法消除我的子图像和父 div 之间奇怪的 2px 空间

    我已经尝试了我所知道的一切 我开始认为它是由 Tumblrs javascript 创建的 首先 我确实有内联块 我知道它们读取空白并添加像素 但经过多次测试 剥离了我所有的 javascript 并删除了所有内容 只留下了一个标准 div
  • 在 WPF 中将项目添加到组合框

    当我向 WPF 窗口添加组合框时 如何向组合框添加项目 Int 设计的 XAML 代码还是在 NameOfWindow xaml cs 文件中 情况 1 您没有数据源 您只需填充ComboBox静态值如下 来自 XAML
  • 在 Pycharm 中过滤记录器控制台输出的简单方法?

    我已经通过该程序命名了我的所有记录器 我想做的是能够通过 Pycharm 中的记录器名称过滤控制台日志输出 可以在不编辑日志配置文件的情况下完成此操作吗 您可以使用以下命令在 PyCharm 的控制台窗口中过滤输出格雷普控制台 https
  • 如何非递归地获取二叉树中叶节点的数量?

    我有一个练习问题被难住了 在不使用递归的情况下获取二叉树中叶节点的数量 我已经四处寻找一些想法 我已经看到了一些想法 例如将节点传递到堆栈 但我不知道当有多个分支时如何做到这一点 任何人都可以提供指针吗 NumberOfLeafNodes
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的