具有多个绑定的 SQLAlchemy - 动态选择绑定到查询

2023-12-01

我有 4 个不同的数据库,每个数据库对应我的一位客户(医疗诊所),所有这些数据库都具有完全相同的结构。

在我的应用程序中,我有这样的模型Patient, Doctor, Appointment, etc.

我们以其中一个为例:

class Patient(db.Model):
    __tablename__ = "patients"

    id = Column(Integer, primary_key=True)
    first_name = Column(String, index=True)
    last_name = Column(String, index=True)
    date_of_birth = Column(Date, index=True)

我发现在绑定的帮助下,我可以创建不同的数据库并将每个模型关联到不同的绑定。所以我有这样的配置:

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
app.config['SQLALCHEMY_BINDS'] = {
    'clinic1':'mysql://user:pass@localhost/clinic1',
    'clinic2':'mysql://user:pass@localhost/clinic2',
    'clinic3':'mysql://user:pass@localhost/clinic3',
    'clinic4':'mysql://user:pass@localhost/clinic4'
}

现在我正在努力实现两件事:

  1. 我希望当我使用创建表时db.create_all()它将创建patients所有 4 个数据库中的表 (clinic1->clinic4)
  2. 我希望能够动态选择特定的绑定(在运行时),以便任何查询,例如Patient.query.filter().count()将针对所选的绑定数据库运行

理想情况下,它的行为如下:

with DbContext(bind='client1'):
    patients_count = Patient.query.filter().count()
    print(patients_count)

# outside of the `with` context we are back to the default bind

但是,这样做:

patients_count = Patient.query.filter().count()

不指定绑定,将引发错误(如patients默认绑定中不存在表)

任何可以指导如何完成此操作的代码示例将不胜感激!

附:您可能会建议不要使用不同的数据库,而是使用具有不同列/表的数据库,但请坚持我的示例,并尝试解释如何使用多个相同数据库的这种模式来完成此操作
Thanks!


1.在所有绑定中创建表

观察:db.create_all() calls self.get_tables_for_bind().

解决方案:覆盖SQLAlchemy get_tables_for_bind()支持'__all__'.

class MySQLAlchemy(SQLAlchemy):

    def get_tables_for_bind(self, bind=None):
        result = []
        for table in self.Model.metadata.tables.values():
            # if table.info.get('bind_key') == bind:
            if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
                result.append(table)
        return result

Usage:

# db = SQLAlchemy(app)  # Replace this
db = MySQLAlchemy(app)  # with this

db.create_all()

2.动态选择特定的绑定

观察:SignallingSession get_bind()负责确定绑定。

解决方案:

  1. 覆盖SignallingSession get_bind()从某些上下文中获取绑定密钥。
  2. 覆盖SQLAlchemy create_session()使用我们的自定义会话类。
  3. 支持上下文选择特定绑定db为了方便访问。
  4. 强制为表指定上下文'__all__'作为绑定键,通过覆盖SQLAlchemy get_binds()恢复默认引擎。
class MySignallingSession(SignallingSession):
    def __init__(self, db, *args, **kwargs):
        super().__init__(db, *args, **kwargs)
        self.db = db

    def get_bind(self, mapper=None, clause=None):
        if mapper is not None:
            info = getattr(mapper.persist_selectable, 'info', {})
            if info.get('bind_key') == '__all__':
                info['bind_key'] = self.db.context_bind_key
                try:
                    return super().get_bind(mapper=mapper, clause=clause)
                finally:
                    info['bind_key'] = '__all__'
        return super().get_bind(mapper=mapper, clause=clause)


class MySQLAlchemy(SQLAlchemy):
    context_bind_key = None

    @contextmanager
    def context(self, bind=None):
        _context_bind_key = self.context_bind_key
        try:
            self.context_bind_key = bind
            yield
        finally:
            self.context_bind_key = _context_bind_key

    def create_session(self, options):
        return orm.sessionmaker(class_=MySignallingSession, db=self, **options)

    def get_binds(self, app=None):
        binds = super().get_binds(app=app)
        # Restore default engine for table.info.get('bind_key') == '__all__'
        app = self.get_app(app)
        engine = self.get_engine(app, None)
        tables = self.get_tables_for_bind('__all__')
        binds.update(dict((table, engine) for table in tables))
        return binds

    def get_tables_for_bind(self, bind=None):
        result = []
        for table in self.Model.metadata.tables.values():
            if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
                result.append(table)
        return result

