这实际上是两个问题合二为一。
My AIRFLOW_HOME
结构如下
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
我一直在这里关注 astronomer.io 的示例https://github.com/airflow-plugins https://github.com/airflow-plugins。我的定制operators
使用我的习惯hooks
,并且所有导入都是相对于顶级文件夹的plugins
.
# my_operator.py
from plugins.hooks.my_hook import MyHook
但是,当我尝试将整个存储库移动到插件文件夹中时,运行后出现导入错误airflow list_dags
这么说plugins
找不到。
我读了一些相关内容,显然 Airflow 将插件加载到其核心模块中,以便可以像这样导入它们
# my_operator.py
from airflow.hooks.my_hook import MyHook
所以我将所有导入更改为直接读取airflow.plugin_type
反而。不过,我收到另一个导入错误,这次说my_hook
找不到。我每次都会重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我查看了类似问题中提出的解决方案,但它们也不起作用。
官方文档也是这样说明的https://airflow.apache.org/plugins.html https://airflow.apache.org/plugins.html的延长AirflowPlugin
类,但我不确定这个“接口”应该驻留在哪里。我还更喜欢拖放选项。
最后,我的代码仓库显然没有意义plugins
文件夹本身,但如果我将它们分开,测试就会变得不方便。每次在钩子/操作上运行单元测试时,是否都必须修改 Airflow 配置以指向我的存储库?测试自定义插件的最佳实践是什么?
我通过一些试验和错误发现了这一点。这是我的最终结构AIRFLOW_HOME
folder
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- plugin_name.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
In plugin_name.py
,我扩展AirflowPlugin
class
# plugin_name.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc
class PluginName(AirflowPlugin):
name = 'plugin_name'
hooks = [MyHook]
operators = [MyOperator]
macros = [my_util_func]
在使用自定义挂钩的自定义运算符中,我将它们导入为
# my_operator.py
from hooks.my_hook import MyHook
然后在我的 DAG 文件中,我可以这样做
# sample_dag.py
from airflow.operators.plugin_name import MyOperator
需要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。
这也有利于测试,因为自定义类中的导入是相对于文件夹中的子模块的plugins
。我想知道我是否可以省略__init__.py
里面的文件plugins
,但由于一切正常,我没有尝试这样做。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)