有什么方法可以监控 Airflow DAG 的执行时间吗?

2024-02-16

我想将 Airflow 与 Statsd 和 DataDog 一起使用来监控 DAG 是否需要例如是之前执行的两倍。所以,我需要某种用于 DAG 的实时计时器(或者operator).

我知道 Airflow 支持一些指标 https://airflow.apache.org/docs/stable/metrics.html。 然而,据我了解,所有指标都与已完成的任务/DAG 相关,对吧?所以,这不是解决方案,因为我想监视正在运行的 DAG。

我也考虑过超时执行 https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html#airflow.operators.BaseOperator/SLA https://airflow.apache.org/docs/stable/concepts.html#slas功能,但它们不适合此用例

我希望收到一些 DAG 挂起的通知,但我不想杀死它。


您可以通过多种不同的方式来处理此问题:

  • 过去,我配置了一个遥测 DAG,它将通过查询元数据表来收集所有任务/DAG 的当前状态。我会收集这些指标并将其推送到 CloudWatch。由于这些内部字段经常发生变化,这成为了问题,因此我们在尝试升级到较新版本的 Airflow 时会遇到问题。
  • 还有一些保养得好的普罗米修斯出口商 https://github.com/search?q=airflow+prometheus一些公司已经开源了。通过设置这些,您可以根据需要频繁地轮询公开的导出路径(DataDog支持普罗米修斯 https://docs.datadoghq.com/integrations/prometheus/).

这些只是您的一些选择。由于 Airflow Web 服务器只是一个 Flask 应用程序,因此您可以真正以您认为合适的任何方式公开指标。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

