Python SQLAlchemy ( ORM )、dictalchemy、Flask-SQLAlchemy、Flask-migrate、flask-script、flask-upload

2023-11-19

From

SQLAlchemy 文档:https://www.sqlalchemy.org/
SQLAlchemy入门和进阶:https://zhuanlan.zhihu.com/p/27400862
SQLAlchemy 2.0 教程:https://wiki.masantu.com/sqlalchemy-tutorial/
SQLAlchemy2.0 简明指南 ( 异步 ):https://xiaotaoist.github.io/2023/04/20/sqlalchemy/

1、ORM、SQLAlchemy 简介

ORM 全称 Object Relational Mapping(对象关系映射)。是把 "关系数据库的表结构" 映射到 "Python对象" 上,这样就可以直接操纵 Python 对象,不用再写SQL进行操作,也就是在代码层面考虑的是对象,而不是SQL。

具体的实现方式是

  • 数据库表 转换为 Python类
  • 其中 数据列 作为 类的属性
  • 数据库操作 作为 方法

优点:

  1. 简洁易读:将数据表抽象为对象(数据模型),更直观易读
  2. 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护
  3. 更安全:有效避免 SQL 注入

在 Python 中最有名的 ORM 框架是 SQLAlchemy 。它可以与任意的第三方 web 框架相结合,如 flask、tornado、django、fastapi 等。SQLALchemy 相较于 Django ORM 来说更贴近原生的 SQL 语句,因此学习难度较低。

SQLALchemy 由以下5个部分组成:

  • Engine:框架引擎
  • Connection Pooling:数据库链接池
  • Dialect:方言,调用不同的数据库 API(Oracle, postgresql, Mysql) 并执行对应的 SQL语句。即 数据库DB API 种类。
  • Schema / Types:" 类 到 表" 之间的映射规则
  • SQL Exprression Language:SQL表达式语言

图示如下:

运行流程:

  • 首先用户输入的操作会交由ORM对象
  • 接下来ORM对象会将用户操作提交给SQLALchemy Core
  • 其次该操作会由Schema/Types以及SQL Expression Language转换为SQL语句
  • 然后Egine会匹配用户已经配置好的egine,并从链接池中去取出一个链接
  • 最终该链接会通过Dialect调用DBAPI,将SQL语句转交给DBAPI去执行

相关概念

常见数据类型

安装 sqlalchemy

安装:pip install sqlalchemy

数据库 连接 字符串

SQLAlchemy 必须依赖其他操纵数据库的模块才能进行使用,也就是上面提到的 DBAPI。

SQLAlchemy 配合 DBAPI 使用时,链接字符串也有所不同,如下所示:

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

连接 引擎

任何 SQLAlchemy 应用程序的开始都是一个 Engine 对象,此对象充当连接到特定数据库的中心源,提供被称为connection pool的对于这些数据库连接。

Engine对象通常是一个只为特定数据库服务器创建一次的全局对象,并使用一个URL字符串进行配置,该字符串将描述如何连接到数据库主机或后端。

>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite:///:memory:', echo=True)

初始化创建 engine ,engine 内部维护了一个Pool(连接池)和Dialect(方言),方言来识别具体连接数据库种类。

创建好了 engine 的同时,Pool 和 Dialect 也已经创建好了,但是此时并没有真正与数据库连接,等到执行具体的语句.connect()等时才会连接到数据库。

create_engine 的参数有很多,我列一些比较常用的:

  • echo=False -- 如果为真,引擎将记录所有语句以及 repr() 其参数列表的默认日志处理程序。
  • enable_from_linting -- 默认为True。如果发现给定的SELECT语句与将导致笛卡尔积的元素取消链接,则将发出警告。
  • encoding -- 默认为 utf-8
  • future -- 使用2.0样式
  • hide_parameters -- 布尔值,当设置为True时,SQL语句参数将不会显示在信息日志中,也不会格式化为 StatementError 对象。
  • listeners -- 一个或多个列表 PoolListener 将接收连接池事件的对象。
  • logging_name -- 字符串标识符,默认为对象id的十六进制字符串。
  • max_identifier_length -- 整数;重写方言确定的最大标识符长度。
  • max_overflow=10 -- 允许在连接池中“溢出”的连接数,即可以在池大小设置(默认为5)之上或之外打开的连接数。
  • pool_size=5 -- 在连接池中保持打开的连接数。默认为5个,设置为0时表示连接无限制
  • pool_recycle   设置时间以限制数据库多久没连接自动断开
  • plugins -- 要加载的插件名称的字符串列表。

声明 映射

也就是在 Python 中创建的一个类,对应着数据库中的一张表,类的每个属性,就是这个表的字段名,这种 类对应于数据库中表的类,就称为映射类。

我们要创建一个映射类,是基于基类定义的,每个映射类都要继承这个基类 declarative_base()。

from sqlalchemy.orm import declarative_base
Base = declarative_base()

from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

declarative_base() 是 sqlalchemy 内部封装的一个方法,通过其构造一个基类,这个基类以及它的子类,可以将Python类和数据库表关联映射起来。

数据库表模型类通过 __tablename__ 和表关联起来,Column 表示数据表的列。

示例:

  • 新建一张名为 users 的表,也就是用户表。
  • 新建一个名为 User 类将是我们映射此表的类。在类中,我们定义了要映射到的表的详细信息,主要是表名以及列的名称和数据类型:
from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)

    def __repr__(self):
        return "<User(name='%s', fullname='%s', nickname='%s')>" % (
            self.name,
            self.fullname,
            self.nickname,
        )
  • __tablename__  代表表名
  • Column : 代表数据表中的一列,内部定义了数据类型
  • primary_key:主键

创建 表到数据库

通过定义 User类,我们已经定义了关于表的信息,称为 table metadata,也就是表的元数据。我们可以通过检查 __table__ 属性:

User.__table__ 
Table('users', MetaData(),
            Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
            Column('name', String(), table=<users>),
            Column('fullname', String(), table=<users>),
            Column('nickname', String(), table=<users>), schema=None)

开始 创建表:如果存在则忽略,执行下面代码,就会发现在 db 中创建了 users 表。

Base.metadata.create_all(engine)

创建 会话 ( session )

sqlalchemy 中使用 session 用于创建程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。即 对表的所有操作,都是通过会话实现的。

通过 sessionmaker 调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
# 实例化
session = Session()

session 的常见操作方法包括:

  1. flush:预提交,提交到数据库文件,还未写入数据库文件中
  2. commit:提交了一个事务
  3. rollback:回滚
  4. close:关闭

"添加、更新" 对象

>>> ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
>>> session.add(ed_user)

新增了一个用户,此时这个数据并没有被同步的数据库中,而是处于等待的状态。
上面代码,实例对象只是在环境的内存中有效,并没有在表中真正生成数据。
只有执行了 commit() 方法后,才会真正在数据表中创建数据。
如果我们查询数据库,则首先刷新所有待处理信息,然后立即发出查询。

>>> our_user = session.query(User).filter_by(name='ed').first() 
>>> our_user
<User(name='ed', fullname='Ed Jones', nickname='edsnickname')>

此时得到的结果也并不是数据库表中的最终数据,而是映射类的一个对象。

增、删、改、查

add_user = Users("test", "test123@qq.com")
session.add(add_user)
session.commit()

session.add() 将会把 Model 加入当前 session 维护的持久空间(可以从session.dirty看到)中,直到commit 时提交到数据库。
add 之后执行 db.session.flush(),这样便可在session中get到对象的属性。
批量插入共有以下几种方法,对它们的批量做了比较,分别是:
session.add_all() < bulk_save_object() < bulk_insert_mappings() < SQLAlchemy_core()

查询是最常用的一个操作了,举个最简单的查询例子:

users = session.query(Users).filter_by(id=1).all()
for item in users:
    print(item.name)

通常我们通过以上查询模式获取数据,需要注意的是,通过session.query()我们查询返回了一个Query对象,此时还没有去具体的数据库中查询,只有当执行具体的.all(),.first()等函数时才会真的去操作数据库。

其中,query 有 filter 和 filter_by 两个过滤方法,通常这两个方法都会用到的,

上述例子也可写为:

users = session.query(Users).filter_by(Users.id == 1).all()

更新数据有两种方法,一种是使用 query 中的 update 方法:

session.query(Users).filter_by(id=1).update({'name': "Jack"})

另一种是操作对应的表模型:

users = session.query(Users).filter_by(name="Jack").first()
users.name = "test"
session.add(users)

一般批量更新的话可以选前者,而要对查询获取对象属性之后再更新的场景就需要使用后者。

和更新数据类似,删除数据也有两种方法,第一种:

delete_users = session.query(Users).filter(Users.name == "test").first()
if delete_users:
    session.delete(delete_users)
    session.commit()

第二种:( 批量删除时推荐 )

session.query(Users).filter(Users.name == "test").delete()
session.commit()

回滚

在 commit() 之前,对实例对象的属性所做的更改,可以进行回滚,回到更改之前。

>>> session.rollback()

本质上只是把某一条数据(也就是映射类的实例)从内存中删除而已,并没有对数据库有任何操作。

查询

通过 query 关键字查询。

>>> for instance in session.query(User).order_by(User.id):
...     print(instance.name, instance.fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flintstone

  • query.filter() 过滤
  • query.filter_by() 根据关键字过滤
  • query.all() 返回列表
  • query.first() 返回第一个元素
  • query.one() 有且只有一个元素时才正确返回
  • query.one_or_none(),类似one,但如果没有找到结果,则不会引发错误
  • query.scalar(),调用one方法,并在成功时返回行的第一列
  • query.count() 计数
  • query.order_by() 排序

query.join() 连接查询

>>> session.query(User).join(Address).\
...         filter(Address.email_address=='jack@google.com').\
...         all()
[<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>]

query(column.label()) 可以为字段名(列)设置别名:

>>> for row in session.query(User.name.label('name_label')).all():
...    print(row.name_label)
ed
wendy
mary
fred

aliased()为查询对象设置别名:

>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')

SQL>>> for row in session.query(user_alias, user_alias.name).all():
...    print(row.user_alias)
<User(name='ed', fullname='Ed Jones', nickname='eddie')>
<User(name='wendy', fullname='Wendy Williams', nickname='windy')>
<User(name='mary', fullname='Mary Contrary', nickname='mary')>
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>

查询常用筛选器运算符

# 等于
query.filter(User.name == 'ed')

# 不等于
query.filter(User.name != 'ed')

# like和ilike
query.filter(User.name.like('%ed%'))
query.filter(User.name.ilike('%ed%')) # 不区分大小写

# in
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name.in_(
    session.query(User.name).filter(User.name.like('%ed%'))
))
# not in
query.filter(~User.name.in_(['ed', 'wendy', 'jack'])) 

# is
query.filter(User.name == None)
query.filter(User.name.is_(None))

# is not
query.filter(User.name != None)
query.filter(User.name.is_not(None))

# and
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')

# or
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))

# match
query.filter(User.name.match('wendy'))

使用文本 SQL

文字字符串可以灵活地用于Query 查询。

>>> from sqlalchemy import text
SQL>>> for user in session.query(User).\
...             filter(text("id<224")).\
...             order_by(text("id")).all():
...     print(user.name)
ed
wendy
mary
fred 

使用冒号指定绑定参数。要指定值,请使用Query.params()方法:

>>> session.query(User).filter(text("id<:value and name=:name")).\
...     params(value=224, name='fred').order_by(User.id).one()
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>

一对多

一个用户可以有多个邮件地址,意味着我们要新建一个表与用户表进行映射和查询。

>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy.orm import relationship

>>> class Address(Base):
...     __tablename__ = 'addresses'
...     id = Column(Integer, primary_key=True)
...     email_address = Column(String, nullable=False)
...     user_id = Column(Integer, ForeignKey('users.id'))
...
...     user = relationship("User", back_populates="addresses")
...
...     def __repr__(self):
...         return "<Address(email_address='%s')>" % self.email_address

>>> User.addresses = relationship(
...     "Address", order_by=Address.id, back_populates="user")

ForeignKey定义两列之间依赖关系,表示关联了用户表的用户ID

relationship 告诉ORMAddress类本身应链接到User类,back_populates 表示引用的互补属性名,也就是本身的表名。

多对多

除了表的一对多,还存在多对多的关系,例如在一个博客网站中,有很多的博客BlogPost,每篇博客有很多的Keyword,每一个Keyword又能对应很多博客。

对于普通的多对多,我们需要创建一个未映射的Table构造以用作关联表。如下所示:

>>> from sqlalchemy import Table, Text
>>> # association table
>>> post_keywords = Table('post_keywords', Base.metadata,
...     Column('post_id', ForeignKey('posts.id'), primary_key=True),
...     Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... ) 

