1.在所有绑定中创建表
观察:db.create_all()
calls self.get_tables_for_bind()
.
解决方案:覆盖SQLAlchemy
get_tables_for_bind()
支持'__all__'
.
class MySQLAlchemy(SQLAlchemy):
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
# if table.info.get('bind_key') == bind:
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
Usage:
# db = SQLAlchemy(app) # Replace this
db = MySQLAlchemy(app) # with this
db.create_all()
2.动态选择特定的绑定
观察:SignallingSession
get_bind()
负责确定绑定。
解决方案:
- 覆盖
SignallingSession
get_bind()
从某些上下文中获取绑定密钥。
- 覆盖
SQLAlchemy
create_session()
使用我们的自定义会话类。
- 支持上下文选择特定绑定
db
为了方便访问。
- 强制为表指定上下文
'__all__'
作为绑定键,通过覆盖SQLAlchemy
get_binds()
恢复默认引擎。
class MySignallingSession(SignallingSession):
def __init__(self, db, *args, **kwargs):
super().__init__(db, *args, **kwargs)
self.db = db
def get_bind(self, mapper=None, clause=None):
if mapper is not None:
info = getattr(mapper.persist_selectable, 'info', {})
if info.get('bind_key') == '__all__':
info['bind_key'] = self.db.context_bind_key
try:
return super().get_bind(mapper=mapper, clause=clause)
finally:
info['bind_key'] = '__all__'
return super().get_bind(mapper=mapper, clause=clause)
class MySQLAlchemy(SQLAlchemy):
context_bind_key = None
@contextmanager
def context(self, bind=None):
_context_bind_key = self.context_bind_key
try:
self.context_bind_key = bind
yield
finally:
self.context_bind_key = _context_bind_key
def create_session(self, options):
return orm.sessionmaker(class_=MySignallingSession, db=self, **options)
def get_binds(self, app=None):
binds = super().get_binds(app=app)
# Restore default engine for table.info.get('bind_key') == '__all__'
app = self.get_app(app)
engine = self.get_engine(app, None)
tables = self.get_tables_for_bind('__all__')
binds.update(dict((table, engine) for table in tables))
return binds
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
Usage:
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__" # Add this
测试用例:
with db.context(bind='clinic1'):
db.session.add(Patient())
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'
关于引用默认绑定的外键
您必须指定schema
.
限制:
- MySQL:
- 绑定必须位于同一个 MySQL 实例中。否则,它必须是一个普通的列。
- The foreign object in the default bind must already be committed.
Otherwise, when inserting an object that references it, you will get this lock error:
MySQLdb._exceptions.OperationalError:(1205,'超出锁定等待超时;尝试重新启动事务')
- SQLite:不强制执行跨数据库的外键。
Usage:
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
class PatientType(db.Model):
__tablename__ = "patient_types"
__table_args__ = {"schema": "main"} # Add this, based on database name
id = Column(Integer, primary_key=True)
# ...
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__"
id = Column(Integer, primary_key=True)
# ...
# patient_type_id = Column(Integer, ForeignKey("patient_types.id")) # Replace this
patient_type_id = Column(Integer, ForeignKey("main.patient_types.id")) # with this
patient_type = relationship("PatientType")
测试用例:
patient_type = PatientType.query.first()
if not patient_type:
patient_type = PatientType()
db.session.add(patient_type)
db.session.commit() # Commit to reference from other binds
with db.context(bind='clinic1'):
db.session.add(Patient(patient_type=patient_type))
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'