有什么方法可以监控 Airflow DAG 的执行时间吗? 的相关文章

  • 监控 Android 手机中的网络活动

    我想监控我的 Android 手机的网络流量 我正在考虑在 Android 上使用 tcpdump 但我不确定是否必须为手机进行交叉编译 另一个问题是 如果我想监控某个应用程序的流量数据 有什么命令可以做到这一点吗 TCPDUMP 是我最喜
  • 如何在 Airflow 中使用 HashiCorp Vault?

    我开始使用 Apache Airflow 我想知道如何有效地使其使用存储在 Vault 中的秘密和密码 不幸的是 搜索不会返回超出范围的有意义的答案Airflow 中尚未实现的钩子 https issues apache org jira
  • 更改 AirFlow 中 Lambda 调用的“读取超时”

    我有一个 lambda 始终需要超过 1 分钟才能完成执行 这是默认的问题LambdaInvokeFunctionOperator 因为默认情况下 它的钩子会创建一个Boto3与默认连接读取超时60 秒 意味着 60 秒后 如果 Lambd
  • 还有一个“此 DAG 在网络服务器 DagBag 对象中不可用”

    这似乎是一个相当普遍的问题 我有一个 DAG 我不仅可以手动触发它airflow trigger dag 但它甚至按照其时间表执行 但拒绝显示在 UI 中 我已经多次重新启动网络服务器和调度程序 按 刷新 十亿次 然后运行它airflow
  • 没有这样的文件或目录 /airflow/xcom/return.json

    创建了一个图像包含 airflow xcom return json在所有子目录上使用 chmod x 由于日志显示找不到文件或目录 尝试过 chmod x strtpodbefore KubernetesPodOperator names
  • 如果过去 60 分钟的交易量小于 x,如何在 Seyren with Graphite 中发出警报?

    我正在使用 Graphite Statsd 带有 Python 客户端 从 Web 应用程序收集自定义指标 成功交易的计数器 假设计数器是stats transactions count 还有一个每秒速率指标stats transactio
  • 如何在 Linux 中获取 Apache 的“每秒请求数”?

    在 Windows for ASP 中 您可以获得 perfmon 但是 如何获得 每秒请求数 Linux 下的 Apache 这是我编写的一个简短的 bash 脚本 用于对请求率进行采样 基于迪克塞的建议 https stackoverf
  • 如何在 Google Composer 上重新启动气流服务器?

    当我需要在本地重新启动网络服务器时 我会这样做 ps ef grep airflow awk print 2 xargs kill 9 airflow webserver p 8080 D 我如何在 Google Composer 上执行此
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的
  • 在尝试找到最长路径的同时消除有向无环图中的无关边

    我问了一个question https stackoverflow com q 8685598 35690关于在可变数量的集合中查找没有重复字符的子序列 解决方案是创建每对字母的矩阵 丢弃每组中未出现的字母 然后找到最长路径 http en
  • 气流:Dag 每隔几秒安排两次

    我尝试每天仅运行一次 DAG00 15 00 午夜 15 分钟 然而 它被安排了两次 间隔几秒钟 dag DAG my dag default args default args start date airflow utils dates
  • 气流,在 dag 运行之前标记任务成功或跳过它

    我们有一个巨大的 DAG 其中有许多小而快速的任务和一些大而耗时的任务 我们只想运行 DAG 的一部分 我们发现最简单的方法是不添加我们不想运行的任务 问题是我们的 DAG 有很多相互依赖关系 因此当我们想要跳过某些任务时 不破坏 DAG
  • BigQuery with Airflow - 缺少projectId

    尝试下面的例子 https cloud google com blog big data 2017 07 how to aggregate data for bigquery using apache airflow https cloud
  • Spark流吞吐量监控

    有没有办法监控 Spark 集群的输入和输出吞吐量 以确保集群不会被传入数据淹没和溢出 就我而言 我在 AWS EC2 上设置了 Spark 集群 所以我正在考虑使用AWS 云观察来监控网络输入 and 网络输出对于集群中的每个节点 但我的
  • 气流 - 未知的蓝色任务状态

    我刚刚收到一个蓝色任务 该任务没有出现在状态图例中 我很好奇这是一个错误还是未记录的状态 正如您所看到的 蓝色没有显示在右侧的潜在状态列表中 我刚刚完成了所有过去 未来和上游尝试的清理 仅供参考 这是一个已知的 TaskInstance 状
  • Amazon MWAA Airflow - 任务容器在没有日志的情况下关闭/停止/终止

    我们使用 Amazon MWAA Airflow 很少有任务标记为 FAILED 但根本没有日志 就好像容器在我们没有注意到的情况下被关闭了一样 我找到了这个链接 https cloud google com composer docs h
  • Airflow 默认连接数过多

    我打开气流并检查连接 发现其后面运行的连接太多 关于如何杀死那些我不使用的任何想法 或者我很想知道运行它的最小 conn id 建筑学 LocalExecutor 与其他经纪人不同 Postgres 作为元数据库 但它列出了 17 个连接
  • 我可以在 Airflow 中的一个 DAG 下执行不同开始日期的任务吗?

    我有一个运行两个任务的 DAG A and B 而不是指定start date在 DAG 级别上 我已将其作为属性添加到运算符 我正在使用PythonOperator在本例中 并将其从 DAG 字典中删除 这两个任务每天都会运行 The s
  • 带子任务的 Airflow 并行任务

    我需要在 Apache Airflow 上运行以下图表 但我遇到了并行步骤的问题 因为它们有多个子步骤 gt task 1a gt tast 1b gt task 4a gt tast 4b Start gt task 2a gt tast
  • 在单元测试中运行 Airflow 1.9 的测试 Dag

    我已经实现了运行单个 dag 的测试用例 但它似乎在 1 9 中不起作用 可能是由于气流 1 8 中引入了更严格的池 我正在尝试运行以下测试用例 from airflow import DAG from airflow operators