下一步我们定义BlogPostKeyword,使用互补 relationship 构造,每个引用post_keywords表作为关联表:

>>> class BlogPost(Base):
...     __tablename__ = 'posts'
...
...     id = Column(Integer, primary_key=True)
...     user_id = Column(Integer, ForeignKey('users.id'))
...     headline = Column(String(255), nullable=False)
...     body = Column(Text)
...
...     # many to many BlogPost<->Keyword
...     keywords = relationship('Keyword',
...                             secondary=post_keywords,
...                             back_populates='posts')
...
...     def __init__(self, headline, body, author):
...         self.author = author
...         self.headline = headline
...         self.body = body
...
...     def __repr__(self):
...         return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)


>>> class Keyword(Base):
...     __tablename__ = 'keywords'
...
...     id = Column(Integer, primary_key=True)
...     keyword = Column(String(50), nullable=False, unique=True)
...     posts = relationship('BlogPost',
...                          secondary=post_keywords,
...                          back_populates='keywords')
...
...     def __init__(self, keyword):
...         self.keyword = keyword

多对多关系的定义特征是secondary关键字参数引用Table表示关联表的对象。

示例 代码 ( 创建 "库、表" )

from sqlalchemy.dialects.mysql import INTEGER, VARCHAR
from sqlalchemy import Table, Column, Date, Integer, String, ForeignKey
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy_utils import database_exists, create_database

############################################################################
conn_mysql_string = 'mysql+pymysql://root:root@127.0.0.1:3306'
db_name = 'local_test'
conn_db_string = f'{conn_mysql_string}/{db_name}'
base_table = declarative_base()


############################################################################
# users表结构
class Users(base_table):
    __tablename__ = 'users'

    user_id = Column(INTEGER, primary_key=True)
    # user_id = Column(Integer, primary_key=True)
    user_name = Column(String(50))
    fullname = Column(String(50))
    nickname = Column(String(50))
    age = Column(Integer)
    place = Column(String(50), nullable=False)
    descript = Column(String(50), nullable=False)

    def __init__(self, user_id, user_name, fullname, nickname, age, place, descript):
        self.user_id = user_id
        self.user_name = user_name
        self.fullname = fullname
        self.nickname = nickname
        self.age = age
        self.place = place
        self.descript = descript

    def __repr__(self):
        return "<User(name='%s', fullname='%s', nickname='%s')>" % (
            self.name,
            self.fullname,
            self.nickname,
        )


############################################################################


def create_db_1():
    # 创建引擎,并连接 mysql
    engine = create_engine(conn_mysql_string, encoding="utf-8", echo=True)
    engine.execute(f'CREATE DATABASE {db_name}')
    print('创建 "数据库" 成功')


def create_db_2():
    with create_engine(conn_mysql_string, isolation_level='AUTOCOMMIT').connect() as connection:
        connection.execute(f'CREATE DATABASE {db_name} charset="utf8"')


def create_db_3():
    # 利用 sqlalchemy_utils 库的 create_database 模块
    engine = create_engine(conn_db_string)
    if not database_exists(engine.url):
        create_database(engine.url)
    print(database_exists(engine.url))


def create_table_1():
    # 创建引擎,并连接数据库
    engine = create_engine(conn_db_string, encoding="utf-8", echo=True)
    # declarative_base() 是 sqlalchemy 内部封装的一个方法
    # 通过其构造一个基类,这个基类以及它的子类,可以将Python类和数据库表关联映射起来。
    base_table.metadata.create_all(engine)
    pass


def create_table_2():
    engine = create_engine(conn_db_string, encoding="utf-8", echo=True)
    # 绑定引擎
    metadata = MetaData(engine)
    # 定义表格
    user_table = Table(
        'user', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(50)),
        Column('fullname', String(100))
    )

    address_table = Table(
        'address', metadata,
        Column('id', Integer, primary_key=True),
        Column('user_id', None, ForeignKey('user.id')),
        Column('email', String(128), nullable=False)
    )
    metadata.create_all()
    pass


def insert_data():
    # 初始化数据库连接
    engine = create_engine(conn_db_string, encoding="utf-8")
    # 创建 DBSession类型
    db_session = sessionmaker(bind=engine)
    # 创建session对象
    session = db_session()
    # 插入单条数据
    # 创建新User对象
    new_user = Users(
        user_id=1, user_name='Jack', fullname='fullname', nickname='nickname',
        age=25, place='USA', descript='descript'
    )
    # 添加到session
    session.add(new_user)
    # 提交即保存到数据库
    session.commit()

    # 插入多条数据
    user_list = [
        Users(user_id=2, user_name='Green', fullname='fullname', nickname='nickname', age=26, place='UK',
              descript='descript'),
        Users(user_id=3, user_name='Alex', fullname='fullname', nickname='nickname', age=31, place='GER',
              descript='descript'),
        Users(user_id=4, user_name='Chen', fullname='fullname', nickname='nickname', age=52, place='CHN',
              descript='descript'),
        Users(user_id=5, user_name='Zhang', fullname='fullname', nickname='nickname', age=42, place='CHN',
              descript='descript')
    ]
    session.add_all(user_list)
    session.commit()
    # 关闭session
    session.close()
    print('数据插入成功')


def query_data():
    # 初始化数据库连接
    engine = create_engine(conn_db_string, encoding="utf-8")
    # 创建 DBSession类型
    db_session = sessionmaker(bind=engine)
    # 创建session对象
    session = db_session()
    # 查询所有place是CHN的人名
    # 创建Query查询,filter是where条件
    # 调用one()返回唯一行,如果调用all()则返回所有行:
    users = session.query(Users).filter(Users.place == 'CHN').all()
    print([use.user_name for use in users])
    # 或者用如下查询
    users = session.query(Users.user_name).filter(Users.place == 'CHN').all()
    print(users)
    session.close()


def update_data():
    # 初始化数据库连接
    engine = create_engine(conn_db_string, encoding="utf-8")
    # 创建 DBSession类型
    db_session = sessionmaker(bind=engine)
    # 创建session对象
    session = db_session()
    # 数据更新,将Jack的place修改为CHN
    update_obj = session.query(Users).filter(Users.user_name == 'Jack').update({"place": "CHN"})
    session.commit()
    session.close()
    print("更新数据成功")


def delete_data():
    # 初始化数据库连接
    engine = create_engine(conn_db_string, encoding="utf-8")
    # 创建 DBSession类型
    db_session = sessionmaker(bind=engine)
    # 创建session对象
    session = db_session()
    # 数据更新,将Jack的记录删除
    update_obj = session.query(Users).filter(Users.name == 'Jack').delete()
    session.commit()
    session.close()
    print("Delete data successfully!")


if __name__ == '__main__':
    # create_db_1()
    # create_db_2()
    # create_db_3()
    # create_table_1()
    # create_table_2()
    # insert_data()
    query_data()
    # update_data()
    # delete_data()
    pass

2、使用 SQLAlchemy 操作 表

创建单表

SQLAlchemy 不允许修改表结构,如果需要修改表结构则必须删除旧表,再创建新表,或者执行原生的 SQL 语句 ALERT TABLE 进行修改。

这意味着在使用非原生SQL语句修改表结构时,表中已有的所有记录将会丢失,所以我们最好一次性的设计好整个表结构避免后期修改:

# models.py
import datetime
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    String,
    Enum,
    DECIMAL,
    DateTime,
    Boolean,
    UniqueConstraint,
    Index,
)
from sqlalchemy.ext.declarative import declarative_base

# 基础类
Base = declarative_base()

# 创建引擎
engine = create_engine(
    "mysql+pymysql://tom:123@192.168.0.120:3306/db1?charset=utf8mb4",
    # "mysql+pymysql://tom@127.0.0.1:3306/db1?charset=utf8mb4", # 无密码时
    # 超过链接池大小外最多创建的链接
    max_overflow=0,
    # 链接池大小
    pool_size=5,
    # 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
    pool_timeout=10,
    # 多久之后对链接池中的链接进行一次回收
    pool_recycle=1,
    # 查看原生语句(未格式化)
    echo=True,
)

# 绑定引擎
Session = sessionmaker(bind=engine)
# 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象conn
# 内部会采用threading.local进行隔离
session = scoped_session(Session)


class UserInfo(Base):
    """必须继承Base"""

    # 数据库中存储的表名
    __tablename__ = "userInfo"
    # 对于必须插入的字段,采用nullable=False进行约束,它相当于NOT NULL
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    name = Column(String(32), index=True, nullable=False, comment="姓名")
    age = Column(Integer, nullable=False, comment="年龄")
    phone = Column(DECIMAL(6), nullable=False, unique=True, comment="手机号")
    address = Column(String(64), nullable=False, comment="地址")
    # 对于非必须插入的字段,不用采取nullable=False进行约束
    gender = Column(Enum("male", "female"), default="male", comment="性别")
    create_time = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
    last_update_time = Column(
        DateTime, onupdate=datetime.datetime.now, comment="最后更新时间"
    )
    delete_status = Column(Boolean(), default=False, comment="是否删除")

    __table__args__ = (
        UniqueConstraint("name", "age", "phone"),  # 联合唯一约束
        Index("name", "addr", unique=True),  # 联合唯一索引
    )

    def __str__(self):
        return f"object : <id:{self.id} name:{self.name}>"


if __name__ == "__main__":
    # 删除表
    Base.metadata.drop_all(engine)
    # 创建表
    Base.metadata.create_all(engine)

记录操作

新增记录

新增单条记录:

# 获取链接池、ORM表对象
import models
​
​
user_instance = models.UserInfo(
    name="Jack",
    age=18,
    phone=330621,
    address="Beijing",
    gender="male"
)
​
models.session.add(user_instance)
​
# 提交
models.session.commit()
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

批量新增

批量新增能减少TCP链接次数,提升插入性能:

# 获取链接池、ORM表对象
import models
​
​
user_instance1 = models.UserInfo(
    name="Tom",
    age=19,
    phone=330624,
    address="Shanghai",
    gender="male"
)
​
user_instance2 = models.UserInfo(
    name="Mary",
    age=20,
    phone=330623,
    address="Chongqing",
    gender="female"
)
​
​
models.session.add_all(
    (
        user_instance1,
        user_instance2
    )
)
​
# 提交
models.session.commit()
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

修改记录

修改某些记录:

# 获取链接池、ORM表对象
import models
​
# 修改的信息:
#  - Jack -> Jack + son
# 在SQLAlchemy中,四则运算符号只能用于数值类型
# 如果是字符串类型需要在原本的基础值上做改变,必须设置
#  - age -> age + 1
# synchronize_session=False
​
models.session.query(models.UserInfo)\
    .filter_by(name="Jack")\
    .update(
        {
            "name": models.UserInfo.name + "son",
            "age": models.UserInfo.age + 1
        },
        synchronize_session=False
)
# 本次修改具有字符串字段在原值基础上做更改的操作,所以必须添加
# synchronize_session=False
# 如果只修改年龄,则不用添加
​
# 提交
models.session.commit()
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

删除记录

删除记录用的比较少,了解即可,一般都是像上面那样增加一个delete_status的字段,如果为1则代表删除:

# 获取链接池、ORM表对象
import models
​
models.session.query(models.UserInfo).filter_by(name="Mary").delete()
​
# 提交
models.session.commit()
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

单表查询

基本查询

查所有记录、所有字段,all()方法将返回一个列表,内部包裹着每一行的记录对象:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(models.UserInfo)\
    .all()
​
print(result)
# [<models.UserInfo object at 0x7f4d3d606fd0>, <models.UserInfo object at 0x7f4d3d606f70>]
​
for row in result:
    print(row)
# object : <id:1 name:Jackson>
# object : <id:2 name:Tom>
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

查所有记录、某些字段(注意,下面返回的元组实际上是一个命名元组,可以直接通过.操作符进行操作):

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo.id,
    models.UserInfo.name,
    models.UserInfo.age
).all()
​
print(result)
# [(1, 'Jackson', 19), (2, 'Tom', 19)]
​
for row in result:
    print(row)
# (1, 'Jackson', 19)
# (2, 'Tom', 19)
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

只拿第一条记录,first()方法将返回单条记录对象(注意,下面返回的元组实际上是一个命名元组,可以直接通过.操作符进行操作):

# 获取链接池、ORM表对象
import models

result = models.session.query(
    models.UserInfo.id,
    models.UserInfo.name,
    models.UserInfo.age
).first()

print(result)
# (1, 'Jackson', 19)

# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

AS别名

通过字段的label()方法,我们可以为它取一个别名:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo.name.label("s_name"),
    models.UserInfo.age.label("s_age")
).all()
​
for row in result:
    print(row.s_name)
    print(row.s_age)
