我正在使用AWS的MWAA服务(2.2.2) 运行各种 DAG,其中大多数都是使用标准 PythonOperator 类型实现的。我将 DAG 与任何共享需求一起捆绑到 S3 存储桶中,然后将 MWAA 指向相关对象和版本。到目前为止一切都进展顺利。
我现在想使用以下方法实现 DAGPython虚拟环境运算符类型,AWS 承认不支持开箱即用。我正在关注他们的向导关于如何使用自定义插件修补行为,但继续收到来自 Airflow 的错误,在仪表板顶部以大红色字体显示:
DAG 导入错误 (1)
……
AirflowException:PythonVirtualenvOperator 需要 virtualenv,请安装它。
我已经确认 Airflow 确实使用了该插件(我在管理屏幕中看到了它),为了避免产生疑问,我使用了 AWS 在 DAG 示例中提供的确切代码。 AWS 的相关文档非常简单,我还没有偶然发现任何社区讨论。
根据 AWS 的文档,我们希望该插件在启动时在处理任何 DAG 之前运行。该插件本身似乎有效地重写了 venv 命令以使用 pip 安装的版本,而不是安装在计算机上的版本,但是我一直在努力验证事情是否按照我期望的顺序发生。任何有关调试实例行为的指示都将非常感激。
有人遇到过类似的问题吗? MWAA 文档中是否存在需要解决的空白?我错过了一些非常明显的事情吗?
可能相关,但我确实在调度程序日志中看到此警告,这可能表明为什么 MWAA 正在努力解决依赖性?
警告:脚本 virtualenv 安装在 '/usr/local/airflow/.local/bin' 中,该路径不在 PATH 上。
Airflow 使用 Shutil.which 来查找 virtualenv。通过requirements.txt安装的virtualenv不在路径上。将 virtualenv 的路径添加到 PATH 可以解决此问题。
这里的文档是错误的https://docs.aws.amazon.com/mwaa/latest/userguide/samples-virtualenv.html
import os
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
if system_site_packages:
cmd.append('--system-site-packages')
if python_bin is not None:
cmd.append(f'--python={python_bin}')
return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
#This is the added path code
os.environ["PATH"] = f"/usr/local/airflow/.local/bin:{os.environ['PATH']}"
class VirtualPythonPlugin(AirflowPlugin):
name = 'virtual_python_plugin'
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)