Usage:

class Patient(db.Model):
    __tablename__ = "patients"
    __bind_key__ = "__all__"  # Add this

测试用例:

with db.context(bind='clinic1'):
    db.session.add(Patient())
    db.session.flush()         # Flush in 'clinic1'
    with db.context(bind='clinic2'):
        patients_count = Patient.query.filter().count()
        print(patients_count)  # 0 in 'clinic2'
    patients_count = Patient.query.filter().count()
    print(patients_count)      # 1 in 'clinic1'

关于引用默认绑定的外键

您必须指定schema.

限制:

  • MySQL:
    • 绑定必须位于同一个 MySQL 实例中。否则,它必须是一个普通的列。
    • The foreign object in the default bind must already be committed.
      Otherwise, when inserting an object that references it, you will get this lock error:

      MySQLdb._exceptions.OperationalError:(1205,'超出锁定等待超时;尝试重新启动事务')

  • SQLite:不强制执行跨数据库的外键。

Usage:

# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'


class PatientType(db.Model):
    __tablename__ = "patient_types"
    __table_args__ = {"schema": "main"}  # Add this, based on database name

    id = Column(Integer, primary_key=True)
    # ...


class Patient(db.Model):
    __tablename__ = "patients"
    __bind_key__ = "__all__"

    id = Column(Integer, primary_key=True)
    # ...
    # patient_type_id = Column(Integer, ForeignKey("patient_types.id"))     # Replace this
    patient_type_id = Column(Integer, ForeignKey("main.patient_types.id"))  # with this
    patient_type = relationship("PatientType")

测试用例:

patient_type = PatientType.query.first()
if not patient_type:
    patient_type = PatientType()
    db.session.add(patient_type)
    db.session.commit()        # Commit to reference from other binds