​
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

条件查询

一个条件的过滤:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).filter(
    models.UserInfo.name == "Jackson"
).all()
​
# 上面是Python语句形式的过滤条件,由filter方法调用
# 亦可以使用ORM的形式进行过滤,通过filter_by方法调用
# 如下所示
# .filter_by(name="Jackson").all()
# 个人更推荐使用filter过滤,它看起来更直观,更简单,可以支持 == != > < >= <=等常见符号
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f11391ea2b0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

AND查询:

# 获取链接池、ORM表对象
import models
# 导入AND
from sqlalchemy import and_
​
result = models.session.query(
    models.UserInfo,
).filter(
    and_(
        models.UserInfo.name == "Jackson",
        models.UserInfo.gender == "male"
    )
).all()
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f11391ea2b0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

OR查询:

# 获取链接池、ORM表对象
import models
# 导入OR
from sqlalchemy import or_
​
result = models.session.query(
    models.UserInfo,
).filter(
    or_(
        models.UserInfo.name == "Jackson",
        models.UserInfo.gender == "male"
    )
).all()
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f11391ea2b0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

NOT查询:

# 获取链接池、ORM表对象
import models
# 导入NOT
from sqlalchemy import not_
​
result = models.session.query(
    models.UserInfo,
).filter(
    not_(
        models.UserInfo.name == "Jackson",
    )
).all()
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f11391ea2b0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

范围查询

BETWEEN查询:

# 获取链接池、ORM表对象
import models

result = models.session.query(
    models.UserInfo,
).filter(
    models.UserInfo.age.between(15, 21)
).all()

# 过滤成功的结果数量
print(len(result))
# 1

# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f11391ea2b0>]

# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

包含查询

IN查询:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).filter(
    models.UserInfo.age.in_((18, 19, 20))
).all()
​
# 过滤成功的结果数量
print(len(result))
# 2
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7fdeeaa774f0>, <models.UserInfo object at 0x7fdeeaa77490>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

NOT IN,只需要加上~即可:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).filter(
    ~models.UserInfo.age.in_((18, 19, 20))
).all()
​
# 过滤成功的结果数量
print(len(result))
# 0
​
# 过滤成功的结果
print(result)
# []
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

模糊匹配

LIKE查询:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).filter(
    models.UserInfo.name.like("Jack%")
).all()
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7fee1614f4f0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

分页查询

对结果all()返回的列表进行一次切片即可:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).all()[0:1]
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7fee1614f4f0>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

排序查询

ASC升序、DESC降序,需要指定排序规则:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.UserInfo,
).filter(
    models.UserInfo.age > 12
).order_by(
    models.UserInfo.age.desc()
).all()
​
# 过滤成功的结果数量
print(len(result))
# 2
​
# 过滤成功的结果
print(result)
# [<models.UserInfo object at 0x7f90eccd26d0>, <models.UserInfo object at 0x7f90eccd2670>]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

聚合分组

聚合分组与having过滤:

# 获取链接池、ORM表对象
import models
# 导入聚合函数
from sqlalchemy import func
​
result = models.session.query(
    func.sum(models.UserInfo.age)
).group_by(
    models.UserInfo.gender
).having(
    func.sum(models.UserInfo.id > 1)
).all()
​
# 过滤成功的结果数量
print(len(result))
# 1
​
# 过滤成功的结果
print(result)
# [(Decimal('38'),)]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

多表查询

多表创建

五表关系:

建表语句:

# models.py
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import relationship
​
from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    Date,
    String,
    Enum,
    ForeignKey,
    UniqueConstraint,
)
from sqlalchemy.ext.declarative import declarative_base
​
# 基础类
Base = declarative_base()
​
# 创建引擎
engine = create_engine(
    "mysql+pymysql://tom:123@192.168.0.120:3306/db1?charset=utf8mb4",
    # "mysql+pymysql://tom@127.0.0.1:3306/db1?charset=utf8mb4", # 无密码时
    # 超过链接池大小外最多创建的链接
    max_overflow=0,
    # 链接池大小
    pool_size=5,
    # 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
    pool_timeout=10,
    # 多久之后对链接池中的链接进行一次回收
    pool_recycle=1,
    # 查看原生语句
    # echo=True
)
​
# 绑定引擎
Session = sessionmaker(bind=engine)
# 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象
# 内部会采用threading.local进行隔离
session = scoped_session(Session)
​
​
class StudentsNumberInfo(Base):
    """学号表"""
    __tablename__ = "studentsNumberInfo"
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    number = Column(Integer, nullable=False, unique=True, comment="学生编号")
    admission = Column(Date, nullable=False, comment="入学时间")
    graduation = Column(Date, nullable=False, comment="毕业时间")
​
​
class TeachersInfo(Base):
    """教师表"""
    __tablename__ = "teachersInfo"
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    number = Column(Integer, nullable=False, unique=True, comment="教师编号")
    name = Column(String(64), nullable=False, comment="教师姓名")
    gender = Column(Enum("male", "female"), nullable=False, comment="教师性别")
    age = Column(Integer, nullable=False, comment="教师年龄")