随机推荐

  • 通过关联 update_all

    我正在尝试通过关联使用 update all 并且收到 mysql 错误 有人知道为什么吗 class Basket lt ActiveRecord Base has many basket items has many articles
  • 有没有实现按键删除并同时获取值? [复制]

    这个问题在这里已经有答案了 我正在做一个性能关键的程序 一些学术性的东西 我希望尽可能地优化 不像它证明的 这是 瓶颈 我有一个自定义字典结构 NET 的包装器Dictionary lt gt 并且我会在一个阶段不断地删除项目 通过Key价
  • 如何使用按钮触发回调更新?

    我刚刚开始使用破折号 举个例子here https plot ly dash getting started part 2 interactivity 我想转换下面的破折号应用程序 import dash from dash depende
  • 批处理文件变量范围问题

    当尝试创建 dos Windows 7 命令行 批处理文件时 我遇到了一个奇怪的变量范围问题 该文件执行一些字符串操作来创建新的文件路径 谁能明白为什么在下面的示例中 OUTPUT FILENAME 变量总是最终为空 echo Enter
  • Java 8 元空间与堆使用

    我有这段代码可以动态生成类并加载它 import javassist CannotCompileException import javassist ClassPool public class PermGenLeak private st
  • 测试覆盖率 React,伊斯坦布尔 -_registerComponent(...):目标容器不是 DOM 元素

    我正在使用 React Redux Webpack 编写一个应用程序 我正在使用 karma mocha 构建测试 并希望使用 istanbul 进行测试覆盖 为了使覆盖范围与业力覆盖范围一起工作 我设置了以下内容karma config
  • 是否可以根据批次标签(y_true)分布更新每个批次的学习率?

    编辑 请参阅此问题的结尾以获取解决方案 TL DR 我需要找到一种方法来计算每批次的标签分布 并更新学习率 有没有办法访问当前模型的优化器来更新每批的learning rate 下面是如何计算标签分布 它可以在损失函数中完成 因为默认情况下
  • Spark 中简单的 RDD 写入 DynamoDB

    刚刚在尝试将基本 RDD 数据集导入 DynamoDB 时陷入困境 这是代码 import org apache hadoop mapred JobConf var rdd sc parallelize Array Map col1 gt
  • 有没有办法即使在 Chrome 或 Firefox 关闭时也显示桌面通知?

    我们正在开发一个使用 GCM 向最终用户发送推送通知的网站 我们已经了解了 Service Worker 等所有内容 我们用这个开发了一个原型codelab https developers google com web fundament
  • 即使用户注销后也保持节点处于运行状态

    即使用户注销 如何保持节点应用程序在 Windows 中运行 即使用户注销后 如何继续运行节点http服务器 您有 2 个不错的选择 一种是上面评论中提到的Forever https www npmjs com package foreve
  • 将文件流式传输到 S3“错误:流意外结束”

    Update 我相信这could是因为我使用的是express提供的body解析器 这是否会扰乱多方试图解析的流 我的解决方案基于这个答案 https stackoverflow com a 15830910 971592 我正在尝试做的事
  • jqgrid如何在同一列显示多个值

    我想知道如何在 jqGrid 的单列中显示多个值 这是我当前网格定义的示例 grid1 jqGrid url Default aspx getGridData datatype json colModel contains the inpu
  • np 数组之间的欧氏距离

    我有两个 numpy 数组 a 和 b a 和 b 的尺寸相同 a 的尺寸可以与 b 的尺寸不同 例如 a 1 2 5 7 b 3 8 4 7 9 15 有没有一种简单的方法来计算 a 和 b 之间的欧几里得距离 以便这个新数组可以在 k
  • JavaScript 中的应用函数

    我正在学习 JavaScript 目前正在尝试找出原因 在蜘蛛猴 https developer mozilla org en SpiderMonkey concat apply 1 2 返回预期的 1 2 but Array concat
  • 如何使用Torch生成的模型进行预测?

    我已经执行了神经网络 tutorial lua https github com nicholas leonard dp blob master examples neuralnetwork tutorial lua 现在我有了模型 我想用
  • 尝试...否则...除了语法错误

    我无法理解这个 无法运行此代码 我不知道为什么它是语法错误 try newT read existingArtist newT Exif Image Artist value existingKeywords newT Xmp dc sub
  • Android:AsyncTask 或带有 ExecutorService 的普通 Java 线程 [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我的应用程序使用Service做一些背景工作 我正在服务中使用额外的线程来进行一些计算 为此 我每 5 到 10 秒创建两个线程 运行 5 到
  • .NET混淆工具/策略[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我的产品有几个组件 ASP NET Windows Forms App 和 Windows Service 大约 95 的代码是用 VB NET
  • firebase云函数ServerValue增量不起作用

    我有一个 firebase 功能 我想在这里增加ServerValue 但它不起作用并给出错误 我哪里做错了 我在下面附上控制台屏幕截图 注 我不太了解javascript java代码也可能有错误 const functions requ
  • 有什么方法可以监控 Airflow DAG 的执行时间吗?

    我想将 Airflow 与 Statsd 和 DataDog 一起使用来监控 DAG 是否需要例如是之前执行的两倍 所以 我需要某种用于 DAG 的实时计时器 或者operator 我知道 Airflow 支持一些指标 https airf