我正在处理大型 CSV 文件并且我需要制作笛卡尔积(合并操作)。我尝试用 Pandas 来解决这个问题(你可以检查 Panda 的代码和数据格式示例对于同样的问题, here)由于内存错误而没有成功。现在,我正在尝试使用 Dask,它应该可以管理巨大的数据集,即使其大小大于可用 RAM。
首先我读了两个 CSV:
from dask import dataframe as dd
BLOCKSIZE = 64000000 # = 64 Mb chunks
df1_file_path = './mRNA_TCGA_breast.csv'
df2_file_path = './miRNA_TCGA_breast.csv'
# Gets Dataframes
df1 = dd.read_csv(
df1_file_path,
delimiter='\t',
blocksize=BLOCKSIZE
)
first_column = df1.columns.values[0]
df1.set_index(first_column)
df2 = dd.read_csv(
df2_file_path,
delimiter='\t',
blocksize=BLOCKSIZE
)
first_column = df2.columns.values[0]
df2.set_index(first_column)
# Filter common columns
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]
然后,我将操作存储在磁盘上以防止内存错误:
# Computes a Cartesian product
df1['_tmpkey'] = 1
df2['_tmpkey'] = 1
# Neither of these two options work
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_hdf('/tmp/merge.*.hdf', key='/merge_data')
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_parquet('/tmp/')
我做了尝试使用与我正在使用的完全相同的 CSV 文件的存储库。我尝试过较小的blocksize
值,但我得到了同样的错误。我错过了什么吗?任何形式的帮助将非常感激。
我使用以下方法成功运行了您的代码,内存限制为 32GB。
我已经摆脱了争论BLOCKSIZE
并使用repartition
而是在 df1 和 df2 上。
df1 = df1.repartition(npartitions=50)
df2 = df2.repartition(npartitions=1)
请注意,df2 的大小是真的更小与 df1 相比(2.5 MB 与 23.75 MB),这就是为什么我只为 df2 保留一个分区,并将 df1 切成 50 个分区。
这样做应该会使代码适合您。
对我来说,使用的内存保持在 12GB 以下。
为了检查,我计算了结果的 len :
len(df) # 3001995
按照上述内容创建一个包含 50 个分区的 parquet 文件。
您可以使用repartition
再次获得您想要的partition_size。
NB:
添加这个应该可以加速你的代码:
from dask.distributed import Client
client = Client()
就我而言,我不得不使用这个论点Client(processes=False)
因为我的运行环境。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)