Airflow 2.x 中 DAG 导入错误的日志消息

2024-05-14

我正在本地运行 Apache Airflow 2.x,使用中提供的 Docker Compose 文件文档 https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html。在里面.\dags在我的本地文件系统(安装到 Airflow 容器中)的目录中,我创建一个新的 Python 脚本文件,并使用 TaskFlow API 实现 DAG。

我的 DAG 的更改有时无效。例如,也许我有一个ImportError由于模块名称无效或语法错误。当 Airflow 尝试导入 DAG 时,我无法从 Web 服务器、调度程序或工作线程中找到任何表明存在问题或具体问题是什么的日志消息。

相反,我必须逐行阅读代码,并查找问题。由于我在 Windows 10 上的本地 Python 环境和 Airflow 的 Python 环境是不同的版本并且安装了不同的 Python 包,因此这个问题变得更加复杂。因此,我无法可靠地使用本地开发环境来检测包导入失败,因为我期望在 Airflow 环境中安装的包与我本地的包不同。此外,我用于本地编写代码的 Python 版本与 Airflow 使用的 Python 版本不匹配。

因此,我需要某种错误日志记录来指示 DAG 导入失败。

Question:当 DAG 更新/导入失败时,日志在哪里可以指示是否发生导入失败以及确切的错误消息是什么?


目前,DAG解析日志将在$AIRFLOW_HOME/logs/EXECUTION_DATE/scheduler/DAG_FILE.py.log

Example:

假设我的 DAG 文件是example-dag.py其中包含以下内容,您可以注意到其中有一个拼写错误datetime import:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import dattime   # <-- This Line has typo  


dag = DAG(
    dag_id='example_Dag',
    schedule_interval=None,
    start_date=datetime(2019, 2, 6),
)

t1 = BashOperator(
    task_id='print_date1',
    bash_command='sleep $[ ( $RANDOM % 30 )  + 1 ]s',
    dag=dag)