​
​
class ClassesInfo(Base):
    """班级表"""
    __tablename__ = "classesInfo"
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    number = Column(Integer, nullable=False, unique=True, comment="班级编号")
    name = Column(String(64), nullable=False, unique=True, comment="班级名称")
    # 一对一关系必须为连接表的连接字段创建UNIQUE的约束,这样才能是一对一,否则是一对多
    fk_teacher_id = Column(
        Integer,
        ForeignKey(
            "teachersInfo.id",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
        nullable=False,
        unique=True,
        comment="班级负责人"
    )
    # 下面这2个均属于逻辑字段,适用于正反向查询。在使用ORM的时候,我们不必每次都进行JOIN查询,而恰好正反向的查询使用频率会更高
    # 这种逻辑字段不会在物理层面上创建,它只适用于查询,本身不占据任何数据库的空间
    # sqlalchemy的正反向概念与Django有所不同,Django是外键字段在那边,那边就作为正
    # 而sqlalchemy是relationship字段在那边,那边就作为正
    # 比如班级表拥有 relationship 字段,而老师表不曾拥有
    # 那么用班级表的这个relationship字段查老师时,就称为正向查询
    # 反之,如果用老师来查班级,就称为反向查询
    # 另外对于这个逻辑字段而言,根据不同的表关系,创建的位置也不一样:
    #  - 1 TO 1:建立在任意一方均可,查询频率高的一方最好
    #  - 1 TO M:建立在M的一方
    #  - M TO M:中间表中建立2个逻辑字段,这样任意一方都可以先反向,再正向拿到另一方
    #  - 遵循一个原则,ForeignKey建立在那个表上,那个表上就建立relationship
    #  - 有几个ForeignKey,就建立几个relationship
    # 总而言之,使用ORM与原生SQL最直观的区别就是正反向查询能带来更高的代码编写效率,也更加简单
    # 甚至我们可以不用外键约束,只创建这种逻辑字段,让表与表之间的耦合度更低,但是这样要避免脏数据的产生
​
    # 班级负责人,这里是一对一关系,一个班级只有一个负责人
    leader_teacher = relationship(
        # 正向查询时所链接的表,当使用 classesInfo.leader_teacher 时,它将自动指向fk的那一条记录
        "TeachersInfo",
        # 反向查询时所链接的表,当使用 teachersInfo.leader_class 时,它将自动指向该老师所管理的班级
        backref="leader_class",
    )
​
​
class ClassesAndTeachersRelationship(Base):
    """任教老师与班级的关系表"""
    __tablename__ = "classesAndTeachersRelationship"
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    # 中间表中注意不要设置单列的UNIQUE约束,否则就会变为一对一
    fk_teacher_id = Column(
        Integer,
        ForeignKey(
            "teachersInfo.id",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
        nullable=False,
        comment="教师记录"
    )
​
    fk_class_id = Column(
        Integer,
        ForeignKey(
            "classesInfo.id",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
        nullable=False,
        comment="班级记录"
    )
    # 多对多关系的中间表必须使用联合唯一约束,防止出现重复数据
    __table_args__ = (
        UniqueConstraint("fk_teacher_id", "fk_class_id"),
    )
​
    # 逻辑字段
    # 给班级用的,查看所有任教老师
    mid_to_teacher = relationship(
        "TeachersInfo",
        backref="mid",
    )
​
    # 给老师用的,查看所有任教班级
    mid_to_class = relationship(
        "ClassesInfo",
        backref="mid"
    )
​
​
class StudentsInfo(Base):
    """学生信息表"""
    __tablename__ = "studentsInfo"
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    name = Column(String(64), nullable=False, comment="学生姓名")
    gender = Column(Enum("male", "female"), nullable=False, comment="学生性别")
    age = Column(Integer, nullable=False, comment="学生年龄")
    # 外键约束
    # 一对一关系必须为连接表的连接字段创建UNIQUE的约束,这样才能是一对一,否则是一对多
    fk_student_id = Column(
        Integer,
        ForeignKey(
            "studentsNumberInfo.id",
            ondelete="CASCADE",
            onupdate="CASCADE"
        ),
        nullable=False,
        comment="学生编号"
    )
    # 相比于一对一,连接表的连接字段不用UNIQUE约束即为多对一关系
    fk_class_id = Column(
        Integer,
        ForeignKey(
            "classesInfo.id",
            ondelete="CASCADE",
            onupdate="CASCADE"
        ),
        comment="班级编号"
    )
    # 逻辑字段
    # 所在班级, 这里是一对多关系,一个班级中可以有多名学生
    from_class = relationship(
        "ClassesInfo",
        backref="have_student",
    )
    # 学生学号,这里是一对一关系,一个学生只能拥有一个学号
    number_info = relationship(
        "StudentsNumberInfo",
        backref="student_info",
    )
​
​
if __name__ == "__main__":
    # 删除表
    Base.metadata.drop_all(engine)
    # 创建表
    Base.metadata.create_all(engine)

插入数据:

# 获取链接池、ORM表对象
import models
import datetime
​
​
models.session.add_all(
    (
        # 插入学号表数据
        models.StudentsNumberInfo(
            number=160201,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        models.StudentsNumberInfo(
            number=160101,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        models.StudentsNumberInfo(
            number=160301,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        models.StudentsNumberInfo(
            number=160102,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        models.StudentsNumberInfo(
            number=160302,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        models.StudentsNumberInfo(
            number=160202,
            admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
            graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
        ),
        # 插入教师表数据
        models.TeachersInfo(
            number=3341, name="David", gender="male", age=32,
        ),
        models.TeachersInfo(
            number=3342, name="Jason", gender="male", age=30,
        ),
        models.TeachersInfo(
            number=3343, name="Lisa", gender="female", age=28,
        ),
        # 插入班级表数据
        models.ClassesInfo(
            number=1601, name="one year one class", fk_teacher_id=1
        ),
        models.ClassesInfo(
            number=1602, name="one year two class", fk_teacher_id=2
        ),
        models.ClassesInfo(
            number=1603, name="one year three class", fk_teacher_id=3
        ),
        # 插入中间表数据
        models.ClassesAndTeachersRelationship(
            fk_class_id=1, fk_teacher_id=1
        ),
        models.ClassesAndTeachersRelationship(
            fk_class_id=2, fk_teacher_id=1
        ),
        models.ClassesAndTeachersRelationship(
            fk_class_id=3, fk_teacher_id=1
        ),
        models.ClassesAndTeachersRelationship(
            fk_class_id=1, fk_teacher_id=2
        ),
        models.ClassesAndTeachersRelationship(
            fk_class_id=3, fk_teacher_id=3
        ),
        # 插入学生表数据
        models.StudentsInfo(
            name="Jack", gender="male", age=17, fk_student_id=1, fk_class_id=2
        ),
        models.StudentsInfo(
            name="Tom", gender="male", age=18, fk_student_id=2, fk_class_id=1
        ),
        models.StudentsInfo(
            name="Mary", gender="female", age=16, fk_student_id=3,
            fk_class_id=3
        ),
        models.StudentsInfo(
            name="Anna", gender="female", age=17, fk_student_id=4,
            fk_class_id=1
        ),
        models.StudentsInfo(
            name="Bobby", gender="male", age=18, fk_student_id=6, fk_class_id=2
        ),
    )
)
​
models.session.commit()
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

JOIN查询

INNER JOIN:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.StudentsInfo.name,
    models.StudentsNumberInfo.number,
    models.ClassesInfo.number
).join(
    models.StudentsNumberInfo,
    models.StudentsInfo.fk_student_id == models.StudentsNumberInfo.id
).join(
    models.ClassesInfo,
    models.StudentsInfo.fk_class_id == models.ClassesInfo.id
).all()
​
print(result)
# [('Jack', 160201, 1602), ('Tom', 160101, 1601), ('Mary', 160301, 1603), ('Anna', 160102, 1601), ('Bobby', 160202, 1602)]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

LEFT JOIN只需要在每个JOIN中指定isouter关键字参数为True即可:

session.query(
    左表.字段,
    右表.字段
)
.join(
    右表,
    链接条件,
    isouter=True
).all()

RIGHT JOIN需要换表的位置,SQLALchemy本身并未提供RIGHT JOIN,所以使用时一定要注意驱动顺序,小表驱动大表(如果不注意顺序,MySQL优化器内部也会优化):

session.query(
    左表.字段,
    右表.字段
)
.join(
    左表,
    链接条件,
    isouter=True
).all()

UNION&UNION ALL

将多个查询结果联合起来,必须使用filter(),后面不加all()方法。

因为all()会返回一个列表,而filter()返回的是一个<class 'sqlalchemy.orm.query.Query'>查询对象,此外,必须单拿某一个字段,不能不指定字段直接query():

# 获取链接池、ORM表对象
import models
​
students_name = models.session.query(models.StudentsInfo.name).filter()
students_number = models.session.query(models.StudentsNumberInfo.number)\
    .filter()
class_name = models.session.query(models.ClassesInfo.name).filter()
​
result = students_name.union_all(students_number).union_all(class_name)
​
print(result.all())
# [
#      ('Jack',), ('Tom',), ('Mary',), ('Anna',), ('Bobby',),
#      ('160101',), ('160102',), ('160201',), ('160202',), ('160301',), ('160302',),
#      ('one year one class',), ('one year three class',), ('one year two class',)
# ]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

子查询

子查询使用subquery()实现,如下所示,查询每个班级中年龄最小的人:

# 获取链接池、ORM表对象
import models
from sqlalchemy import func
​
# 子查询中所有字段的访问都需要加上c的前缀
# 如 sub_query.c.id、 sub_query.c.name等
sub_query = models.session.query(
    # 使用label()来为字段AS一个别名
    # 后续访问需要通过sub_query.c.alias进行访问
    func.min(models.StudentsInfo.age).label("min_age"),
    models.ClassesInfo.id,
    models.ClassesInfo.name
).join(
    models.ClassesInfo,
    models.StudentsInfo.fk_class_id == models.ClassesInfo.id
).group_by(
    models.ClassesInfo.id
).subquery()
​
​
result = models.session.query(
    models.StudentsInfo.name,
    sub_query.c.min_age,
    sub_query.c.name
).join(
    sub_query,
    sub_query.c.id == models.StudentsInfo.fk_class_id
).filter(
   sub_query.c.min_age == models.StudentsInfo.age
)
​
print(result.all())
# [('Jack', 17, 'one year two class'), ('Mary', 16, 'one year three class'), ('Anna', 17, 'one year one class')]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

正反查询

上面我们都是通过JOIN进行查询的,实际上我们也可以通过逻辑字段relationship进行查询。

下面是正向查询的示例,正向查询是指从有relationship逻辑字段的表开始查询:

# 查询所有学生的所在班级,我们可以通过学生的from_class字段拿到其所在班级
# 另外,对于学生来说,班级只能有一个,所以have_student应当是一个对象
​
# 获取链接池、ORM表对象
import models
​
students_lst = models.session.query(
    models.StudentsInfo
).all()
​
for row in students_lst:
    print(f"""
            student name : {row.name}
            from : {row.from_class.name}
          """)
​
# student name : Mary
# from : one year three class
​
# student name : Anna
# from : one year one class
​
# student name : Bobby
# from : one year two class
​
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

下面是反向查询的示例,反向查询是指从没有relationship逻辑字段的表开始查询:

# 查询所有班级中的所有学生,学生表中有relationship,并且它的backref为have_student,所以我们可以通过班级.have_student来获取所有学生记录
​
# 另外,对于班级来说,学生可以有多个,所以have_student应当是一个序列
​
# 获取链接池、ORM表对象
import models
​
classes_lst = models.session.query(
    models.ClassesInfo
).all()
​
for row in classes_lst:
    print("class name :", row.name)
    for student in row.have_student:
        print("student name :", student.name)
​
# class name : one year one class
#      student name : Jack
#      student name : Anna
# class name : one year two class
#      student name : Tom
# class name : one year three class
#      student name : Mary
#      student name : Bobby
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

总结,正向查询的逻辑字段总是得到一个对象,反向查询的逻辑字段总是得到一个列表。

反向方法

使用逻辑字段relationship可以直接对一些跨表记录进行增删改查。

由于逻辑字段是一个类似于列表的存在(仅限于反向查询,正向查询总是得到一个对象),所以列表的绝大多数方法都能用。

<class 'sqlalchemy.orm.collections.InstrumentedList'>
    - append()
    - clear()
    - copy()
    - count()
    - extend()
    - index()
    - insert()
    - pop()
    - remove()
    - reverse()
    - sort()

下面不再进行实机演示,因为我们上面的几张表中做了很多约束。

# 比如
# 给老师增加班级
result = session.query(Teachers).first()
# extend方法:
result.re_class.extend([
    Classes(name="三年级一班",),
    Classes(name="三年级二班",),
])
​
# 比如
# 减少老师所在的班级
result = session.query(Teachers).first()
​
# 待删除的班级对象,集合查找比较快
delete_class_set = {
    session.query(Classes).filter_by(id=7).first(),
    session.query(Classes).filter_by(id=8).first(),
}
​
# 循换老师所在的班级
# remove方法:
for class_obj in result.re_class:
    if class_obj in delete_class_set:
        result.re_class.remove(class_obj)
​
# 比如
# 清空老师所任教的所有班级
# 拿出一个老师
result = session.query(Teachers).first()
result.re_class.clear()

查询案例

1)查看每个班级共有多少学生:

JOIN查询:

# 获取链接池、ORM表对象
import models
​
from sqlalchemy import func
​
result = models.session.query(
    models.ClassesInfo.name,
    func.count(models.StudentsInfo.id)
).join(
    models.StudentsInfo,
    models.ClassesInfo.id == models.StudentsInfo.fk_class_id
).group_by(
    models.ClassesInfo.id
).all()
​
print(result)
# [('one year one class', 2), ('one year two class', 2), ('one year three class', 1)]
​
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

正反查询:

# 获取链接池、ORM表对象
import models
​
result = {}
class_lst = models.session.query(
    models.ClassesInfo
).all()
​
for row in class_lst:
    for student in row.have_student:
        count = result.setdefault(row.name, 0)
        result[row.name] = count + 1
​
print(result.items())
# dict_items([('one year one class', 2), ('one year two class', 2), ('one year three class', 1)])
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

2)查看每个学生的入学、毕业年份以及所在的班级名称:

JOIN查询:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.StudentsNumberInfo.number,
    models.StudentsInfo.name,
    models.ClassesInfo.name,
    models.StudentsNumberInfo.admission,
    models.StudentsNumberInfo.graduation
).join(
    models.StudentsInfo,
    models.StudentsInfo.fk_class_id == models.ClassesInfo.id
).join(
    models.StudentsNumberInfo,
    models.StudentsNumberInfo.id == models.StudentsInfo.fk_student_id
).order_by(
    models.StudentsNumberInfo.number.asc()
).all()
​
print(result)
# [
#     (160101, 'Tom', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160102, 'Anna', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160201, 'Jack', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160202, 'Bobby', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160301, 'Mary', 'one year three class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15))
# ]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

正反查询:

# 获取链接池、ORM表对象
import models
​
result = []
​
student_lst = models.session.query(
    models.StudentsInfo
).all()
​
for row in student_lst:
    result.append((
        row.number_info.number,
        row.name,
        row.from_class.name,
        row.number_info.admission,
        row.number_info.graduation
    ))
​
print(result)
# [
#     (160101, 'Tom', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160102, 'Anna', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160201, 'Jack', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160202, 'Bobby', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
#     (160301, 'Mary', 'one year three class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15))
# ]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

3)查看David所教授的学生中年龄最小的学生:

JOIN查询:

# 获取链接池、ORM表对象
import models
​
result = models.session.query(
    models.TeachersInfo.name,
    models.StudentsInfo.name,
    models.StudentsInfo.age,
    models.ClassesInfo.name
).join(
    models.ClassesAndTeachersRelationship,
    models.ClassesAndTeachersRelationship.fk_class_id == models.ClassesInfo.id
).join(
    models.TeachersInfo,
    models.ClassesAndTeachersRelationship.fk_teacher_id == models.TeachersInfo.id
).join(
    models.StudentsInfo,
    models.StudentsInfo.fk_class_id == models.ClassesInfo.id
).filter(
    models.TeachersInfo.name == "David"
).order_by(
    models.StudentsInfo.age.asc(),
    models.StudentsInfo.id.asc()
).limit(1).all()
​
print(result)
# [('David', 'Mary', 16, 'one year three class')]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

正反查询:

# 获取链接池、ORM表对象
import models
​
david = models.session.query(
    models.TeachersInfo
).filter(
    models.TeachersInfo.name == "David"
).first()
​
student_lst = []
​
# 反向查询拿到任教班级,反向是一个列表,所以直接for
for row in david.mid:
    cls = row.mid_to_class
    # 通过任教班级,反向拿到其下的所有学生
    cls_students = cls.have_student
    # 遍历学生
    for student in cls_students:
        student_lst.append(
            (
                david.name,
                student.name,
                student.age,
                cls.name
            )
        )
​
# 筛选出年龄最小的
min_age_student_lst = sorted(
    student_lst, key=lambda tpl: tpl[2])[0]
​
print(min_age_student_lst)
# ('David', 'Mary', 16, 'one year three class')
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

4)查看每个班级的负责人是谁,以及任课老师都有谁:

JOIN查询:

# 获取链接池、ORM表对象
import models
​
from sqlalchemy import func
​
# 先查任课老师
sub_query = models.session.query(
    models.ClassesAndTeachersRelationship.fk_class_id.label("class_id"),
    func.group_concat(models.TeachersInfo.name).label("have_teachers")
).join(
    models.ClassesInfo,
    models.ClassesAndTeachersRelationship.fk_class_id == models.ClassesInfo.id
).join(
    models.TeachersInfo,
    models.ClassesAndTeachersRelationship.fk_teacher_id == models.TeachersInfo.id
).group_by(
    models.ClassesAndTeachersRelationship.fk_class_id
).subquery()
​
result = models.session.query(
    models.ClassesInfo.name.label("class_name"),
    models.TeachersInfo.name.label("leader_teacher"),
    sub_query.c.have_teachers.label("have_teachers")
).join(
    models.TeachersInfo,
    models.ClassesInfo.fk_teacher_id == models.TeachersInfo.id
).join(
    sub_query,
    sub_query.c.class_id == models.ClassesInfo.id
).all()
​
print(result)
# [('one year one class', 'David', 'Jason,David'), ('one year two class', 'Jason', 'David'), ('one year three class', 'Lisa', 'David,Lisa')]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

正反查询:

# 获取链接池、ORM表对象
import models
​
result = []
​
# 获取所有班级
classes_lst = models.session.query(
    models.ClassesInfo
).all()
​
for cls in classes_lst:
    cls_message = [
        cls.name,
        cls.leader_teacher.name,
        [],
    ]
    for row in cls.mid:
        cls_message[-1].append(row.mid_to_teacher.name)
    result.append(cls_message)
​
print(result)
# [['one year one class', 'David', ['David', 'Jason']], ['one year two class', 'Jason', ['David']], ['one year three class', 'Lisa', ['David', 'Lisa']]]
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()

原生SQL

查看执行命令

如果一条查询语句是filter()结尾,则该对象的__str__方法会返回格式化后的查询语句:

print(
    models.session.query(models.StudentsInfo).filter()
)
​
SELECT `studentsInfo`.id AS `studentsInfo_id`, `studentsInfo`.name AS `studentsInfo_name`, `studentsInfo`.gender AS `studentsInfo_gender`, `studentsInfo`.age AS `studentsInfo_age`, `studentsInfo`.fk_student_id AS `studentsInfo_fk_student_id`, `studentsInfo`.fk_class_id AS `studentsInfo_fk_class_id`
FROM `studentsInfo`

执行原生命令

执行原生命令可使用session.execute()方法执行,它将返回一个cursor游标对象,如下所示:

# 获取链接池、ORM表对象
import models
​
cursor = models.session.execute(
    "SELECT * FROM studentsInfo WHERE id = (:uid)", params={'uid': 1})
​
print(cursor.fetchall())
​
# 关闭链接,亦可使用session.remove(),它将回收该链接
models.session.close()  # 获取链接池、ORM表对象

3、Python 库(包) dictalchemy

文档:https://pythonhosted.org/dictalchemy/

dictalchemy 是一个用于将 SQLAlchemy 模型转换为字典的 Python 库。它提供了一种简单的方式来将 SQLAlchemy 模型对象转换为字典形式,方便在 Python 中进行处理和序列化操作。

安装:pip install dictalchemy

安装:pip install dictalchemy3   (移除对 Python2的支持,兼容最新的SQLAlchemy )

示例

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from dictalchemy import make_class_dictable

# 创建 SQLAlchemy 数据库引擎和会话
engine = create_engine('sqlite:///mydatabase.db')
Base = declarative_base(bind=engine)
session = scoped_session(sessionmaker(bind=engine))


# 定义 SQLAlchemy 模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String)
    email = Column(String)


# 将模型类转换为可序列化的字典类型
make_class_dictable(User)

# 查询数据库获取模型对象
user = session.query(User).first()

# 将模型对象转换为字典
user_dict = user.to_dict()
print(user_dict)

4、Flask-SQLAlchemy

快速开始

flask 中配置 Flask-SQLAlchemy 扩展

示例代码:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

# create the extension
db = SQLAlchemy()

# create the app
app = Flask(__name__)

# configure the SQLite database, relative to the app instance folder
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"

# initialize the app with the extension
db.init_app(app)

上面代码中 db 对象 ( 就是 SQLAlchemy 实例 ) 

  • 允许您访问类 db.Model 定义的所有 models,
  • db.session 可以执行查询的

2 种创建方式

# 导入扩展包flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy

# 方法 1
# 直接实例化sqlalchemy对象,传⼊app
db = SQLAlchemy(app)

# 方法 2
# 通过⼿动调⽤初始话app的函数
db = SQLAlchemy()
db.init_app(app)

在单独运行调试时,对数据库操作需要在Flask的应用上下文中进行

with app.app_context():
    User.query.all()
  • SQLALCHEMY_DATABASE_URI: 数据库的连接信息

Postgres: postgresql://user:password@localhost/mydatabase
MySQL: mysql://user:password@localhost/mydatabase
Oracle: oracle://user:password@127.0.0.1:1521/sidname
SQLite: sqlite:absolute/path/to/foo.db

  • SQLALCHEMY_TRACK_MODIFICATIONS : 动态追踪修改,可以设置为True或False,⼀般情况下设置False
  • SQLALCHEMY_ECHO :显示生成的SQL语句,可用于调试

在 Flask 应用中引入相关模块和配置数据库连接。

配置参数放在Flask的应用配置 app.config 中

示例 1:

from flask import Flask
app = Flask(__name__)

# 定义配置对象
class Config(object):
    SQLALCHEMY_DATABASE_URI = 'mysql://root:mysql@127.0.0.1:3306/db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ECHO = True

app.config.from_object(Config)

示例 2:

from flask import Flask
app = Flask(__name__)

# 配置数据库的连接信息
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/db'
# 关闭动态追踪修改的警告信息
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 展示sql语句
app.config['SQLALCHEMY_ECHO'] = True

配置参数说明

名字 备注
SQLALCHEMY_DATABASE_URI 连接数据库URI
SQLALCHEMY_BINDS 一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库
SQLALCHEMY_ECHO 如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,对调试有用
SQLALCHEMY_RECORD_QUERIES 可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()
SQLALCHEMY_NATIVE_UNICODE 可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )
SQLALCHEMY_POOL_SIZE 数据库连接池的大小。默认是引擎默认值(通常 是 5 )
SQLALCHEMY_POOL_TIMEOUT 设定连接池的连接超时时间。默认是 10
SQLALCHEMY_POOL_RECYCLE 多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时

ORM 模型类参数说明

字段类型

类型名 python中类型 说明
Integer int 普通整数,一般是32位
SmallInteger int 取值范围小的整数,一般是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通整数,一般是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串做了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean bool 布尔值
Date datetime.date 时间
Time datetime.datetime 日期和时间
LargeBinary str 二进制文件

列选项

选项名 说明
primary_key 如果为True,代表表的主键
unique 如果为True,代表这列不允许出现重复的值
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,如果为False,不允许有空值
default 为这列定义默认值

关系选项

选项名 说明
backref 在关系的另一模型中添加反向引用
primaryjoin 明确指定两个模型之间使用的联结条件
uselist 如果为False,不使用列表,而使用标量值
order_by 指定关系中记录的排序方式
secondary 指定多对多关系中关系表的名字
secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

定义 Models

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String, unique=True, nullable=False)
    email = db.Column(db.String)

