我的最终目标是结合使用 SQL/Python 来处理一个项目,该项目的数据量太大,以至于 pandas 无法处理(至少在我的机器上)。所以,我已经和dask
to:
- 从多个源读取数据(主要是 SQL Server 表/视图)
- 将数据操作/合并到一个大型 dask 数据帧表中,该表包含约 1000 万行和 52 列,其中一些具有一些长的唯一字符串
- 每天将其写回SQL Server,以便我的PowerBI报表可以自动刷新数据。
对于 #1 和 #2,使用最少的内存执行它们总共需要约 30 秒(多个 SQL 查询约 200 行代码,使用 dask 操作大型数据集)。又快又有趣!!!
但是,上面的#3 一直是主要瓶颈。使用 dask 或其他替代方案在(1. 内存和 2. 速度(执行时间))方面有哪些有效的方法来实现#3?查看更多背景信息,以及我尝试过的内容和我得出的一些结论。
对于上面的 #1、#2 和 #3,由于内存限制/执行时间长,我发现这是一项无法用 pandas 完成的任务,但是dask
出色地解决了上面的#1 和#2,但我仍在努力解决#3——以自动方式将数据返回到 SQL 表中,而我没有发送到 .csv,然后导入到 SQL Server。我试过.compute()
将 dask 数据帧转换为 pandas 数据帧,然后写入to_sql
,但这违背了使用 dask 读取/数据模型的目的,并且再次耗尽内存/无论如何都需要永远执行。
所以,新的计划是使用to_csv
每天生成一个新的 .csv 并使用查询将数据批量插入到表中。我认为这仍然是一个可行的解决方案;但是,今天,我很高兴发现 dask 发布了一个新的to_sql
功能 (https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql)。利用有关此主题的现有 StackOverflow 文章/博客(例如来自 Francois Leblanc -https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html),我修改了所有参数,以找到执行时间最快的最有效组合(当您每天为报告编写大型数据集时,这非常重要)。这是我发现的,和很多帖子类似pd.to_sql
包括勒布朗的:
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
使用以下非默认参数的任意组合慢下来我的执行时间to_sql
(再次同意勒布朗在他的博客中提到的内容):
-
chunksize=40
(40 是我可以根据 2098 SQL Server 参数限制传递 52 列的最大值),
-
method='multi'
,
-
parallel=True
)
注意:我意识到除了(或替代)通过chunksize=40
,我可以循环遍历 33 个 dask 数据帧分区并处理每个块to_sql
单独。这样内存效率会更高,而且速度可能也会更快。一个分区需要 45 秒到 1 分钟,而所有分区一次处理整个 dask 数据帧需要超过 1 小时。我将尝试循环遍历所有分区并发布更新(如果速度更快)。一个小时似乎很多,但当我尝试用 pandas 进行计算时,我感觉完全受阻,这花了一整夜或耗尽了内存,所以这是一个步骤。老实说,我对此很满意,我现在可能会构建一个 .exepyinstaller
并让 .exe 每天运行,这样就可以完全自动化并从那里开始,但我认为这对其他人会有帮助,因为在过去的几周里我一直在努力解决各种解决方案。