无法在 Airflow UI 中编辑 Spark_default

2024-01-01

我正在尝试使用以下存储库运行 Airflow 和 Spark 的容器化应用程序,https://github.com/cordon-thiago/airflow-spark https://github.com/cordon-thiago/airflow-spark

正如此处的步骤所示,我需要编辑 Spark_default 连接以便将我的 DAG 提交到 Spark,但是我似乎无法做到这一点。这就是我尝试这样做时所看到的,

                          ____/ (  (    )   )  \___
                         /( (  (  )   _    ))  )   )\
                       ((     (   )(    )  )   (   )  )
                     ((/  ( _(   )   (   _) ) (  () )  )
                    ( (  ( (_)   ((    (   )  .((_ ) .  )_
                   ( (  )    (      (  )    )   ) . ) (   )
                  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
                  ( (  (   ) (  )   (  ))     ) _)(   )  )  )
                 ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
                  (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
                 ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
                  ((  (   )(    (     _    )   _) _(_ (  (_ )
                   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
                   ((__)        \\||lll|l||///          \_))
                            (   /(/ (  )  ) )\   )
                          (    ( ( ( | | ) ) )\   )
                           (   /(| / ( )) ) ) )) )
                         (     ( ((((_(|)_)))))     )
                          (      ||\(|(|)|/||     )
                        (        |(||(||)||||        )
                          (     //|/l|||)|\\ \     )
                        (/ / //  /|//||||\\  \ \  \ _)
-------------------------------------------------------------------------------
Node: 5fce0b10ba4b
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/model/base.py", line 2138, in edit_view
    form = self.edit_form(obj=model)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/model/base.py", line 1340, in edit_form
    return self._edit_form_class(get_form_data(), obj=obj)
  File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 208, in __call__
    return type.__call__(cls, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_admin/form/__init__.py", line 16, in __init__
    super(BaseForm, self).__init__(formdata=formdata, obj=obj, prefix=prefix, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 274, in __init__
    self.process(formdata, obj, data=data, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 126, in process
    if obj is not None and hasattr(obj, name):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/attributes.py", line 356, in __get__
    retval = self.descriptor.__get__(instance, owner)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/connection.py", line 212, in get_extra
    return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 199, in decrypt
    raise InvalidToken
cryptography.fernet.InvalidToken

我真的不知道 FERNET_KEY 是什么以及它如何在这里应用。我到底该如何设置才能运行我的 Spark 操作?

UDPATE

Under the Configuration tab in my Airflow UI, I seem to have the fernet_key configured, enter image description here

据我所知,这是通过以下命令生成的,

: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"

然后导出所有变量,

export \
  AIRFLOW__CELERY__BROKER_URL \
  AIRFLOW__CELERY__RESULT_BACKEND \
  AIRFLOW__CORE__EXECUTOR \
  AIRFLOW__CORE__FERNET_KEY \
  AIRFLOW__CORE__LOAD_EXAMPLES \
  AIRFLOW__CORE__SQL_ALCHEMY_CONN \

这似乎与文档中提供的内容一致。这里有什么问题?


您需要生成一个新的 fernet 密钥并将其添加到您的气流配置中。https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html

正如链接所示,airflow 使用 fernet 密钥来加密存储在连接信息中的密码。在上述情况下,fernet 密钥从未设置,因此出现错误 - 无效令牌。

Fernet 是对称加密的一种实现。但这已经超出了这个问题的范围!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法在 Airflow UI 中编辑 Spark_default 的相关文章

随机推荐