有没有更简洁的方法从 Postgres 挂钩获取完整的 URI?.get_uri()
不包含“额外”参数,所以我像这样附加它们:
def pg_conn_id_to_uri(postgres_conn_id):
hook = PostgresHook(postgres_conn_id)
uri = hook.get_uri()
extra = hook.get_connection(postgres_conn_id).extra_dejson
params = [ f'{k}={v}' for k, v in extra.items() ]
if params:
params = '&'.join(params)
uri += f'?{params}'
return uri
如果更清洁并不一定意味着简洁,那么这里有一些可能有用的东西
from typing import Dict, Any
from psycopg2 import extensions
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models.connection import Connection
def pg_conn_id_to_uri(postgres_conn_id: str) -> str:
# create hook & conn
hook: PostgresHook = PostgresHook(postgres_conn_id=postgres_conn_id)
conn: Connection = hook.get_connection(conn_id=postgres_conn_id)
# retrieve conn_args & extras
extras: Dict[str, Any] = conn.extra_dejson
conn_args: Dict[str, Any] = dict(
host=conn.host,
user=conn.login,
password=conn.password,
dbname=conn.schema,
port=conn.port)
conn_args_with_extras: Dict[str, Any] = {**conn_args, **extras}
# build and return string
conn_string: str = extensions.make_dsn(dsn=None, **conn_args_with_extras)
return conn_string
请注意,该片段未经测试
当然,我们仍然可以从这里修剪更多的行(例如通过使用python
的语法糖conn.__dict__.items()
),但我更喜欢清晰而不是简洁
提示已取自Airflow
的&pyscopg2
的代码本身
- PostgresHook https://github.com/apache/airflow/blob/4a1d71d23df683ca1d164c7a7803beeda6595c51/airflow/providers/postgres/hooks/postgres.py#L80
- pyscopg2 https://github.com/psycopg/psycopg2/blob/master/lib/__init__.py#L126
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)