执行批量插入 SQLAlchemy 的最佳方法

2024-03-17

我有一张桌子叫products

其中有以下列id, product_id, data, activity_id

我本质上想做的是复制大量现有产品并更新它activity_id并在产品表中创建新条目。 例子:

我已经有 70 个 Activity_id 为 2 的产品条目

现在我想创建另外 70 个具有相同数据的条目(更新除外)activity_id

我可能有数千个现有条目,我想复制这些条目并将复制的条目 Activity_id 更新为新 ID。

products = self.session.query(model.Products).filter(filter1, filter2).all()

这将返回过滤器的所有现有产品。

然后我迭代产品,然后简单地克隆现有产品并更新 Activity_id 字段。

 for product in products:
                product.activity_id = new_id

 self.uow.skus.bulk_save_objects(simulation_skus)
 self.uow.flush()
 self.uow.commit()

执行这些批量条目的最佳/最快方法是什么,这样可以消磨时间,到目前为止性能还可以,是否有更好的解决方案?


您不需要在本地加载这些对象,您真正想做的就是拥有database创建这些行。

您本质上想要运行一个从现有行创建行的查询:

INSERT INTO product (product_id, data, activity_id)
SELECT product_id, data, 2  -- the new activity_id value
FROM product
WHERE activity_id = old_id

上述查询将完全在数据库服务器上运行;这比将查询加载到 Python 对象中,然后将所有 Python 数据发送回服务器进行填充要好得多INSERT每个新行的语句。

像这样的查询是你可以做的事情SQL炼金术core https://docs.sqlalchemy.org/en/stable/core/index.html,处理生成 SQL 语句的 API 的一半。然而,你can使用从 a 构建的查询声明式 ORM 模型 https://docs.sqlalchemy.org/en/stable/orm/extensions/declarative/index.html作为起点。你需要

  1. 访问Table实例 https://docs.sqlalchemy.org/en/13/core/metadata.html#sqlalchemy.schema.Table对于模型,因为这样您就可以创建一个INSERT声明通过Table.insert() method https://docs.sqlalchemy.org/en/stable/core/metadata.html#sqlalchemy.schema.Table.insert.
    您还可以从以下位置获取相同的对象models.Product查询,稍后详细介绍。
  2. 访问通常会为您的 Python 实例获取数据的语句以进行过滤models.Product询问;您可以通过Query.statement财产 https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.statement.
  3. 更新语句以替换包含的activity_id列与您的新值,并删除主键(我假设您有一个自动递增的主键列)。
  4. 将更新后的语句应用到Insert表的对象通过Insert.from_select() https://docs.sqlalchemy.org/en/stable/core/dml.html#sqlalchemy.sql.expression.Insert.from_select.
  5. 执行生成的INSERT INTO ... FROM ... query.

步骤 1 可以通过使用SQLAlchemy 自省 API https://docs.sqlalchemy.org/en/stable/core/inspection.html; the inspect()功能 https://docs.sqlalchemy.org/en/stable/core/inspection.html#sqlalchemy.inspect,应用于模型类,给你一个Mapper实例 https://docs.sqlalchemy.org/en/stable/orm/mapping_api.html#sqlalchemy.orm.Mapper,这又具有Mapper.local_table属性 https://docs.sqlalchemy.org/en/stable/orm/mapping_api.html#sqlalchemy.orm.Mapper.local_table.

步骤 2 和 3 需要稍微调整一下Select.with_only_columns() method https://docs.sqlalchemy.org/en/stable/core/selectable.html#sqlalchemy.sql.expression.Select.with_only_columns产生一个新的SELECT我们交换列的语句。你不能轻易removeselect 语句中的列,但是我们可以使用循环查询中现有的列 https://docs.sqlalchemy.org/en/stable/core/selectable.html#sqlalchemy.sql.expression.Select.columns将它们“复制”到新的SELECT,同时进行我们的更换。

第 4 步就很简单了,Insert.from_select()需要插入的列和SELECT询问。我们都有SELECT我们拥有的对象也给了我们它的列。

这是生成您的代码INSERT; the **replace关键字参数是插入时要替换的列:

from sqlalchemy import inspect, literal
from sqlalchemy.sql import ClauseElement

def insert_from_query(model, query, **replace):
    # The SQLAlchemy core definition of the table
    table = inspect(model).local_table
    # and the underlying core select statement to source new rows from
    select = query.statement

    # validate asssumptions: make sure the query produces rows from the above table
    assert table in select.froms, f"{query!r} must produce rows from {model!r}"
    assert all(c.name in select.columns for c in table.columns), f"{query!r} must include all {model!r} columns"

    # updated select, replacing the indicated columns
    as_clause = lambda v: literal(v) if not isinstance(v, ClauseElement) else v
    replacements = {name: as_clause(value).label(name) for name, value in replace.items()}
    from_select = select.with_only_columns([
        replacements.get(c.name, c)
        for c in table.columns
        if not c.primary_key
    ])
        
    return table.insert().from_select(from_select.columns, from_select)

我包含了一些关于模型和查询关系的断言,并且代码接受任意列子句作为替换,而不仅仅是文字值。你可以使用func.max(models.Product.activity_id) + 1例如,作为替换值(包装为子选择)。

上述函数执行步骤1-4,产生所需的INSERT打印时的 SQL 语句(我创建了一个products我认为可能具有代表性的模型和查询):

>>> print(insert_from_query(models.Product, products, activity_id=2))
INSERT INTO products (product_id, data, activity_id) SELECT products.product_id, products.data, :param_1 AS activity_id
FROM products
WHERE products.activity_id != :activity_id_1