with db.context(bind='clinic1'):
    db.session.add(Patient(patient_type=patient_type))
    db.session.flush()         # Flush in 'clinic1'
    with db.context(bind='clinic2'):
        patients_count = Patient.query.filter().count()
        print(patients_count)  # 0 in 'clinic2'
    patients_count = Patient.query.filter().count()
    print(patients_count)      # 1 in 'clinic1'
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有多个绑定的 SQLAlchemy - 动态选择绑定到查询 的相关文章

  • 在 Django 中定义视图和 url。为什么调用函数时不使用括号?

    我已经在经历 Python速成课程 目前正在进行 Django Web应用程序项目 学习日志 阶段 有些东西与我已经学到的相矛盾 views py file from django shortcuts import render def i
  • Matplotlib 标准化颜色条 (Python)

    我正在尝试使用 matplotlib 当然还有 numpy 绘制轮廓图 它有效 它绘制了它应该绘制的内容 但不幸的是我无法设置颜色条范围 问题是我有很多图 并且需要所有图都具有相同的颜色条 相同的最小值和最大值 相同的颜色 我复制并粘贴了在
  • 为什么我不能导入 geopandas?

    我唯一的代码行是 import geopandas 它给了我错误 OSError Could not find libspatialindex c library file 以前有人遇到过这个吗 我的脚本运行得很好 直到出现此错误 请注意
  • Python 中的六边形自组织映射

    我在寻找六边形 自组织映射 http en wikipedia org wiki Self organizing map在Python上 准备好模块 如果存在的话 绘制六边形单元格的方法 将六边形单元作为数组或其他方式使用的算法 About
  • 如何用 python 和 sympy 解决多元不等式?

    我对使用 python 和 Sympy 还很陌生 并且遇到了使用 sympy 解决多元不等式的问题 假设我的文件中有很多函数 如下所示 cst sqrt x 2 cst exp sqrt cst x 1 4 log log sqrt cst
  • 类属性在功能上依赖于其他类属性

    我正在尝试使用静态类属性来定义另一个静态类属性 我认为可以通过以下代码来实现 f lambda s s 1 class A foo foo bar f A foo 然而 这导致NameError name A is not defined
  • Python 中 genfromtxt() 的可变列数?

    我有一个 txt具有不同长度的行的文件 每一行都是代表一条轨迹的一系列点 由于每条轨迹都有自己的长度 因此各行的长度都不同 也就是说 列数从一行到另一行不同 据我所知 genfromtxt Python 中的模块要求列数相同 gt gt g
  • Sorted(key=lambda: ...) 背后的语法[重复]

    这个问题在这里已经有答案了 我不太明白背后的语法sorted 争论 key lambda variable variable 0 Isn t lambda随意的 为什么是variable在看起来像的内容中陈述了两次dict 我认为这里的所有
  • 无法包含外部 pandas 文档 Pycharm v--2018.1.2

    我无法包含外部 pandas 文档Pycharm v 2018 1 2 例如 numpy gt http docs scipy org doc numpy reference generated module name element na
  • 将一个时间序列插入到 pandas 中的另一个时间序列中

    我有一组定期测量的值 说 import pandas as pd import numpy as np rng pd date range 2013 01 01 periods 12 freq H data pd Series np ran
  • python ttk treeview:如何选择并设置焦点在一行上?

    我有一个 ttk Treeview 小部件 其中包含一些数据行 如何设置焦点并选择 突出显示 指定项目 tree focus set 什么也没做 tree selection set 0 抱怨 尽管小部件明显填充了超过零个项目 但未找到项目
  • 当x轴不连续时如何删除冗余日期时间 pandas DatetimeIndex

    我想绘制一个 pandas 系列 其索引是无数的 DatatimeIndex 我的代码如下 import matplotlib dates as mdates index pd DatetimeIndex 2000 01 01 00 00
  • Python:随时接受用户输入

    我正在创建一个可以做很多事情的单元 其中之一是计算机器的周期 虽然我将把它转移到梯形逻辑 CoDeSys 但我首先将我的想法放入 Python 中 我将进行计数 只需一个简单的操作 counter 1 print counter 跟踪我处于
  • 在 Mac 上安装 Pygame 到 Enthought 构建中

    关于在 Mac 上安装 Pygame 有许多未解答的问题 但我将在这里提出我的具体问题并希望得到答案 我在 Mac 上安装 Pygame 时遇到了难以置信的困难 我使用 Enthought 版本 EPD 7 3 2 32 位 它是我的默认框
  • urllib2.urlopen() 是否实际获取页面?

    当我使用 urllib2 urlopen 时 我在考虑它只是为了读取标题还是实际上带回整个网页 IE 是否真的通过 urlopen 调用或 read 调用获取 HTML 页面 handle urllib2 urlopen url html
  • WindowsError:[错误 5] 访问被拒绝

    我一直在尝试终止一个进程 但我的所有选项都给出了 Windows 访问被拒绝错误 我通过以下方式打开进程 一个python脚本 test subprocess Popen sys executable testsc py 我想杀死那个进程
  • Scrapy 蜘蛛无法工作

    由于到目前为止没有任何效果 我开始了一个新项目 python scrapy ctl py startproject Nu 我完全按照教程操作 创建了文件夹和一个新的蜘蛛 from scrapy contrib spiders import
  • CSV 在列中查找最大值并附加新数据

    大约两个小时前 我问了一个关于从网站读取和写入数据的问题 从那时起 我花了最后两个小时试图找到一种方法来从输出的 A 列读取最大日期值 将该值与刷新的网站数据进行比较 并将任何新数据附加到 csv 文件而不覆盖旧的或创建重复项 目前 100
  • 如何在单独的文件中使用 FastAPI Depends 作为端点/路由?

    我在单独的文件中定义了一个 Websocket 端点 例如 from starlette endpoints import WebSocketEndpoint from connection service import Connectio
  • 如何对字符串列表进行排序?

    在 Python 中创建按字母顺序排序的列表的最佳方法是什么 基本回答 mylist b C A mylist sort 这会修改您的原始列表 即就地排序 要获取列表的排序副本而不更改原始列表 请使用sorted http docs pyt

随机推荐