如果没有设置 __tablename__ 属性,根据默认规则,表名将会是类名小写形式的 "user"

如果想要设置自定义的表名,可以在模型类中显式地定义 __tablename__ 属性

定义、创建 models and tables:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/models/

创建 表

定义所有模型和表后,调用以创建 数据库中的表架构。这需要应用程序上下文。因为你不是 此时,在请求中,手动创建一个。

所有的 models 和 tables 被定义后,就可以在 "应用上下文(application context)" 中,调用 SQLAlchemy.create_all() 在数据库中创建 table schema。

with app.app_context():
    db.create_all()

如果在其他模块中定义的有 models ,则必须在调用 create_all 之前导入它们,否则 SQLAlchemy 将不知道它们。

如果表已经存在数据库中,则 create_all 不会更新表。如果要更改 model 的 列,则可以使用迁移库( 如带有 Flask-Alembic 或 Flask-Migrate 的 Alembic)来生成更新数据库架构的迁移。

查询数据

在 Flask 视图函数中,或者 CLI 命令中,可以使用 db.session 执行 查询、修改 模型 数据。

SQLAlchemy 自动为每个模型定义一个 __init__ 方法,该方法分配任何相应数据库列和其他属性的关键字参数。

  • db.session.add(obj)  添加一个对象到session, 也可以用来插入一个对象,修改一个对象的属性,更新对象
  • db.session.delete(obj)  删除一个对象。执行 "添加、修改、删除"后,必须执行 db.session.commit() 才能生效
  • db.session.execute(db.select(...))  查询数据,通过 Result.scalars() 获取查询的结果列表。Result.scalar() 获取单条数据

示例:

@app.route("/users")
def user_list():
    users = db.session.execute(db.select(User).order_by(User.username)).scalars()
    return render_template("user/list.html", users=users)

@app.route("/users/create", methods=["GET", "POST"])
def user_create():
    if request.method == "POST":
        user = User(
            username=request.form["username"],
            email=request.form["email"],
        )
        db.session.add(user)
        db.session.commit()
        return redirect(url_for("user_detail", id=user.id))

    return render_template("user/create.html")

@app.route("/user/<int:id>")
def user_detail(id):
    user = db.get_or_404(User, id)
    return render_template("user/detail.html", user=user)

@app.route("/user/<int:id>/delete", methods=["GET", "POST"])
def user_delete(id):
    user = db.get_or_404(User, id)

    if request.method == "POST":
        db.session.delete(user)
        db.session.commit()
        return redirect(url_for("user_list"))

    return render_template("user/delete.html", user=user)

旧的查询方法:Model.query
新的查询方法:db.session.execute(db.select(...))

查询:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/queries/

SQLAlchemy 通常使用方式

完整使用示例

https://juejin.cn/post/7239296984985288765

定义ORM模型类

创建 User 类继承自 db.Model类,同时定义id、name、mobile、gender、....等属性,对应数据库中表user的列。

class User(db.Model):
    __tablename__ = 'user'

    class GENDER:
        MALE = 0
        FEMALE = 1

    id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
    mobile = db.Column(db.String, doc='手机号')
    password = db.Column(db.String, doc='密码')
    name = db.Column('user_name', db.String, doc='昵称')
    gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
    birthday = db.Column(db.Date, doc='生日')
    is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
    # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
    time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
    update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')

    # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
    follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')

创建 Car类 继承自 db.Model类

class Car(db.Model):
    __tablename__ = 'car'

    class TYPE:
        SUV = 0
        SEDAN = 1
        PICKUP = 2

    id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')
    type = db.Column(db.Integer, doc='类型')
    name = db.Column(db.String, doc='名称')
    price = db.Column(db.Numeric, default=0.00, doc='价格')

常用参数说明:

db.Model:所有模型类都应该继承自 db.Model。

__tablename__:指定模型类对应的数据库表名。如果不指定,则默认为类名的小写形式。

db.Column:用来定义模型类中的各个字段,需要指定字段类型。

primary_key=True:用来指定主键字段。

default:用来指定字段的默认值。

unique=True:用来指定字段的唯一性约束。

index=True:用来指定字段是否需要创建索引。

db.ForeignKey():用来定义外键关系。需要传入对应的表格的主键作为参数

db.relationship():用来定义模型之间的关系。第一个参数需要传入要关联的模型类名,第二个参数可以通过 backref 来指定反向引用

lazy:用来指定关系的加载方式,有两种常见的方式:
     lazy=True:表示使用惰性加载,即在首次访问相关属性时才会加载数据。
     lazy=False:表示立即加载,即在查询时同时加载相关数据。

创建数据库表

可以手动创建数据库表,也可以通过迁移的方式,创建数据库表。