您所要做的就是执行它:

insert_stmt = insert_from_query(models.Product, products, activity_id=2)
self.session.execute(insert_stmt)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

执行批量插入 SQLAlchemy 的最佳方法 的相关文章

  • 如何手动计算分类交叉熵?

    当我手动计算二元交叉熵时 我应用 sigmoid 来获取概率 然后使用交叉熵公式并平均结果 logits tf constant 1 1 0 1 2 labels tf constant 0 0 1 1 1 probs tf nn sigm
  • 保存为 HDF5 的图像未着色

    我目前正在开发一个将文本文件和 jpg 图像转换为 HDF5 格式的程序 用HDFView 3 0打开 似乎图像仅以灰度保存 hdf h5py File Sample h5 img Image open Image jpg data np
  • 与区域指示符字符类匹配的 python 正则表达式

    我在 Mac 上使用 python 2 7 10 表情符号中的标志由一对表示区域指示符号 https en wikipedia org wiki Regional Indicator Symbol 我想编写一个 python 正则表达式来在
  • Pandas/Google BigQuery:架构不匹配导致上传失败

    我的谷歌表中的架构如下所示 price datetime DATETIME symbol STRING bid open FLOAT bid high FLOAT bid low FLOAT bid close FLOAT ask open
  • 使用 kivy textinput 的 'input_type' 属性的问题

    您好 我在使用 kivy 的文本输入小部件的 input type 属性时遇到问题 问题是我制作了两个自定义文本输入 其中一个称为 StrText 其中设置了 input type text 然后是第二个文本输入 名为 NumText 其
  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • 使用字典映射数据帧索引

    为什么不df index map dict 工作就像df column name map dict 这是尝试使用index map的一个小例子 import pandas as pd df pd DataFrame one A 10 B 2
  • 您可以格式化 pandas 整数以进行显示,例如浮点数的“pd.options.display.float_format”?

    我见过this https stackoverflow com questions 18404946 py pandas formatdataframe and this https stackoverflow com questions
  • datetime.datetime.now() 返回旧值

    我正在通过匹配日期查找 python 中的数据存储条目 我想要的是每天选择 今天 的条目 但由于某种原因 当我将代码上传到 gae 服务器时 它只能工作一天 第二天它仍然返回相同的值 例如当我上传代码并在 07 01 2014 执行它时 它
  • 使用 xlrd 打开 BytesIO (xlsx)

    我正在使用 Django 需要读取上传的 xlsx 文件的工作表和单元格 使用 xlrd 应该可以 但因为文件必须保留在内存中并且可能不会保存到我不知道如何继续的位置 本例中的起点是一个带有上传输入和提交按钮的网页 提交后 文件被捕获req
  • 如何在 Python 中解析和比较 ISO 8601 持续时间? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个 Python v2 库 它允许我解析和比较 ISO 8601 持续时间may处于不同单
  • 在Python中检索PostgreSQL数据库的新记录

    在数据库表中 第二列和第三列有数字 将会不断添加新行 每次 每当数据库表中添加新行时 python 都需要不断检查它们 当 sql 表中收到的新行数低于 105 时 python 应打印一条通知消息 警告 数量已降至 105 以下 另一方面
  • 如何使用python在一个文件中写入多行

    如果我知道要写多少行 我就知道如何将多行写入一个文件 但是 当我想写多行时 问题就出现了 但是 我不知道它们会是多少 我正在开发一个应用程序 它从网站上抓取并将结果的链接存储在文本文件中 但是 我们不知道它会回复多少行 我的代码现在如下 r
  • 加快网络抓取速度

    我正在使用一个非常简单的网络抓取工具抓取 23770 个网页scrapy 我对 scrapy 甚至 python 都很陌生 但设法编写了一个可以完成这项工作的蜘蛛 然而 它确实很慢 爬行 23770 个页面大约需要 28 小时 我看过scr
  • pip 列出活动 virtualenv 中的全局包

    将 pip 从 1 4 x 升级到 1 5 后pip freeze输出我的全局安装 系统 软件包的列表 而不是我的 virtualenv 中安装的软件包的列表 我尝试再次降级到 1 4 但这并不能解决我的问题 这有点类似于这个问题 http
  • Python:XML 内所有标签名称中的字符串替换(将连字符替换为下划线)

    我有一个格式不太好的 XML 标签名称内有连字符 我想用下划线替换它 以便能够与 lxml objectify 一起使用 我想替换所有标签名称 包括嵌套的子标签 示例 XML
  • 如何解决 PDFBox 没有 unicode 映射错误?

    我有一个现有的 PDF 文件 我想使用 python 脚本将其转换为 Excel 文件 目前正在使用PDFBox 但是存在多个类似以下错误 org apache pdfbox pdmodel font PDType0Font toUnico
  • 将 Python 中的日期与日期时间进行比较

    所以我有一个日期列表 datetime date 2013 7 9 datetime date 2013 7 12 datetime date 2013 7 15 datetime date 2013 7 18 datetime date
  • Scipy Sparse:SciPy/NumPy 更新后出现奇异矩阵警告

    我的问题是由大型电阻器系统的节点分析产生的 我基本上是在设置一个大的稀疏矩阵A 我的解向量b 我正在尝试求解线性方程A x b 为了做到这一点 我正在使用scipy sparse linalg spsolve method 直到最近 一切都
  • 使用 z = f(x, y) 形式的 B 样条方法来拟合 z = f(x)

    作为一个潜在的解决方案这个问题 https stackoverflow com questions 76476327 how to avoid creating many binary switching variables in gekk

随机推荐