使用 Dask 的新 to_sql 来提高效率(内存/速度)或替代方案将数据从 dask 数据帧获取到 SQL Server 表

2024-01-15

我的最终目标是结合使用 SQL/Python 来处理一个项目,该项目的数据量太大,以至于 pandas 无法处理(至少在我的机器上)。所以,我已经和dask to:

  1. 从多个源读取数据(主要是 SQL Server 表/视图)
  2. 将数据操作/合并到一个大型 dask 数据帧表中,该表包含约 1000 万行和 52 列,其中一些具有一些长的唯一字符串
  3. 每天将其写回SQL Server,以便我的PowerBI报表可以自动刷新数据。

对于 #1 和 #2,使用最少的内存执行它们总共需要约 30 秒(多个 SQL 查询约 200 行代码,使用 dask 操作大型数据集)。又快又有趣!!!

但是,上面的#3 一直是主要瓶颈。使用 dask 或其他替代方案在(1. 内存和 2. 速度(执行时间))方面有哪些有效的方法来实现#3?查看更多背景信息,以及我尝试过的内容和我得出的一些结论。


对于上面的 #1、#2 和 #3,由于内存限制/执行时间长,我发现这是一项无法用 pandas 完成的任务,但是dask出色地解决了上面的#1 和#2,但我仍在努力解决#3——以自动方式将数据返回到 SQL 表中,而我没有发送到 .csv,然后导入到 SQL Server。我试过.compute()将 dask 数据帧转换为 pandas 数据帧,然后写入to_sql,但这违背了使用 dask 读取/数据模型的目的,并且再次耗尽内存/无论如何都需要永远执行。

所以,新的计划是使用to_csv每天生成一个新的 .csv 并使用查询将数据批量插入到表中。我认为这仍然是一个可行的解决方案;但是,今天,我很高兴发现 dask 发布了一个新的to_sql功能 (https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql)。利用有关此主题的现有 StackOverflow 文章/博客(例如来自 Francois Leblanc -https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html),我修改了所有参数,以找到执行时间最快的最有效组合(当您每天为报告编写大型数据集时,这非常重要)。这是我发现的,和很多帖子类似pd.to_sql包括勒布朗的:

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)

使用以下非默认参数的任意组合慢下来我的执行时间to_sql(再次同意勒布朗在他的博客中提到的内容):

  1. chunksize=40(40 是我可以根据 2098 SQL Server 参数限制传递 52 列的最大值),
  2. method='multi',
  3. parallel=True)

注意:我意识到除了(或替代)通过chunksize=40,我可以循环遍历 33 个 dask 数据帧分区并处理每个块to_sql单独。这样内存效率会更高,而且速度可能也会更快。一个分区需要 45 秒到 1 分钟,而所有分区一次处理整个 dask 数据帧需要超过 1 小时。我将尝试循环遍历所有分区并发布更新(如果速度更快)。一个小时似乎很多,但当我尝试用 pandas 进行计算时,我感觉完全受阻,这花了一整夜或耗尽了内存,所以这是一个步骤。老实说,我对此很满意,我现在可能会构建一个 .exepyinstaller并让 .exe 每天运行,这样就可以完全自动化并从那里开始,但我认为这对其他人会有帮助,因为在过去的几周里我一直在努力解决各种解决方案。


选择将 dask 数据帧作为分区插入不应加快插入过程所需的总时间。

每次你打电话insert,无论是有分区还是整个数据要插入.compute()调用方法从内存中提取数据并使用它,并且不能通过此优化它。我真的怀疑是否有必要提取分区,我认为后面的方法to_sql()dask 已经使用了这种方法。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Dask 的新 to_sql 来提高效率(内存/速度)或替代方案将数据从 dask 数据帧获取到 SQL Server 表 的相关文章