现在,如果您检查下面的日志$AIRFLOW_HOME/logs/scheduler/2021-04-07/example-dag.py.log where $AIRFLOW_HOME/logs是我设置的$AIRFLOW__LOGGING__BASE_LOG_FOLDER or [logging] base_log_folder in airflow.cfg (https://airflow.apache.org/docs/apache-airflow/2.0.1/configurations-ref.html#base-log-folder https://airflow.apache.org/docs/apache-airflow/2.0.1/configurations-ref.html#base-log-folder)

该文件应包含如下日志:

[2021-04-07 21:39:02,222] {scheduler_job.py:182} INFO - Started process (PID=686) to work on /files/dags/example-dag.py
[2021-04-07 21:39:02,230] {scheduler_job.py:633} INFO - Processing file /files/dags/example-dag.py for tasks to queue
[2021-04-07 21:39:02,233] {logging_mixin.py:104} INFO - [2021-04-07 21:39:02,233] {dagbag.py:451} INFO - Filling up the DagBag from /files/dags/example-dag.py
[2021-04-07 21:39:02,368] {logging_mixin.py:104} INFO - [2021-04-07 21:39:02,357] {dagbag.py:308} ERROR - Failed to import: /files/dags/example-dag.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagbag.py", line 305, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/files/dags/example-dag.py", line 3, in <module>
    from datetime import dattime
ImportError: cannot import name 'dattime'
[2021-04-07 21:39:02,380] {scheduler_job.py:645} WARNING - No viable dags retrieved from /files/dags/example-dag.py
[2021-04-07 21:39:02,407] {scheduler_job.py:190} INFO - Processing /files/dags/example-dag.py took 0.189 seconds

您将在 Web 服务器中看到如下错误:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow 2.x 中 DAG 导入错误的日志消息 的相关文章

  • 气流日志文件不存在:

    Airflow 在几周内工作正常 但突然开始出现几天错误 Dags 会因此错误而随机失败 日志文件不存在 airflow path 1 log获取自 http 8793 airflow path 1 log 无法从工作人员获取日志文件 对
  • 如何自动重新安排气流任务

    我正在运行一个每小时的进程 从一个位置 源 获取数据并将其移动到另一个位置 目的地 在大多数情况下 数据在特定时间到达我的来源 一切正常 但可能会出现延迟 当发生这种情况时 气流中的任务会失败 需要手动重新运行 解决此问题的一种方法是为数据
  • 如何导入 2.2.5 版本的 Airflow 运算符?

    我刚刚将 Airflow 升级到 2 2 5 但无法使用 EmptyOperator 应该很简单from airflow operators empty import EmptyOperator但我得到了错误ModuleNotFoundEr
  • 如何在 Airflow 2.x 中将 XComArg 转换为字符串值?

    Code from airflow models import BaseOperator from airflow utils decorators import apply defaults from airflow providers
  • Airflow Worker 没有监听默认的 RabbitMQ 队列

    我已经使用rabbitmq代理配置了Airflow 服务 airflow worker airflow scheduler airflow webserver 正在运行 没有任何错误 调度程序正在推动任务执行default兔子MQ队列 即使
  • Airflow DAG动态结构

    我正在寻找一个可以决定 dag 结构的解决方案当 dag 被触发时因为我不确定我必须运行的操作员数量 请参阅下面我计划创建的执行顺序 Task B 1 Task C 1 Task B 2 Task C 2 Task A Task B 3 g
  • Airflow DAG 版本控制

    DAG 版本控制是一回事吗 我通过谷歌搜索找不到太多关于这个主题的信息 我想查看 Airflow 中的 DAG 屏幕 并确定 DAG 代码是什么 最简单的解决方案是将版本号作为dag id 但我很高兴知道是否有人有更好的替代解决方案 标签也
  • 气流动态 dag 创建

    有人请告诉我气流中的 DAG 是否只是一个图表 如占位符 没有任何与其关联的实际数据 如参数 或者 DAG 是否像一个实例 对于固定参数 我想要一个系统 其中要执行的操作集 给定一组参数 是固定的 但每次运行这组操作时 该输入都会不同 简单
  • 如何从 Python 脚本中触发气流 DAG 运行?

    使用 apache airflow 我创建了一些 DAGS 其中一些不按计划运行 我正在尝试找到一种方法 可以从 Python 脚本中触发特定 DAG 的运行 这可能吗 我能怎么做 编辑 python 脚本将从与我所有 DAGS 所在的项目
  • 删除 Airflow Scheduler 日志

    我正在使用 Docker Apache Airflow 版本 1 9 0 2 https github com puckel docker airflow https github com puckel docker airflow 调度程
  • 使用 Airflow 将 mysql 数据加载到 bigquery 的 dag 出现“无效参数传递”错误

    我运行一个 DAG 提取 MySQL 数据并将其加载到气流中的 BigQuery 我目前收到以下错误 usr local lib python2 7 dist packages airflow models py 1927 PendingD
  • 在 MWAA 中设置 PYTHONPATH

    我正在尝试在 MWAA 上的 dag 内使用本地模块 文件夹结构如下 init py dags init py my dag init py dag py utils init py file py secrets py date py 我
  • 从 Airflow Postgres 挂钩检索完整连接 URI

    有没有更简洁的方法从 Postgres 挂钩获取完整的 URI get uri 不包含 额外 参数 所以我像这样附加它们 def pg conn id to uri postgres conn id hook PostgresHook po
  • 还有一个“此 DAG 在网络服务器 DagBag 对象中不可用”

    这似乎是一个相当普遍的问题 我有一个 DAG 我不仅可以手动触发它airflow trigger dag 但它甚至按照其时间表执行 但拒绝显示在 UI 中 我已经多次重新启动网络服务器和调度程序 按 刷新 十亿次 然后运行它airflow
  • Airflow Worker - 连接中断:IncompleteRead(0 字节读取)

    使用 Airflow Worker 和 Web 服务器 调度程序作为在 EC2 上的 Kubernetes Engine 上运行的 Docker 映像 我们有一个任务KubernetesPodOperator这是资源密集型的 每 15 分钟
  • 气流中的execution_date:需要作为变量访问

    我真的是这个论坛的新手 但有一段时间 我一直在为我们公司玩气流 抱歉 如果这个问题听起来很愚蠢 我正在使用一堆 BashOperators 编写一个管道 基本上 对于每个任务 我想简单地使用 curl 调用 REST api 这就是我的管道
  • 我怎样才能得到dag中的execution_date?运算符的外部?

    我怎样才能获得execution date参数在 dag 之外 execution min execution date strftime M if execution min 00 logging info YES It s 00 fin
  • 当气流 initdb 时,导入错误:无法导入名称 HiveOperator

    我最近安装了airflow对于我的工作流程 在创建项目时 我执行了以下命令 airflow initdb 返回以下错误 2016 08 15 11 17 00 314 init py 36 INFO Using executor Seque
  • Amazon MWAA Airflow - 任务容器在没有日志的情况下关闭/停止/终止

    我们使用 Amazon MWAA Airflow 很少有任务标记为 FAILED 但根本没有日志 就好像容器在我们没有注意到的情况下被关闭了一样 我找到了这个链接 https cloud google com composer docs h
  • Airflow 1.9 - 无法将日志写入 s3

    我在 aws 的 kubernetes 中运行气流 1 9 我希望将日志发送到 s3 因为气流容器本身的寿命并不长 我已经阅读了描述该过程的各种线程和文档 但我仍然无法让它工作 首先是一个测试 向我证明 s3 配置和权限是有效的 这是在我们

随机推荐