Airflow apply_defaults 装饰器报告 参数是必需的

2023-12-10

我最近遇到了这个令人讨厌的错误,其中 Airflowapply_defaults 装饰者正在抛出以下堆栈跟踪 (my **kwargs确实包含job_flow_id)

File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/mysql_import_dag.py", line 23, in <module>
    sync_dag_builder.build_sync_dag()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 26, in build_sync_dag
    create_emr_task, terminate_emr_task = self._create_job_flow_tasks()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 44, in _create_job_flow_tasks
    task_id=GlobalConstants.EMR_TERMINATE_STEP)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/aws/operators/emr_terminate_ancestor_job_flows_operator.py", line 31, in __init__
    EmrTerminateJobFlowOperator.__init__(self, *args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/contrib/operators/emr_terminate_job_flow_operator.py", line 44, in __init__
    super(EmrTerminateJobFlowOperator, self).__init__(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['job_flow_id'] is required

令人不安的部分是

  • 异常目前源自__init__ of the built-in EmrTerminateJobFlowOperator
  • 早些时候它来自EmrCreateJobFlowOperator,尽管这并不需要job_flow_id参数;但它已经消失了

调查decorators.py,我觉得sig_cache可能会搞乱一些事情。事实上,从提交引入的它,我不知道如何函数签名缓存正在工作(至少它不工作this way)?


我尝试过全部删除__pycache__并重新启动scheduler, webserver没有运气(我正在单独运行它们Linux screens)

  • 什么可能导致错误?
  • 如何sig_cache是否有效,在任何情况下都需要强制清除吗?如果有,如何清除?

环境

  • Python 3.6.6
  • Airflow 1.10.2
  • LocalExecutor

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow apply_defaults 装饰器报告 参数是必需的 的相关文章

  • 气流中任务的粒度

    对于一项任务 有许多辅助任务 从文件 数据库获取 保存属性 验证 审核 这些辅助方法并不耗时 一个样本 DAG 流 fetch data gt gt actual processing gt gt validation gt gt save
  • 如何导入 2.2.5 版本的 Airflow 运算符?

    我刚刚将 Airflow 升级到 2 2 5 但无法使用 EmptyOperator 应该很简单from airflow operators empty import EmptyOperator但我得到了错误ModuleNotFoundEr
  • Docker 上的 Airflow - 路径问题

    使用气流我尝试简单的 DAG 工作 我编写了自定义运算符和其他文件 我想将它们导入到 DAG 逻辑所在的主文件中 这里是文件夹的结构 airflow cfg dags init py dag py sql statements sql do
  • Airflow BigQueryOperator:如何将查询结果保存在分区表中?

    我有一个简单的 DAG from airflow import DAG from airflow contrib operators bigquery operator import BigQueryOperator with DAG da
  • 清除后气流强制重新运行上游任务,即使下游任务标记为成功

    我在 Airflow 中有任务 A gt B gt C 当我运行 DAG 并全部成功完成时 我希望能够单独清除 B 同时将 C 标记为成功 B 清除并进入 no status 状态 但当我尝试重新运行 B 时 什么也没有发生 我尝试过 ig
  • Airflow - 分支连接运算符

    我正在尝试加入 Airflow 中的分支操作员 我这样做了 op1 gt gt op2 op3 op4 op2 gt gt op5 op3 gt gt op6 op4 gt gt op7 op5 op6 op7 gt gt op8 它给出了
  • Airflow - 跳过未来的任务实例而不更改 dag 文件

    我有一个 DAG abc 计划在每天上午 7 点 美国中部标准时间 运行 并且该 DAG 中有任务 xyz 由于某种原因 我不想为明天的实例运行任务 xyz 之一 如何跳过该特定任务实例 我不想对代码进行任何更改 因为我无权访问 Prod
  • 如何在 Apache Airflow 中正确处理夏令时?

    在气流中 一切都应该是 UTC 不受 DST 影响 但是 我们的工作流程可以根据受 DST 影响的时区交付内容 一个示例场景 我们安排了一项作业 开始日期为东部时间上午 8 00 计划间隔为 24 小时 每天东部时间上午 8 点 调度程序会
  • 使用 Airflow 将 mysql 数据加载到 bigquery 的 dag 出现“无效参数传递”错误

    我运行一个 DAG 提取 MySQL 数据并将其加载到气流中的 BigQuery 我目前收到以下错误 usr local lib python2 7 dist packages airflow models py 1927 PendingD
  • 如何在 Airflow 中使用 HashiCorp Vault?

    我开始使用 Apache Airflow 我想知道如何有效地使其使用存储在 Vault 中的秘密和密码 不幸的是 搜索不会返回超出范围的有意义的答案Airflow 中尚未实现的钩子 https issues apache org jira
  • Airflow - 处理 DAG 回调的正确方法

    我有一个DAG然后每当它成功或失败时 我希望它触发一个发布到 Slack 的方法 My DAG args就像下面这样 default args on failure callback slack slack message sad mess
  • 在 Airflow 中编写和导入自定义插件

    这实际上是两个问题合二为一 My AIRFLOW HOME结构如下 airflow dags plugins init py hooks init py my hook py another hook py operators init p
  • 如何在 Google Composer 上重新启动气流服务器?

    当我需要在本地重新启动网络服务器时 我会这样做 ps ef grep airflow awk print 2 xargs kill 9 airflow webserver p 8080 D 我如何在 Google Composer 上执行此
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的
  • 我怎样才能得到dag中的execution_date?运算符的外部?

    我怎样才能获得execution date参数在 dag 之外 execution min execution date strftime M if execution min 00 logging info YES It s 00 fin
  • 如何记录 Airflow DAG 的输出以进行调试?

    我正在编写 Airflow DAG 但在函数方面遇到一些问题 我正在尝试通过将数据打印到标准输出并使用logging图书馆 我的示例 DAG 是 from datetime import timedelta import airflow i
  • airflow webserver 命令失败并显示 {filesystemcache.py:224} 错误 - 不允许操作

    我正在 Cent OS 7 上安装 Airflow 我已经配置了 Airflow db init 并检查了 nginx 服务器的状态及其工作正常 但是当我运行airflow webserver命令时 我收到下面提到的错误 2021 03 2
  • 使用不同间隔的任务运行 DAG

    我有 3 个任务 A B 和 C 我只想运行任务 A 一次 然后每月运行任务 B 直到 end date 然后仅运行任务 C 一次以进行清理 这与这个问题类似 但不适用 如何在气流中的单个 Dag 上处理不同的任务间隔 https stac
  • 气流获取重试次数

    在我的 Airflow DAG 中 我有一个任务需要知道它是第一次运行还是重试运行 如果是重试尝试 我需要调整任务中的逻辑 我对如何存储任务的重试次数有一些想法 但我不确定其中是否有合法的 或者是否有更简单的内置方法可以在任务中获取此信息
  • 气流 - 未知的蓝色任务状态

    我刚刚收到一个蓝色任务 该任务没有出现在状态图例中 我很好奇这是一个错误还是未记录的状态 正如您所看到的 蓝色没有显示在右侧的潜在状态列表中 我刚刚完成了所有过去 未来和上游尝试的清理 仅供参考 这是一个已知的 TaskInstance 状

随机推荐

  • perf中的时间戳是什么意思?

    我想使用 perf 来测量函数的实际执行时间 perf script 命令给出调用函数时的时间戳 Xorg 1523 001 25712 423702 probe sock write iter ffffffff95cd8b80 时间戳字段
  • 我在阅读文档上构建项目时遇到问题

    使用 sphinx quickstart 创建了 sphinx 项目 运行 make html 并且没有产生错误 将所有代码推送到 GitHub 我尝试在阅读文档上导入和构建项目 但出现以下错误 我在使用 Mac 时没有遇到任何问题 它在阅
  • Android MapView - 自定义缩放按钮

    你知道 是否有办法在 Android 地图视图中制作自定义缩放按钮 或者只有准备好的缩放控件 Thanks Hmyzak 您可以将默认缩放控件设置为 false mapView setBuiltInZoomControls false 添加
  • 从固定的快捷方式 android 中删除应用程序图标

    android 8之后 我的应用程序图标出现在快捷方式上 我知道这是专门添加的 目的是通知用户哪个应用程序创建了快捷方式 这是当前用于创建图标的代码 ShortcutManager shortcutManager ShortcutManag
  • R - 将向量中的每个元素与其他向量中的每个元素求和

    我有两个向量 我想要一个新向量 其元素是向量 1 的元素与向量 2 的元素之和 v1 lt c 1 2 3 4 5 6 v2 lt c 0 1 1 2 2 1 for i in 1 length v1 for j in 1 length v
  • 使用反射获取方法的调用层次结构

    我使用 java 反射从类中获取方法 加载这些类 现在我想获取这些方法的调用层次结构 我如何在 Eclipse IDE 中使用调用层次结构选项 有任何示例或链接吗 提出的解决方案是使用Thread currentThread getStac
  • Scala 构造函数重载?

    如何在 Scala 中提供重载构造函数 值得明确提及的是 Scala 中的辅助构造函数必须调用主构造函数 如 landon9720 中的 答案 或者调用同一类中的另一个辅助构造函数 作为其第一个操作 它们不能像在 Java 中那样简单地显式
  • 如何为通用Windows平台(UWP)应用程序创建.appx包?

    我已经为我的通用 Windows 平台应用程序创建了包 但是我无法找到为我的包生成 appx 文件的位置 它已生成 appxbundle 文件和所有其他文件 但未生成 appx 文件 An appx只是一个 zip64 文件 以及 appx
  • 如何使用 preg_match 检查全文

    您好 我对 preg match 函数有疑问 我想根据模式检查整个文本 如果整个文本与模式匹配 则返回 true 如果与模式不匹配或部分匹配 则返回 false 但我无法使用 php preg match 函数执行此操作 例如 我使用下面的
  • 更改

    我刚刚学习 JQUERY 我一直在玩delay 我写了一个小提琴来向你展示 我想做的是当单击按钮时 更改 div 的背景颜色 然后在过一会儿再次切换背景颜色 但是当我尝试时 它只是切换到第二种颜色并跳过第一种颜色 HTML div clas
  • 在列表中找到所有可能的对的最快方法是什么?

    基本上我有球员名单 我想将他们配对 以便每个球员都会与所有人比赛一次 找到这些数据的最快方法是什么 假设球员没有出现在名单中两次 则双倍for循环非常快 for int i 0 i lt playerList Count 2 i for i
  • 需要帮助在 Java 中将数字转换为单词

    我正在开发一个将数字转换为单词的程序 但我在使用 Numbers 类中的 toString 方法时遇到问题 所有的方法都给了我 我可以实现 因此 我无法删除其中任何一个 编号 4564 gt 四千五百六十四 这是代码 数字类 package
  • codeigniter 本地主机电子邮件未发送

    我有一些问题 我不明白 这是我的代码 this gt load gt library email config protocol sendmail config mailpath usr sbin sendmail config chars
  • 我可以在 .gitconfig 中为自己指定多个用户吗?

    In my gitconfig 我在下面列出了我的个人电子邮件地址 user 因为这就是我想用于 Github 存储库的内容 但是 我最近也开始使用 git 来工作 我公司的 git 存储库允许我提交 但是当它发出新变更集的公告时 它说它们
  • 将 MySQL 表 varchar 字段更改为十进制字段

    我有一个 MySQL 表 其中有几列 例如 id text 1 val 1 text 2 text 3 val 2 1 bla 1 23 blub 5 67 12 34 不幸的是所有字段都定义为varchar 现在我尝试更改表以将某些字段设
  • AFnetworking 多部分请求主体为零

    我正在尝试通过 af 2 0 多部分请求将图片上传到服务器 但似乎正文始终为空 这是代码的快照 self POST path parameters params constructingBodyWithBlock id
  • SwiftUI:按下按钮时的操作

    我正在尝试实现一个长按按钮 仅当我按下按钮时才会更改我使用的 SF 符号 我不太确定应该在哪里使用这些图像 问题是 当我停止按下时 按钮就会被触发 有没有办法不激活它 按下时除外 我正在寻找的是一个按钮 When pressed持续 2 秒
  • 当放入 XCode 中的不同编译单元时,静态 unordered_map 会被擦除

    我的类 C 中有一个静态 unordered map 如果我将类定义和声明放在与包含函数 main 的文件不同的文件中 我会遇到行为差异 问题是 我观察到 如果类 C 与函数 main 位于同一编译单元中 则一切都很好 我只看到一次文本 创
  • 赛普拉斯灯具最佳实践

    在 Cypress 文档中 他们建议以这种方式使用固定装置 cy fixture logo png then logo gt load data from logo png 但我发现它很混乱 并且有限制 因为我无法在运行测试之外获取此信息
  • Airflow apply_defaults 装饰器报告 参数是必需的

    我最近遇到了这个令人讨厌的错误 其中 Airflowapply defaults 装饰者正在抛出以下堆栈跟踪 my kwargs确实包含job flow id File