随机推荐

  • 在 R 中将多个 XML 文件合并到一个数据框中

    我有许多 XML 文件 大约 100 000 个 它们都如下所示 每个文件大约有 100 个点节点 我只展示其中五个以供说明
  • 在 F# 元组中使用 CustomComparison 和 CustomEquality 实现自定义比较

    我来这里是为了询问一个特定的主题 我确实在网上找到了一些有关此问题的信息 我正在实现 Minimax 算法的 F 版本 我现在遇到的问题是我想比较我的树的叶子 下面的数据结构 搜索 VS 给我的错误 我得到了这样的结果 我曾经拥有的树类型
  • 将 bash 环境变量发送回 python Fabric

    我正在尝试将 bash 环境变量传递回我的结构函数 如下所示 from fabric api import env def env localhost All the environment variables relating to yo
  • 从 fasta 文件中删除多个序列

    我有一个字符序列的文本文件 由两行组成 标题和下一行中的序列本身 该文件的结构如下 gt header1 aaaaaaaaa gt header2 bbbbbbbbbbb gt header3 aaabbbaaaa gt headerN a
  • 将 docker 映像从 GCR 拉入 GKE 时出错“无法拉取映像 .... 403 Forbidden”

    背景 我有一个 GKE 集群突然无法从 GCR 提取我的 docker 镜像 两者都在同一个 GCP 项目中 它已经运行良好几个月了 提取图像没有任何问题 现在在没有进行任何更改的情况下开始抛出错误 注意 我通常是团队中唯一访问 Googl
  • 记录亚马逊 Alexa 未处理的话语

    是否可以记录用户提出的触发未处理意图的问题 我打算做什么 当用户说出 Alexa 无法理解的话语时 它会触发未处理的意图 这里我想请用户重复该语句并将其存储在数据库中以供将来参考 这需要我创建一个意图 期望有一个可以接受语句的槽 即槽值不会
  • 应用程序版本未显示在 Spring Boot Banner.txt 中

    运行应用程序时 banner txt 中定义的应用程序版本不会显示在控制台上 它的定义是根据Spring Boot 文档 http docs spring io spring boot docs current SNAPSHOT refer
  • Python:在PDF中搜索文本

    我想写一个这样的函数 输入 一个 PDF 文件 一个字符串 该 PDF 是可搜索的 例如 它是由 MS Word 创建的 输出 PDF 文件中字符串的页面和位置 坐标 x 和 y 如果有 你能给我一些提示 什么库 方法 来用Python来做
  • C++ 标准库或其他广泛使用的库中的单链表?

    好像C 标准库里只有双向链表 没有单链表 吧 是否有广泛使用的带有单链表的 C 库 有的是slist http www boost org doc libs 1 35 0 doc html intrusive slist htmlBoost
  • 将 git 子模块与 python 一起使用

    我在这个网站上阅读了很多关于 git 子模块使用的博客文章和问题 但仍然不知道如何更好地在 python 中使用它们 我的意思是 如果我有这样的包 管理依赖项的更简单方法是什么 mypkg init py setup py submodul
  • 在 Spark RDD 和/或 Spark DataFrame 中重塑/旋转数据

    我有一些以下格式的数据 RDD 或 Spark DataFrame from pyspark sql import SQLContext sqlContext SQLContext sc rdd sc parallelize X01 41
  • 为什么scanf可以将errno设置为零?(当输入“ctrl+D”时)

    手册告诉我们 errno永远不会被任何系统调用或库函数设置为零 但我想知道 为什么以下代码中的errno可以通过scanf设置为零 当scanf 输入 ctrl D 时 include
  • 没有让 JSR303 注释与 Tomcat 7 一起使用

    经过几个小时的谷歌和几个教程后 我被击败了 事实并不是我收到的错误可以给我提示问题是什么 而是完全没有错误让我发疯 下面的代码可以工作 只是没有达到应有的效果 用于检查输入不为空或长度小于 3 个字符的注释永远不会运行 当部署项目或写入 n
  • 如何确定 Java 应用程序的主类?

    我们正在开发一个平台 许多开发人员将在其中编写自己的 ETL 应用程序 这些应用程序使用供应商的 API 然后将其提交到平台上执行 我们希望限制开发人员在编写 Main 类 通常只使用供应商的 API 时不要只顾自己的事情 以促进一些牢固的
  • 版本号 float、decimal 或 double

    我有一个文档管理系统 其中文档可以有多个版本 每个版本都会被保存 用户可以查看版本历史记录 我想知道的是 我应该使用什么数据类型作为版本号 小数 浮点还是双精度 我正在使用 NET 和 C 版本号开始于0 1以及每一个发表的major版本将
  • 分段函数 lmfit

    我正在尝试定义一个分段函数 以便由 Python 中的 lmfit 库拟合 我遇到的问题是我为该函数定义的参数不会与我提交的数据一起评估 我有一个与我的案例有点相似的例子here https stackoverflow com questi
  • 如何在 bash 中使用正则表达式验证版本号是否有效?

    我正在尝试验证版本号是否与版本模式匹配 但似乎检查由于某些奇怪的原因而失败 bin bash VERSION 1 2 3 if VERSION d d d then echo INFO lt gt Version VERSION else
  • Rails 将参数传递给 ActiveRecord 回调函数

    我的一个 AR 模型中有以下代码片段 after update cache bust The cache bust模型中的方法接受一个参数 布尔值 该参数将自身的值设置为false默认情况下 我怎样才能通过true从我上面定义的 Activ
  • 带参数的 Flask 表单

    我正在尝试用一个参数定义 Flask 表单 这是我的方法 forms py class RegisterPatternForm FlaskForm cursorPatients MongoClient localhost 27017 myD
  • 使用 Dask 的新 to_sql 来提高效率(内存/速度)或替代方案将数据从 dask 数据帧获取到 SQL Server 表

    我的最终目标是结合使用 SQL Python 来处理一个项目 该项目的数据量太大 以至于 pandas 无法处理 至少在我的机器上 所以 我已经和dask to 从多个源读取数据 主要是 SQL Server 表 视图 将数据操作 合并到一