SQLAlchemy使用详细功能SqlHelper封装

2023-11-13

环境与版本

python: 3.10

SQLAlchemy: 2.0.9

网上好多sql语句查询相关的代码执行异常,不确定是否与版本有有关

说明

封装了比较实用的 复合唯一索引表的插入或更新,多条件查询,叠加条件查询,以及返回pandas DataFrame。

容易出错的问题

1. 执行sql语句错误提示:sqlalchemy.exc.ObjectNotExecutableError: Not an executable object‘......’

参考

2. pd.read_sql()  pd.read_sql_query() 报错 AttributeError: 'OptionEngine' object has no attribute 'execute'

read_sql() pd.read_sql_query()的con参数,必须是session.bind.connect() 而不是session.bind

数据库初始化以及Model定义

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Index

# 数据库配置
db_config = {
    'sqlite': {
        'engine': 'sqlite:///mydatabase.db'
    },
    'mysql': {
        'engine': 'mysql+pymysql://username:password@host/database'
    }
}

# 根据配置选择 engine
db_choice = 'sqlite'
if db_choice == 'sqlite':
    engine = create_engine(db_config['sqlite']['engine'])
elif db_choice == 'mysql':
    engine = create_engine(db_config['mysql']['engine'])
else:
    engine = create_engine('sqlite:///mydatabase.db')

Session = sessionmaker(bind=engine)

Base = declarative_base()


class Address(Base):
    __tablename__ = 'address'
    id = Column(Integer, primary_key=True, autoincrement=True)
    address = Column(String)


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    addr_id = Column(Integer, ForeignKey('address.id'))
    name = Column(String)
    age = Column(Integer)

    def __repr__(self):
        return f"<User(id='{self.id}', name='{self.name}', age='{self.age}')>"


class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship('Book', back_populates='author')


class Book(Base):
    __tablename__ = 'book'
    id = Column(Integer, primary_key=True)
    title = Column(String)
    author_id = Column(Integer, ForeignKey('author.id'))
    author = relationship('Author', back_populates='books')


class Department(Base):
    __tablename__ = 'department'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    employees = relationship('Employee', back_populates='department')


class Employee(Base):
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    remark = Column(String)
    score = Column(Integer)
    department_id = Column(Integer, ForeignKey('department.id'))
    department = relationship('Department', back_populates='employees')


class Code(Base):
    __tablename__ = 'code'
    id = Column(Integer, primary_key=True)
    symbol = Column(String)
    exchange = Column(String)
    remark = Column(String)
    # 复合唯一索引
    __table_args__ = (
        Index('ix_vt_symbol', 'symbol', 'exchange', unique=True),
    )


Base.metadata.create_all(engine)

SqlHelper类

import pandas as pd
import sqlalchemy

from sqlalchemy import or_, and_, text
from sqlalchemy.exc import IntegrityError

from sqltool.models import Session, User, Address, Book, Author, Department, Employee, Code


def or_filters(*args):
    return or_(*args)


def and_filters(*args):
    return and_(*args)