CREATE TABLE `user` (
  `user_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `mobile` char(11) NOT NULL COMMENT '手机号',
  `password` varchar(93) NULL COMMENT '密码',
  `user_name` varchar(32) NULL COMMENT '昵称',
  `gender` tinyint(1) NOT NULL DEFAULT '0' COMMENT '性别',
    `birthday` date NULL COMMENT '生日',    
    `is_delete` tinyint(1) NOT NULL DEFAULT '1' COMMENT '是否删除',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                                
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `mobile` (`mobile`),
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';

CREATE TABLE `car` (
  `car_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户ID',
  `type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
    `name` varchar(20) NOT NULL COMMENT '名称',
    `price` decimal(10,2)  DEFAULT '0.00' COMMENT '价格',
  PRIMARY KEY (`car_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='车辆表';

完整代码

from datetime import datetime

from flask import Flask
# 导入扩展包flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# 配置数据库的连接信息
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost/demo'
# 关闭动态追踪修改的警告信息
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 展示sql语句
app.config['SQLALCHEMY_ECHO'] = True

# 实例化sqlalchemy对象,并且和程序实例关联
db = SQLAlchemy(app)


class User(db.Model):
    __tablename__ = 'user'

    class GENDER:
        MALE = 0
        FEMALE = 1

    id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
    mobile = db.Column(db.String, doc='手机号')
    password = db.Column(db.String, doc='密码')
    name = db.Column('user_name', db.String, doc='昵称')
    gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
    birthday = db.Column(db.Date, doc='生日')
    is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
    # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
    time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
    update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')

    # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
    follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')


class Car(db.Model):
    __tablename__ = 'car'

    class TYPE:
        SUV = 0
        SEDAN = 1
        PICKUP = 2

    id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')
    type = db.Column(db.Integer, doc='类型')
    name = db.Column(db.String, doc='名称')
    price = db.Column(db.Numeric, default=0.00, doc='价格')


if __name__ == '__main__':
    app.run()

简单的 CRUD 操作

# 插入一条记录
user = User(name='张三', mobile='12345678910')
db.session.add(user)
db.session.commit()

# 查询记录
users = User.query.all()
print(users)

# 更新记录
user = User.query.filter_by(name='张三').first()
user.mobile  = '12345678910'
db.session.commit()

# 删除记录
user = User.query.filter_by(name='张三').first()
db.session.delete(user)
db.session.commit()

数据库迁移

首先在MySQL中创建数据库,接着定义模型类,通过迁移的方式,创建数据库表。

实现数据库迁移,需要用到扩展包:

flask-script:提供程序运行、迁移的脚本命令
flask-migrate:提供数据库迁移的功能

创建启动文件manage.py实现数据库迁移

app = Flask(__name__)
 
# 从flask_script中导入脚本管理器
from flask_script import Manager
# 从flask_migrate导入迁移工具、迁移命令
from flask_migrate import Migrate, MigrateCommand

# 实例化脚本管理器对象
manager = Manager(app)

# 创建SQLAlchemy对象
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

# 让迁移工具和程序实例app、sqlalchemy实例关联
Migrate(app, db)

# 添加迁移命令
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()

在终端中通过命令执行迁移

初始化迁移仓库:python manage.py db init
生成迁移脚本文件:
        python manage.py db migrate 
        python manage.py db migrate -m init_tables
执行迁移脚本文件:python manage.py db upgrade

SQLAlchemy 的 CRUD 操作

通过模型类的数据库会话对象db.session进行ORM类的CRUD操作,其封装了对数据库的基本操作,如:提交数据、回滚、添加、删除等

增、删、改

增加

@app.route('/add')
def add():
    # 添加数据
    user = User(mobile='12345678910', name='flask')
    # 把创建的模型类对象添加给数据库会话对象
    db.session.add(user)
    # 提交数据到数据库中
    db.session.commit()
    # 添加多个数据
    user1 = User(mobile='12345678911', name='flask1')
    user2 = User(mobile='12345678912', name='flask2')
    db.session.add_all([user1, user2])
    # 提交数据到数据库中
    db.session.commit()
    return 'add ok'


if __name__ == '__main__':
    app.run()

修改。更新和删除都必须要commit提交数据

user = User.query.get(1)
user.name = 'flask'
db.session.add(user) 
db.session.commit()

User.query.filter_by(id=1).update({'name':'flask'})
  db.session.commit()

删除

user = User.query.order_by(User.id.desc()).first()
db.session.delete(user)
db.session.commit()

User.query.filter(User.mobile='12345678910').delete()
db.session.commit()

查询 

all():查询所有,返回列表
        select * from 表名;
        User.query.all()

first():查询第一个,返回对象
        select * from 表名 limit 1;
        User.query.first()

get():根据主键ID获取对象,若主键不存在返回None
        select * from 表名 where id=1;
        User.query.get(1)

另一种查询方式
        db.session.query(User).all()
        db.session.query(User).first()
        db.session.query(User).get(1)

filter_by 过滤查询
        条件可以为空,默认查询所有,参数为模型类的字段名。
        只能使用赋值运算符,必须使用查询执行器;
        select mobile from 表名 where mobile='12345678910'
        User.query.filter_by(mobile='12345678910').all()
        User.query.filter_by(mobile='12345678910').first()
        # 查询条件是and关系
        User.query.filter_by(mobile='12345678910', id=1).first()  

filter:过虑查询。
        条件可以为空,默认查询所有,参数为模型类名加上字段名,
        可以使用丰富的运算符,保修使用查询执行器;
        User.query.filter(User.mobile=='12345678910').first()

# 查询所有字段
        user = User.query.filter_by(id=1).first()  

# 查询指定字段
        from sqlalchemy.orm import load_only
        User.query.options(load_only(User.name, User.mobile)).filter_by(id=1).first() 

复合 查询

多条件复合查询:手机号以123开始,按用户id倒序排序,起始位置2开始,返回3条符合的数据
User.query.filter(User.name.startswith('123')).order_by(User.id.desc()).offset(2).limit(3).all()

query = User.query.filter(User.name.startswith('123'))
query = query.order_by(User.id.desc())
query = query.offset(2).limit(3)
ret = query.all()

优化 查询

ORM 默认是全表扫描,使用load_only函数可以指定字段

聚合 查询

查询所有用户的拥有的SUV类型的车辆数
from sqlalchemy import func

db.session.query(Car.user_id, func.count(Car.name)).filter(Car.relation == Car.TYPE.SUV).group_by(Car.user_id).all()

关联 查询 ( 使用 ForeignKey )

# 一方
class User(db.Model):
    # relationship:指定关联对象Car,表示一个用户可以拥有多辆车
    cars = db.relationship('Car')

# 多方
class Car(db.Model):
    # ForeignKey: 指定car属于那个用户
    user_id = db.Column(db.Integer, db.ForeignKey('user.user_id'), doc='用户ID')
    
    # 在flask-sqlalchemy中返回模型类对象的数据
    def __repr__(self):
        car = {
            'car_id': self.id,
            'name': self.name,
            'type': self.type,
            'price': self.price,
        }
        return str(car)

@app.route('/test')
def test():
    # select *  from user where user_id=1
    user = User.query.get(1)
    print(user)
    # select * from car where user_id=1
    print(user.cars)
    for car in user.cars:
        print(car.name)
    return 'ok'

uselist:返回数据是否已列表形式返回。

  • Talse:user.cars得到的是一个对象,
  • 否则是一个InstrumentedList类型,需要遍历

class User(db.Model):
    cars = db.relationship('Car', uselist=False)

@app.route('/test')
def test():
    user = User.query.get(1)
    print(user)
    print(user.cars)
    print(user.cars.name)
    return 'ok'

backref: 反向引用

在查询时使用反向引用来获取关联对象的属性值

class User(db.Model):
    cars = db.relationship('Car', uselist=False, backref='myuser')


@app.route('/test')
def test():
    car = Car.query.get(1)
    print(car.myuser)
    return 'ok'

使用 primaryjoin

# 一方
class User(db.Model):
    cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')

# 多方
class Car(db.Model):
    id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
    user_id = db.Column(db.Integer, doc='用户ID')

@app.route('/test')
def test():
    user = User.query.get(1)
    print(user)
    print(user.cars)
    for car in user.cars:
        print(car.name)
    return 'ok'

指定字段关联查询

class User(db.Model):
    cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')

from sqlalchemy.orm import contains_eager,load_only
 
@app.route('/test')
def test():
    # 使用了 join() 和 contains_eager() 方法来实现关联查询

    # User.query.join(User.cars):在查询 User 表时,关联查询 Cars 表
    # options:为查询添加选项
    # load_only:指定只加载部分字段,以提高查询效率
    # contains_eager:加载 User 表与 Cars 表的关联数据。Cars是User模型中定义的cars属性,它是一个 relationship 属性,表示一个用户可以拥有多个车辆
    # oad_only(Car.name):指定只加载 Cars 表中的 user_id 字段,而不加载其他字段
    # filter:对查询结果进行过滤
    # all:执行查询,并返回查询结果
    # sql:  SELECT car.car_id AS car_car_id, car.name AS car_name, user.user_id AS user_user_id, user.user_name AS user_user_name FROM user INNER JOIN car ON user.user_id = car.user_id WHERE user.user_name = %s
    all = User.query.join(User.cars).options(load_only(User.name),contains_eager(User.cars).load_only(Car.name)).filter(User.name == 'flask').all()
    print(all)
    for item in all:
        print(item)
    return 'ok'

事务

flask-sqlalchemy 中自带事务支持,默认开启事务

可以手动触发回滚:db.session.rollback()

@app.route('/test')
def test():
    try:
        User.query.filter_by(id=1).update({'name': 'rollback'})
        1/0
        db.session.commit()
    except:
        db.session.rollback()
    return 'ok'

"逻辑、比较" 运算符

逻辑运算符:"与或非" 都需要导入才能使用。多条件默认是and关系,非就是不等于
比较运算符:>、<、>=、<=、!=、==

逻辑或
from sqlalchemy import or_
User.query.filter(or_(User.mobile=='12345678910', User.name.endswith('sk'))).all()

逻辑与
from sqlalchemy import and_
User.query.filter(and_(User.name != '12345678910', User.mobile.startswith('sk'))).all()

逻辑非
from sqlalchemy import not_
User.query.filter(not_(User.mobile == '12345678910')).all()

偏移与限制

        offset:偏移,表示起始位置
        User.query.offset(2).all()

        limit:限制返回条数
        User.query.limit(2).all()

排序

order_by:asc表示升序,desc表示降序

# 正序
User.query.order_by(User.id).all()  

# 倒序
User.query.order_by(User.id.desc()).all() 

数据库进阶实践 --- 事件监听

事件监听:https://docs.sqlalchemy.org/en/20/core/event.html

在 flask 中可以使用 Flask 提供的装饰器注册请求回调函数,他们会在 "特定的请求(事件)" 处理环节被执行。类似的,SQLAlchemy 也提供了一个 listen_for() 装饰器,他可以用来注册 事件 回调函数。

listen_for()装饰器主要接收两个参数,target表示监听的对象,这个对象可以是模型类、类实例或类属性等。identifier参数表示被监听事件的标识符,比如,用于监听属性的事件标识符有set、append、remove、init_scalar、init_collection等。

创建一个Draft模型类表示草稿,其中包含body字段和edit_tine字段,分别存储草稿正文和被修改的次数,其中edit_time字段的默认值为0,如下所示:

class Draft(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    body = db.Column(db.Text)
    edit_time = db.Column(db.Integer, default = 0)

通过注册事件监听,我们可以实现在 body 列修改时,自动叠加表示被修改次数的edit_time字段。在SQLAlchemy中,每个事件都会有一个对应的事件方法,不同的事件方法支持不同的参数。被注册的监听函数需要接收对应事件方法的所有参数,所以具体的监听函数用法因使用的事件而异。设置某个字段值将处罚set事件。

app.py:   set 事件监听函数

@db.event.listens_for(Draft.body, 'set')
def increment_edit_time(target, value, oldvalue, initiator):
    if target.edit_time is not None:
        target.edit_time += 1

我们在listen_for()装饰器中分别传入Draft.body和set作为target和identifier参数的值,监听函数接收所有set()事件方法接收的参数,其中的target参数表示触发时间的模型类实例,使用target.edit_time即可获取我们需要叠加的字段。其他的参数也需要照常写出,虽然这里没有用到。value表示被设置的值,oldvalue表示被取代的旧值。

当set事件发生在目标对象Draft.body上时,这个监听函数就会被执行,从而自动叠加Draft.edit_time列的值,如下所示:

>>> draft = Draft(body = 'init')
>>> db.session.add(draft)
>>> db.session.commit()
>>> draft.edit_time
0
>>> draft.body
u'init'
>>> draft.body = 'edited'
>>> draft.edit_time
1
>>> draft.body = 'edited again'
>>> draft.edit_time
2
>>> draft.body = 'edited again again'
>>> draft.edit_time
3
>>> db.session.commit()

除了这种传统的参数接收方式,即接收所有事件方法接收的参数,还有一种更简单的方法。通过在listen_for()装饰器中将关键字参数name设为True,可以在监听函数中接收**kwargs作为参数(可变长关键字参数), 即“named argument”(命名参数)。然后在函数中可以使用参数名作为键来从**kwargs字典获取对应的参数值:

@db.event.listens_for(Draft.body, 'set', named = True)
def increment_edit_time(**kwargs):
    if kwargs['target'].edit_time is not None:
        kwargs['target'].edit_time += 1

>>> draft = Draft.query.first()
>>> draft
<Draft 1>
>>> draft.body
u'edited again again'
>>> draft.edit_time
3
>>> draft.body = 'edited 3 times'
>>> draft.edit_time
4

SQLAlchemy 作为SQL工具基本身包含两大主要组件:SQLAlchemy ORM 和 SQLAlchemy Core。前者实现了ORM功能,后者实现了数据库接口等核心功能,这两类组件都提供了大量的监听事件,几乎覆盖了SQLAlchemy使用的声明周期。

除了使用listen_for装饰器,我们还可以直接使用它内部调用的 listen() 函数注册事件监听函数,第三个参数传入被注册的函数对象,比如 db.event.listen(SomeClass, ‘load’, my_load_listener)。

数据库进阶实践 --- 级联操作

Cascade意为 "级联操作",就是在操作一个对象的同时,对相关的对象也执行某些操作。我们通过一个Post模型和Comment模型来演示级联操作,分别表示文章(帖子)和评论,两者是一对多关系:

class Post(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    title = db.Column(db.String(50), unique = True)
    body = db.Column(db.Text)
    comments = db.relationship('Comment', back_populates = 'post')
    
    

class Comment(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    body = db.Column(db.Text)
    post_id = db.Column(db.Integer, db.ForeignKey('post.id'))
    post = db.relationship('Post', back_populates = 'comments')

级联行为通过关系函数relationship()的cascade参数设置。我们希望在操作Post对象时,处于附属地位的Comment对象也被相应执行某些操作,这时应该在Post类的关系函数中定义级联参数。设置了cascade参数的一侧将被视为父对象,相关的对象则被视为子对象。

cascade通常使用多个组合值,级联值之间使用逗号分隔,比如:

class Post(db.Model):
    …
    comments = db.relationship('Comment', cascade = 'save-update, merge,delete',back_populates = 'post')

常用的配置组合如下所示:

  • 1)      save-update、merge(默认值)
  • 2)      save-update、merge、delete
  • 3)      all
  • 4)      all、delete-orphan

当没有设置cascade参数时,会使用默认值save-upgrate、merge。上面的all等同于除了delete-orphan以外所有可用值的组合,即save-update、merge、refresh-expire、expunge、delete。

下面介绍常用的几个级联值:

1、save-update

save-update是默认的级联行为,当cascade参数设为save-update时,如果使用db.session.add()方法将Post对象添加到数据库会话时,那么与Post相关联的Comment对象也将被添加到数据库会话。我们首先创建一个Post对象和两个Comment对象:

