在 Airflow 中编写和导入自定义插件

2024-04-12

这实际上是两个问题合二为一。

My AIRFLOW_HOME结构如下

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils

我一直在这里关注 astronomer.io 的示例https://github.com/airflow-plugins https://github.com/airflow-plugins。我的定制operators使用我的习惯hooks,并且所有导入都是相对于顶级文件夹的plugins.

# my_operator.py
from plugins.hooks.my_hook import MyHook

但是,当我尝试将整个存储库移动到插件文件夹中时,运行后出现导入错误airflow list_dags这么说plugins找不到。

我读了一些相关内容,显然 Airflow 将插件加载到其核心模块中,以便可以像这样导入它们

# my_operator.py
from airflow.hooks.my_hook import MyHook

所以我将所有导入更改为直接读取airflow.plugin_type反而。不过,我收到另一个导入错误,这次说my_hook找不到。我每次都会重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我查看了类似问题中提出的解决方案,但它们也不起作用。

官方文档也是这样说明的https://airflow.apache.org/plugins.html https://airflow.apache.org/plugins.html的延长AirflowPlugin类,但我不确定这个“接口”应该驻留在哪里。我还更喜欢拖放选项。

最后,我的代码仓库显然没有意义plugins文件夹本身,但如果我将它们分开,测试就会变得不方便。每次在钩子/操作上运行单元测试时,是否都必须修改 Airflow 配置以指向我的存储库?测试自定义插件的最佳实践是什么?


我通过一些试验和错误发现了这一点。这是我的最终结构AIRFLOW_HOME folder

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils

In plugin_name.py,我扩展AirflowPlugin class

# plugin_name.py

from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc

class PluginName(AirflowPlugin):

    name = 'plugin_name'

    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]

在使用自定义挂钩的自定义运算符中,我将它们导入为

# my_operator.py

from hooks.my_hook import MyHook

然后在我的 DAG 文件中,我可以这样做

# sample_dag.py

from airflow.operators.plugin_name import MyOperator

需要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。

这也有利于测试,因为自定义类中的导入是相对于文件夹中的子模块的plugins。我想知道我是否可以省略__init__.py里面的文件plugins,但由于一切正常,我没有尝试这样做。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Airflow 中编写和导入自定义插件 的相关文章

随机推荐