class SqlHelper:
    def __init__(self):
        self.session = Session()

    def add(self, obj):
        self.session.add(obj)
        self.session.commit()

    def delete(self, obj):
        self.session.delete(obj)
        self.session.commit()

    def update(self, obj):
        self.session.merge(obj)
        self.session.commit()

    def upsert(self, model):
        self._upsert(model)
        self.session.commit()

    def _upsert(self, model):
        '''
        根据唯一索引查找记录,再决定更新(补全primary_key的自增列)或新增记录
        '''
        unique = self.get_unique_keys(model)
        obj = self.session.query(model.__class__).filter(
            # getattr(model.__class__, primary_key) == getattr(model, primary_key),
            *[(getattr(model.__class__, c) == getattr(model, c)) for c in unique]
        ).first()
        if obj:
            primary_key = self.get_primary_key(model)
            val = getattr(obj, primary_key)
            setattr(model, primary_key, val)
            self.session.merge(model)
        else:
            self.session.add(model)

    def add_all(self, models):
        try:
            self.session.begin()
            self.session.add_all(models)
            self.session.commit()
        except IntegrityError:
            # 如果出现唯一键冲突错误,回滚事务
            self.session.rollback()

    def update_all(self, models):
        try:
            for m in models:
                self.session.merge(m)
            self.session.commit()
        except IntegrityError:
            self.session.rollback()

    def delete_all(self, models):
        try:
            for m in models:
                self.session.delete(m)
            self.session.commit()
        except IntegrityError:
            self.session.rollback()

    def upsert_all(self, models):
        try:
            self.session.begin()
            for m in models:
                self._upsert(m)
            self.session.commit()
        except IntegrityError:
            self.session.rollback()

    def query(self, model):
        return self.session.query(model)

    def update_by(self, model, filters, col_value_map):
        return self.session.query(model).filter(filters).update(col_value_map)

    def query_by(self, model, *args):
        return self.session.query(model).filter(and_(*args))

    def query_or(self, model, *args):
        return self.session.query(model).filter(or_(*args))

    def query_by_func(self, model, func):
        '''
        使用 or_filters 和 and_filters 叠加查询条件
        demo
            and_filter(or_filters(User.age < 30, User.age > 60), User.name == 'lisi_6')
            等同于
            (User.age < 30 or User.age > 60) and User.name == 'lisi_6'
        '''
        return self.session.query(model).filter(func)

    def get_by(self, model, **kwargs):
        return self.session.query(model).filter_by(**kwargs).first()

    def count_by(self, model, **kwargs):
        return self.session.query(model).filter_by(**kwargs).count()

    def exists(self, model, **kwargs):
        return self.session.query(model).filter_by(**kwargs).exists()

    def join(self, model1, model2):
        return self.session.query(model1).join(model2)

    def join_ex(self, model1, model2):
        return self.session.query(model1, model2)

    def first(self, query):
        return query.first()

    def commit(self):
        self.session.commit()

    def close(self):
        self.session.commit()
        self.session.close()

    def get_primary_key(self, model):
        for col in model.__table__.columns:
            if col.primary_key:
                return col.name

    def get_unique_keys(self, model) -> list[str]:
        ''' 获取唯一键列 '''
        # 1.从列中unique属性获取列名
        unique = [c.name for c in model.__table__.columns if c.unique]
        # 2.从__table_args__自定义索引(Index)中获取unique的列名
        idxs = list(model.__table__.indexes)
        for idx in idxs:
            if not idx.unique:
                continue
            for c in idx.columns:
                unique.append(c.name)
        return unique

    def execute_sql(self, sqlstr) -> list[tuple]:
        return self.session.execute(text(sqlstr)).fetchall()

    def read_from(self, sql) -> pd.DataFrame:
        '''
            查询sql语句,必须使用text()
            pandas 需要connection,不是engine
        '''
        return pd.read_sql_query(text(sql), self.session.bind.connect())

    def read_model(self, model) -> pd.DataFrame:
        '''
            查询sql语句,必须使用text()
            pandas 需要connection,不是engine
        '''
        return pd.read_sql(self.session.query(model).statement, self.session.bind.connect())

demo