>>> post = Post()
>>> comment1 = Comment()
>>> comment2 = Comment()
将post1添加到数据库会话后,只有post1在数据库会话中:
>>> db.session.add(post)
>>> post in db.session
True
>>> comment1 in db.session
False
>>> comment2 in db.session
False
如果我们让post1与这两个Comment对象建立关系,那么这两个Comment对象也会自动被添加到数据库会话中:
>>> post.comments.append(comment1)
>>> post.comments.append(comment2)
>>> comment1 in db.session
True
>>> comment2 in db.session
True

当调用db.session.commit()提交数据库会话时,这三个对象都会被提交到数据库中。

2、delete

如果某个Post对象被删除,那么按照默认的行为,该Post对象相关联的所有Comment对象都将与这个Post对象取消关联,外键字段的值会被清空。如果Post类的关系函数中cascade参数设为delete时,这些相关的Comment会在关联的Post对象删除时一并删除,当需要设置delete级联时,我们会将级联值设为all或save-update、merge、delete,比如:

class Post(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    title = db.Column(db.String(50), unique = True)
    body = db.Column(db.Text)
    comments = db.relationship('Comment',cascade = 'all', back_populates = 'post')

我们先创建一个文章对象post2和两个评论对象comment3和comment4,并将这两个评论对象与文章对象建立关系,将它们添加到数据库会话并提交:

>>> comment3 = Comment(body = 'very good')
>>> comment4 = Comment(body = 'excellent')
>>> post2 = Post(title = 'i have a good plan', body = 'tomorrow i will go to climbing')
>>> post2.comments.append(comment3)
>>> post2.comments.append(comment4)
>>> db.session.add(post2)
>>> db.session.commit()

现在共有两条Post记录和四条Comment记录:

>>> Post.query.all()
[<Post 1>, <Post 2>]
>>> Commment.query.all()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'Commment' is not defined
>>> Comment.query.all()
[<Comment 1>, <Comment 2>, <Comment 3>, <Comment 4>]

如果删除文对象Post2,那么对应的两个评论对象也会一并被删除:

>>> post = Post.query.get(2)
>>> post
<Post 2>
>>> db.session.delete(post)
>>> db.session.commit()
>>> Post.query.all()
[<Post 1>]
>>> Comment.query.all()
[<Comment 1>, <Comment 2>]

3、delete-orphan

这个模式是基于delete级联的,必须和delete级联一起使用,通常会设为all、delete-orphan,因为all包含delete。因此当cascade参数设为delete-orphan时,它首先包含delete级联的行为:当某个Post对象被删除时,所有相关的Comment对象都将被删除(delete级联)。除此之外,当某个Post对象(父对象)与某个Comment对象(子对象)解除关系时,也会删除该Comment对象,这个解除关系的对象被称为鼓励对象(orphan object),现在comments属性中的级联值为all、delete-orphan,如下所示:

class Post(db.Model):
    id = db.Column(db.Integer, primary_key = True)
    title = db.Column(db.String(50), unique = True)
    body = db.Column(db.Text)
    comments = db.relationship('Comment',cascade = 'all, delete-orphan', back_populates = 'post')

我们先创建一个文章对象post3和两个评论对象comment5和comment6,并将这两个评论对象与文章对象建立关系,将他们添加到数据库会话并提交:

>>> post3 = Post()
>>> post3 = Post(title = 'today learn python', body = 'python include class and object')
>>> comment5 = Comment(body = 'i also wanna learn python')
>>> comment6 = Comment(body = 'python is easy to learn, but you have to pay your time every day')
>>> post3.comments.append(comment5)
>>> post3.comments.append(comment6)
>>> db.session.add(post3)
>>> db.session.commit())
  File "<stdin>", line 1
    db.session.commit())
                       ^
SyntaxError: invalid syntax
>>> db.session.commit()

现在数据库中有两条文章记录和四条评论记录:

>>> Post.query.all()
[<Post 1>, <Post 2>]
>>> Comment.query.all()
[<Comment 1>, <Comment 2>, <Comment 3>, <Comment 4>]
下面我们将comment5和comment6与post3解除关系并提交数据库会话:
>>> post3.comments.remove(comment5)
>>> post3.comments.remove(comment6)
>>> db.session.commit()

默认情况下,相关评论对象的外键会被设为空值。因为我们设置了delete-orphan级联,所以现在你会发现解除关系的两条评论记录都被删除了:

>>> Comment.query.all()
[<Comment 1>, <Comment 2>]

delete和delete-orphan通常会在一对多关系模式中,而且“多”这一侧的对象附属于“一”这一侧的对象时使用。尤其是如果“一”这一侧的“父”对象不存在了,那么“多”这一侧的“子”对象不再有意义的情况。比如,文章和评论的关系就是一个典型的示例。当文章被删除了,那么评论也就没必要在留存。在这种情况下,如果不使用级联操作,那么我们就需要手动迭代关系另一侧的所有评论对象,然后一一进行删除操作。

对于这两个级联选项,如果你不会通过列表语义对集合关系属性调用remove()方法等方式来操作关系,那么使用delete级联即可。

虽然级联操作方便,但是容易带来安全隐患,因此要谨慎使用。默认值能够满足大部分情况,所以最好仅在需要的时候才修改它。

在SQLAlchemy中,级联的行为和配置选项等最初衍生自另一个ORM—Hibernate ORM。如果对这部分内容感到困惑,那么引用SQLAlchemy文档中关于Hibernate文档的结论:“The sections we have just covered can be a bit confusing.However, in practice, it all works out nicely. (我们刚刚介绍的这部分内容可能会有一些让人困惑,不过在实际使用中,他们都会工作的很顺利)”

用户手册

https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/

API Reference

5、Flask-migrate

Flask-migrate:https://flask-migrate.readthedocs.io/en/latest/index.html

安装:pip install Flask-Migrate

使用Flask-Migrate迁移数据库

在开发时,以删除表再重建的方式更新数据库简单直接,但明显的缺陷是会丢掉数据库中的所有数据。在生产环境下,没有人想把数据都删除掉,这时需要使用数据库迁移工具来完成这个工作。SQLAlchemy的开发者Michael Bayer写了一个数据库迁移工作—Alembic来帮助我们实现数据库的迁移,数据库迁移工具可以在不破坏数据的情况下更新数据库表的结构。蒸馏器(Alembic)是炼金术士最重要的工具,要学习SQL炼金术(SQLAlchemy),当然要掌握蒸馏器的使用。

Flask-Migrate 插件(扩展 ) 继承了Alembic,是用于处理 SQLAlchemy 数据库迁移的扩展工具。当 Model 出现变更的时候,通过migrate 去管理数据库变更。Flask-Migrate 提供了一些flask命令来简化迁移工作,可以使用它来迁移数据库。

Migrate主要有3个动作,init、migrate 和upgrade。

初始化

示例代码

from flask import Flask, render_template, flash, url_for, redirect
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'

db = SQLAlchemy(app)
migrate = Migrate(app, db)  # 在db对象创建后调用

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128))

命名为 app.py 才可正常执行。

开始迁移数据之前,需要先使用命令初始化创建一个迁移环境,命令:flask db init

迁移环境只需要创建一次。会在你的项目根目录下创建一个migrations文件夹其中包含了自动生成的配置文件和迁移版本文件夹。

Flask-Migrate 提供了一个命令集,使用 db 作为命令集名称,它提供的命令都以 flask db 开头。可以在命令行中输入 flask --help 查看所有可用的命令和说明。

D:\flask\FLASK_PRACTICE\DataBase>flask db --help
Usage: flask db [OPTIONS] COMMAND [ARGS]...

  Perform database migrations.

Options:
  --help  Show this message and exit.