def demo():
    sql_helper = SqlHelper()
    count = sql_helper.count_by(Address)
    addr0 = Address()
    addr0.address = f"浙江杭州_{count + 1}"
    sql_helper.add(addr0)
    # addr = sql_helper.query(Address).first()
    user = User()
    user.addr_id = addr0.id
    user.age = 33
    user.name = f"lisi_{count + 1}"
    sql_helper.add(user)

    users = sql_helper.query_by(User, User.name == 'Tom').all()
    users = sql_helper.query(User).filter(User.age > 30).all()
    users = sql_helper.query(User).filter(User.name.like('a%')).all()
    tom = sql_helper.get_by(User, name='Tom')
    count = sql_helper.count_by(User)
    exists = sql_helper.exists(User, age=30)
    first_user = sql_helper.first(sql_helper.query(User))
    query_result = sql_helper.join_ex(Address, User).filter(User.addr_id == Address.id).all()
    for address, user in query_result:
        print(user.id, user.name, address.id, address.address)
    # query_result = sql_helper.query(User).join(Address).filter(User.addr_id == Address.id).all()
    query_result = sql_helper.query(User).join(Address).all()
    for result in query_result:
        print(result)

    # 连接
    author1 = Author(name='Jam')
    author2 = Author(name='Sam')
    sql_helper.add(author1)
    sql_helper.add(author2)
    book1 = Book(title="C Language", author_id=author1.id)
    book2 = Book(title="C++ Language", author_id=author1.id)
    book3 = Book(title="Python", author_id=author2.id)
    book4 = Book(title="JavaScript", author_id=author2.id)
    sql_helper.add(book1)
    sql_helper.add(book2)
    sql_helper.add(book3)
    sql_helper.add(book4)

    # n--1 连接
    books = sql_helper.query(Book).join(Author).all()
    for book in books:
        print(f'Book: {book.title} Author:{book.author.name}')

    authors = sql_helper.query(Author).join(Book).all()
    for author in authors:
        for book in author.books:
            print(f'Author: {author.name} Book: {book.title}')

    # 1--n 连接
    department = Department(name='Sales')
    sql_helper.add(department)
    employee1 = Employee(name='John', department=department, remark='test1111')
    employee2 = Employee(name='Jane', department=department, remark='test2222')
    sql_helper.add(employee1)
    sql_helper.add(employee2)
    dept = sql_helper.query(Department).filter_by(name='Sales').first()
    employees = dept.employees
    for employee in employees:
        print(employee.name)

    # 单条件批量更新
    rst = sql_helper.update_by(Employee, Employee.name == 'John', {Employee.remark: employee1.remark})
    sql_helper.commit()
    # 多条件批量更新
    rst = sql_helper.update_by(Employee, or_filters(Employee.name == 'John', Employee.name == 'Jane'),
                               {Employee.remark: employee2.remark})
    sql_helper.commit()
    print('------------------------------------------------------------')
    # 复合唯一索引的存在更新否则插入
    code1 = Code(symbol='SA309', exchange='CZCE', remark='test11111')
    code2 = Code(symbol='SA309', exchange='CZCE', remark='test22222')
    # sql_helper.add(code1)
    sql_helper.upsert(code1)
    sql_helper.upsert(code2)
    print('------------------------------------------------------------')
    # or 查询
    users = sql_helper.query_or(User, User.age > 55, User.age < 30, User.age == 44, User.name == 'lisi_40').all()
    for user in users:
        print(user)
    print('------------------------------------------------------------')
    # and 查询
    users = sql_helper.query_by(User, User.name == 'lisi_6', User.age > 45)
    for user in users:
        print(user)
    print('------------------------------------------------------------')
    # 多条件嵌套查询 or + and
    users = sql_helper.query_by_func(User, and_filters(or_filters(User.age < 30, User.age > 60),
                                                      User.name == 'lisi_6')).all()
    for user in users:
        print(user)
    print('------------------------------------------------------------')
    result = sql_helper.execute_sql('select * from users;')
    for item in result:
        print(item)
    print('------------------------------------------------------------')
    df = sql_helper.read_model(User)
    print(df.head())
    print('------------------------------------------------------------')
    df = sql_helper.read_from('select * from users;')
    print(df.tail())
    print('------------------------------------------------------------')
    result = sql_helper.execute_sql('select * from employee;')
    for item in result:
        print(item)
    print('------------------------------------------------------------')
    sql_helper.close()


if __name__ == '__main__':
    print(sqlalchemy.__version__)
    demo()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SQLAlchemy使用详细功能SqlHelper封装 的相关文章