Commands:
  branches   Show current branch points
  current    Display the current revision for each database.
  downgrade  Revert to a previous version
  edit       Edit a revision file
  heads      Show current available heads in the script directory
  history    List changeset scripts in chronological order.
  init       Creates a new migration repository.
  merge      Merge two revisions together, creating a new revision file
  migrate    Autogenerate a new revision file (Alias for 'revision...
  revision   Create a new revision file.
  show       Show the revision denoted by the given symbol.
  stamp      'stamp' the revision table with the given revision; don't run...
  upgrade    Upgrade to a later version

生成最初的迁移

命令:flask db migrate

此命令会在 migrations下生成一个version文件夹,下面包含了对应版本的数据库操作py脚本。

由于 migrate 并不一定全部发现你对 model 的所有改动,因此生成的 p y脚本需要 review,有错的话则需要 edit。

例如目前知道的,表名称表更,列名称变更,或给 constraints 命名等,migreate 都不能发现的。更多限制细节见此:Alembic autogenerate documentation

生成迁移脚本--flask db migrate -m "add note timestamp"

这条命令可以简单理解为在flask里对数据库(db)进行迁移(migrate)。-m选项用来添加迁移备注信息。从上面的输出信息我们可以看到,Alembic检测出了模型变化:表note新加了一个timestamp列,并且相应生成了一个迁移脚本cdd9d12762fc_add_note_timestamp.py:

"""add note timestamp

Revision ID: 7f3dae8cae4d
Revises: 
Create Date: 2019-04-01 21:56:32.469000

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '7f3dae8cae4d'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('draft')
    op.drop_table('post')
    op.drop_table('comment')
    op.add_column('note', sa.Column('timeStamp', sa.String(length=70), nullable=True))
    op.create_unique_constraint(None, 'note', ['timeStamp'])
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(None, 'note', type_='unique')
    op.drop_column('note', 'timeStamp')
    op.create_table('comment',
    sa.Column('id', sa.INTEGER(), nullable=False),
    sa.Column('body', sa.TEXT(), nullable=True),
    sa.Column('post_id', sa.INTEGER(), nullable=True),
    sa.ForeignKeyConstraint(['post_id'], [u'post.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_table('post',
    sa.Column('id', sa.INTEGER(), nullable=False),
    sa.Column('title', sa.VARCHAR(length=50), nullable=True),
    sa.Column('body', sa.TEXT(), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_table('draft',
    sa.Column('id', sa.INTEGER(), nullable=False),
    sa.Column('body', sa.TEXT(), nullable=True),
    sa.Column('edit_time', sa.INTEGER(), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###

从上面的代码可以看出,迁移脚本主要包含了两个函数:upgrate()函数用来将改动应用到数据库,函数中包含了向表中添加timestamp字段的命令,而downgrade()函数用来撤消改动,包含了删除timestamp字段的命令。

就像这两个函数中的注释所说的,迁移命令是有Alembic自动生成的,其中可能包含错误,所以有必要在生成后检查一下。

因为每一次迁移都会生成新的迁移脚本,而且Alemic为每一次迁移都生成了修订版本(revision)ID,所以数据库可以恢复到修改历史中的任一点。正因如此,迁移环境中的文件也要纳入版本控制。

有些复杂的错误无法实现自动迁移,这时可以使用revision命令手动创建迁移脚本。这同样会生成一个迁移脚本,不过脚本中的upgrade()和downgrade()函数都是空的。你需要使用Alembic提供的Operations对象指令在这两个函数中实现具体操作,具体可以访问Alembic官方文档查看。

数据库 升级

生成了迁移脚本后,使用upgrade子命令即可更新数据库。

命令:flask db upgrade

如果还没有创建数据库和表,这个命令会自动创建,如果已经创建,则会在不损坏数据的前提下执行更新。此命令相当于执行了 version 文件夹下的相应 py 版本,对数据库进行变更操作。

此后,对 model 有变更,只要重复 migrate 和 upgrade 操作即可。

查看帮助文档:flask db --help

使用Flask-Script的命令调用,自行参考官方文档:https://flask-migrate.readthedocs.io/en/latest/

如果你想回滚迁移,那么可以使用downgrade命令(降级),它会撤销最后一次迁移在数据库中的改动,这在开发时非常有用。比如,当执行upgrade命令后发现某些地方出错了,这时就可以执行flask db downgrade命令进行回滚,删除对应的迁移脚本,重新生成迁移脚本后再进行更新(upgrade)。

D:\flask\FLASK_PRACTICE\DataBase>flask db downgrade 5e87b4da6187
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
ERROR [root] Error: Destination 5e87b4da6187 is not a valid downgrade target from current head(s)

6、flask-script

Flask干货:访问数据库:Flask-Script工具的使用:https://zhuanlan.zhihu.com/p/269820011

什么是 flask_script ?

通过命令行的方式来操作flask, 一般用来启动定时任务,数据库的迁移与更新等。

安装 flask_script :pip install flask_script

使用 flask_script

实现一个最小应用

  # app.py
  from flask import Flask
  from flask_script import Manager
  app = Flask(__name__)
  manager = Manager(app=app)
  
  @manager.command
  def init():
  	print('初始化数据库')
  @manager.command
  def migrate():
  	print('数据迁移')
  @manager.command
  def upgrade():
  	print('数据更新')
  	
  if __name__ == '__main__':
  	manager.run()
  • 怎么去使用 flask_script
  # 命令行中启动
  > python manager.py init
  > python manager.py migrate
  > python manager.py upgrade
  • 结构升级进行功能拆分。

ext.py

  # ext.py
  from flask_script import Manager
  DBMANAGER = Manager()
  @DBMANAGER.command
  def init():
  	print('数据库初始化')
  @DBMANAGER.command
  def migrate():
  	print('数据迁移')
  @DBMANAGER.command
  def upgrade():
  	print('数据更新')

app.py

  # app.py
  from flask import Flask
  from flask_script import Manager
  from ext import DBMANAGER
  app = Flask(__name__)
  manager = Manager(app=app)
  manager.add_command('db',DBMANAGER)
  if __name__ == '__main__':
  	manager.run()
  # 命令行中启动
  > python manager.py init
  > python manager.py migrate
  > python manager.py upgrad 

7、flask-upload

pypi 搜索:https://pypi.org/search/?q=flask-upload

flask-upload:https://pythonhosted.org/Flask-Uploads/

flask-upload 插件,使 flask 能够灵活高效地处理文件上传。 可以创建不同的上传集。例如:用于图片的上传集合,用户音频的上传集合,用于视频的上传集合等

也可以直接使用 flask 处理文件上传:https://www.w3cschool.cn/flask/flask_file_uploading.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python SQLAlchemy ( ORM )、dictalchemy、Flask-SQLAlchemy、Flask-migrate、flask-script、flask-upload 的相关文章

  • Java8无符号算术

    据广泛报道 Java 8 具有对无符号整数的库支持 然而 似乎没有文章解释如何使用它以及有多少可能 有些函数 例如 Integer CompareUnsigned 很容易找到 并且似乎可以实现人们所期望的功能 但是 我什至无法编写一个简单的
  • 在浏览器中点击应用程序时播放框架挂起

    我正在 Play 中运行一个应用程序activator run 也许 5 次中有 3 次 它会挂起 当我去http localhost 9000 它就永远坐在那里旋转 我看到很多promise timed out错误也 我应该去哪里寻找这个
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • OnClick 事件中的 finish() 如何工作?

    我有一个Activity一键退出Activity 通过layout xml我必须设置OnClick事件至cmd exit调用 this finish 效果很好 public void cmd exit View editLayout thi
  • 请求位置更新参数

    这就是 requestLocationUpdates 的样子 我使用它的方式 requestLocationUpdates String provider long minTime float minDistance LocationLis
  • MySQL“列计数与第 1 行的值计数不匹配”是什么意思

    这是我收到的消息 ER WRONG VALUE COUNT ON ROW 列计数与第 1 行的值计数不匹配 这是我的全部代码 我的错误在哪里 DROP TABLE student CREATE TABLE employee emp id I
  • 是否可以从子查询中获取多个值?

    有没有办法让子查询在oracle db中返回多列 我知道这个特定的sql会导致错误 但它很好地总结了我想要的 select a x select b y b z from b where b v a v from a 我想要这样的结果 a
  • 归并排序中的递归:两次递归调用

    private void mergesort int low int high line 1 if low lt high line 2 int middle low high 2 line 3 mergesort low middle l
  • 将 Long 转换为 DateTime 从 C# 日期到 Java 日期

    我一直尝试用Java读取二进制文件 而二进制文件是用C 编写的 其中一些数据包含日期时间数据 当 DateTime 数据写入文件 以二进制形式 时 它使用DateTime ToBinary on C 为了读取 DateTime 数据 它将首
  • Java直接内存:在自定义类中使用sun.misc.Cleaner

    在 Java 中 NIO 直接缓冲区分配的内存通过以下方式释放 sun misc Cleaner实例 一些比对象终结更有效的特殊幻像引用 这种清洁器机制是否仅针对直接缓冲区子类硬编码在 JVM 中 或者是否也可以在自定义组件中使用清洁器 例
  • 使用 SAX 进行 XML 解析 |如何处理特殊字符?

    我们有一个 JAVA 应用程序 可以从 SAP 系统中提取数据 解析数据并呈现给用户 使用 SAP JCo 连接器提取数据 最近我们抛出了一个异常 org xml sax SAXParseException 字符引用 是无效的 XML 字符
  • Android JNI C 简单追加函数

    我想制作一个简单的函数 返回两个字符串的值 基本上 java public native String getAppendedString String name c jstring Java com example hellojni He
  • 具有不同组合的产品和产品包的数据库模型

    您将如何设计数据库来实现此功能 考虑一个场景 我们想要创建一个产品关系 封装 假设我们创建一个产品表 prod id prod name prod fee 1 prepaid A 19 usd 2 prepaid B 29 usd 3 pr
  • 将 JTextArea 内容写入文件

    我在 Java Swing 中有一个 JTextArea 和一个 提交 按钮 需要将textarea的内容写入一个带有换行符的文件中 我得到的输出是这样的 它被写为文件中的一个字符串 try BufferedWriter fileOut n
  • 使用用户定义函数 MySql 时出错

    您好 请帮我解决这个问题 提前致谢 我在数据库中定义了这些函数 CREATE FUNCTION levenshtein s1 VARCHAR 255 s2 VARCHAR 255 RETURNS INT DETERMINISTIC BEGI
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User
  • 需要在 SQL Server 中透视字符串值

    我有一个包含值的表 描述为 Occupation String Name String Developer A Developer B Designer X Coder Y Coder Z 我需要数据透视格式的值 Designer Deve
  • 中断连接套接字

    我有一个 GUI 其中包含要连接的服务器列表 如果用户单击服务器 则会连接到该服务器 如果用户单击第二个服务器 它将断开第一个服务器的连接并连接到第二个服务器 每个新连接都在一个新线程中运行 以便程序可以执行其他任务 但是 如果用户在第一个
  • java8 Collectors.toMap() 限制?

    我正在尝试使用java8Collectors toMap on a Stream of ZipEntry 这可能不是最好的想法 因为在处理过程中可能会发生异常 但我想这应该是可能的 我现在收到一个我不明白的编译错误 我猜是类型推理引擎 这是
  • javax.persistence.Table.indexes()[Ljavax/persistence/Index 中的 NoSuchMethodError

    我有一个 Play Framework 应用程序 并且我was使用 Hibernate 4 2 5 Final 通过 Maven 依赖项管理器检索 我决定升级到 Hibernate 4 3 0 Final 成功重新编译我的应用程序并运行它

随机推荐

  • 熄灯问题

    有一个由按钮组成的矩阵 其中每行有6个按钮 共5行 每个按钮的位置上有一盏灯 当按下一个按钮后 该按钮以及周围位置 上 下 左 右 的等都会转变状态 即 如果灯原来是点亮的 就会被熄灭 如果灯原来是熄灭的 则会被点亮 在矩阵角上的按钮改变3
  • 【3D人脸】Open3D学习笔记 一

    最近头疼于点云法向量的计算 实在找不到python的相关资料 想起来Open3D这个专门的工具 一搜还真有 踩了很多坑 记录一下 Open3D官方文档 http www open3d org docs release index html
  • uniapp(二) 之 uniapp 搭建与组件库的引用

    小扩展 rpx responsive pixel 可以根据屏幕宽度自适应 规定屏幕宽度为750rpx 如果iphon6上 屏幕宽度为375px 共有750个像素 则750rpx 375培训 750物理像素 1rpx 0 5px 1物理像素
  • 自定义类型 (结构体)

    文章目录 结构体的声明 1 结构的基础知识 2 结构的声明 3 特殊的声明 4 结构的自引用 5 结构体变量的定义和初始化 6 结构体内存对齐 7 修改默认对齐数 8 结构体传参 结构体的声明 1 结构的基础知识 结构是一些值的集合 这些值
  • Mac VSCode常用快捷键

    cmd option gt cmd option lt 返回上 下一个光标位置 cmd p 打开文件搜索框 control 返回 control shift 前进 补充
  • 工具及方法 - Process Explorer以及类似工具,用来获取系统运行的进程信息

    下载Process explorer Process Explorer Sysinternals Microsoft Learn Process explorer简介 有没有想过哪个程序打开了一个特定的文件或目录 现在你可以找到了 Proc
  • BDA初级分析——可视化图形

    一 时间趋势 时间趋势要如何呈现 Excel函数补充 YEAR 提取日期中的年份 MONTH 提取日期中的月份 DAY 提取日期中的天 HOUR 提取时间中的小时 TEXT 将数值转换为按指定数字格式表示的函数 写法 TEXT value
  • 提交表单--get与post方式

    我们经常在网页上输入信息 然后通过按钮提交 有两种提交方式 get和post get方式效率高但安全性低 post是封装后进行提交安全性高 get方式经常用于搜索 查询 post常用与用户注册登陆等 提交表单标签
  • 【Sibelius】制谱软件 (西贝柳斯)入门笔记

    首先声明 本人非音乐专业 连业余爱好者都算不上 因为小女是音乐生 为了填补代沟 所以就整点音乐相关的软件玩玩 又怕自己忘了 做个笔记 笔记主要是从B站学习的 链接 https www bilibili com video BV1Kb4114
  • 成功解决VS编译时提示“已经在 LIBCMT.lib(xxx) 中定义“

    报错信息 解决方法 在项目右击 gt 属性 gt 连接器 gt 命令行 gt 附加选项中 添加 force
  • 【小程序】使用wxParse解析html

    小程序在开发时 读取到服务器的内容是html格式的 因小程序不支持html格式的内容显示的 因此要对html格式的内容进行编译 可以通过wxParse来实现 wxParse下载地址 实现方法 将下载下来的wxParse文件夹复制到开发项目的
  • Unity(纯C语言单元测试框架!不是那个Unity3d)入门文档

    译者注 译者博客 http blog csdn net lin strong 转载请保留这条 此为Unity手册的翻译 仅供学习交流使用 请勿用于商业用途 翻译的资料是公开的 在docs UnityGettingStartedGuide m
  • 计算员工工资

    请编写一个程序 可以读取一名员工的员工编号 本月工作总时长 小时 以及时薪 并输出他的工资条 工资条中包括员工编号和员工月收入 输入格式 输入包含两个整数和一个浮点数 分别代表员工编号 工作时长以及时薪 每个数占一行 输出格式 输出共两行
  • Unreal Engine4蓝图编程学习(一)

    学习内容主要介绍了蓝图进行对象交互 升级玩家技能 升级AI敌人 跟踪游戏状态完成游戏体验等内容 内容来源于 Unreal Engine4蓝图可视化编程 书籍为2017年 与现在版本有一定区别 一 制作移动标靶 1 1 首先 我们想先创建一个
  • mysql database uri,未设置SQLALCHEMY_DATABASE_URI

    I tried to work with CURD operation using Flask and SQLAlchemy But getting Error while connecting to database Here is th
  • springboot+vue教室图书馆预约管理系统、

    下载地址 https download csdn net download ouyangxiaobai123 22176771 项目介绍 springboot vue教室图书馆预约管理系统 系统说明 聪慧物联网教室预定系统 后台系统 项目简
  • 多维数组变成一维数组

    这个问题来源于一个朋友曾经问过我的问题 当时是一个二维数组变成一维数组 后面我想整理一下 整理一个多维 并且是不定维的数组 一 二维数组变成一维数组 1 遍历数组 将元素一个个放入新数组 结果 如果元素不是数组 将会报错 下面是改良版 这样
  • 信号量和自旋锁

    信号量和自旋锁 为了避免并发 防止竞争 内核提供了一组同步方法来提供对共享数据的保护 我们的重点不是介绍这些方法的详细用法 而是强调为什么使用这些方法和它们之间的差别 Linux 使用的同步机制可以说从2 0到2 6以来不断发展完善 从最初
  • python编程实验,模拟聪明版的尼姆游戏设计原理

    实验原理与内容 本实验完成一个模拟聪明版的尼姆游戏功能 尼姆游戏是个著名的游戏 有很多变种玩法 两个玩家轮流从一堆物品中拿走一部分 在每一步中 玩家可以自由选择拿走多少物品 但是必须至少拿走一个并且最多只能拿走一半物品 然后轮到下一个玩家
  • Python SQLAlchemy ( ORM )、dictalchemy、Flask-SQLAlchemy、Flask-migrate、flask-script、flask-upload

    From Python中强大的通用ORM框架 SQLAlchemy https zhuanlan zhihu com p 444930067 Python ORM之SQLAlchemy全面指南 https zhuanlan zhihu co