随机推荐

  • Linux命令_lsof & 网络/文件监控

    官方描述 一个打开的文件可以是一个常规文件 一个目录 一个块特殊文件 一个字符特殊文件 一个执行文本引用 一个库 一个流或一个网络文件 Internet套接字 NFS文件或UNIX域套接字 可以通过路径选择文件系统中的某个文件 也可以选择文
  • Python数据可视化的例子——条形图(bar)

    1 matplotlib模块 应用matplotlib模块绘制条形图 需要调用bar函数 关于该函数的语法和参数含义如下 bar x height width 0 8 bottom None color None edgecolor Non
  • Axure RP 9软件安装步骤

    1 官网下载软件 第一步 点击安装文件 建议安装到D盘 请记住具体安装位置 后续汉化需要用到 本人实际位置 D Program Files x86 Axure Axure RP 9 第二步 激活 打开软件中的激活 第三步 汉化 复制汉化文件
  • discuz数据库密码修改

    在源码config目录下找到这两个文件 然后打开修改密码
  • [开发过程]<软件设计>UML建模初体验

    0 引言 前文提到UML的相关工具 前文链接如下 开发过程 软件设计 关于统一建模语言UML 崭蓝码农的博客 CSDN博客从某一个需求出发 开发中有4个重点问题 1 业务逻辑 2 程序逻辑 3 各进程之间的关系 4 物理实现 为了根据需求
  • 【AI工具】 一款多SOTA模型集成的高精度自动标注工具(直接安装使用,附源码)

    目录 高精度自动标注工具简介及其特性 标注工具的安装 开启自动标注 简介 X AnyLabeling 是一款全新的交互式自动标注工具 其基于AnyLabeling进行构建和二次开发 在此基础上扩展并支持了许多的模型和功能 并借助Segmen
  • java--基础--21.2--注解--案例

    java 基础 21 2 注解 案例 1 类注解 可以在运行时获取类 方法或字段的注解 下面是获取类注解的示例 Class aClass TheClass class Annotation annotations aClass getAnn
  • jar包快速启动和远程监听

    jar包快速启动 制作bat文件 设置窗口背景和字体颜色 设置窗口大小 设置启动内存大小 设置依赖lib文件路径 设置远程debug 制作bat文件 if root set root d root cd root jar bat color
  • STM32的12位ADC过采样实现16位分辨率

    1 什么是过采样 过采样技术是一种以牺牲采样速度来提高ADC分辨率的技术 部分STM32单片机是支持硬件过采样的 如STM32G0系列 通过过采样 可以将12位的ADC提升到16位 非常实用 根据过采样技术 每提高1位ADC分辨率 需要增加
  • CSV文件简介及C++实现

    逗号分隔值 Comma Separated Values CSV 有时也称为字符分隔值 因为分隔字符也可以不是逗号 其文件以纯文本形式存储表格数据 数字和文本 纯文本意味着该文件是一个字符序列 不含必须象二进制数字那样被解读的数据 CSV文
  • Java之继承

    继承 继承 为什么使用继承 继承是什么 继承的语法 访问父类成员 访问父类成员变量 访问父类成员方法 super关键字 子类构造方法 super和this 异同 分别的使用方法 继承的方式 final关键字 作者简介 zoro 1 目前大一
  • 解决安装android studio时用户文件夹为中文名

    第一步 使用登陆管理员账号登陆电脑 更改c盘用户文件夹的名字 创建管理员账户 百度经验 第二步 win r调出运行界面 输入regedit 依此进入 HKEY LOCAL MACHINE SOFTWARE Microsoft Windows
  • Python读取Excel,日期列读出来是数字的处理

    Python读取Excel 里面如果是日期 直接读出来是float类型 无法直接使用 通过判断读取表格的数据类型ctype 进一步处理 返回的单元格内容的类型有5种 ctype 0 empty 1 string 2 number 3 dat
  • Spring学习总结【二】---IoC(控制反转)

    文章目录 IoC理论推导 IoC本质 工作原理 IoC创建对象的方式 IoC理论推导 在我们之前的业务中 用户的需求可能会影响我们原来的代码 我们需要根据用户的需求去修改原代码 如果程序代码量十分大 修改一次的成本代价十分昂贵 之前 程序是
  • 免费送书啦!细数Github大神们的开源书籍![二]

    计算机软件设计 软件设计的哲学 软件设计的哲学 斯坦福教授 Tcl 语言发明者 John Ousterhout 的著作 A Philosophy of Software Design 自出版以来 好评如潮 按照 IT 图书出版的惯例 如果冠
  • 第八课,OpenGL光照之基本光照

    冯氏光照模型 Phong Lighting Model 环境光照 Ambient Lighting 即使在黑暗的情况下 世界上通常也仍然有一些光亮 月亮 远处的光 所以物体几乎永远不会是完全黑暗的 为了模拟这个 我们会使用一个环境光照常量
  • 程序员如何找到女朋友?

    文 转载自公众号51CTO技术栈 生活中我们常常发现很多程序员拿着高薪 却常常沦为单身狗 每当情人节来临 却只能形单影只的一个人 过得十分凄惨 自从程序员毕业出来工作进入 IT 行业之后 常常接触不到女性 一不小心就到了被催恋催婚的年纪 前
  • “算法详解”系列第3卷贪心算法和动态规划出版

    算法详解 系列图书共有4卷 目前1到3卷已经出版 最新出版的是第3卷 贪心算法和动态规划 算法详解 卷3 贪心算法和动态规划 算法详解 系列图书共有4卷 本书是第3卷 贪心算法和动态规划 其中贪心算法主要包括调度 最小生成树 集群 哈夫曼编
  • 小程序的page.json如何配置

    本文小编为大家详细介绍 小程序的page json如何配置 内容详细 步骤清晰 细节处理妥当 希望这篇 小程序的page json如何配置 文章能帮助大家解决疑惑 下面跟着小编的思路慢慢深入 一起来学习新知识吧 JSON 配置 我们可以看到
  • SQLAlchemy使用详细功能SqlHelper封装

    环境与版本 python 3 10 SQLAlchemy 2 0 9 网上好多sql语句查询相关的代码执行异常 不确定是否与版本有有关 说明 封装了比较实用的 复合唯一索引表的插入或更新 多条件查询 叠加条件查询 以及返回pandas Da