使用 dask.delayed 和 pandas.DataFrame 将字典的 dask.bag 转换为 dask.dataframe

2023-11-27

我正在努力转换dask.bag的字典到dask.delayed pandas.DataFrames进入决赛dask.dataframe

我有一个函数(make_dict)将文件读入相当复杂的嵌套字典结构,另一个函数(make_df)将这些字典转换为pandas.DataFrame(每个文件生成的数据帧约为 100 mb)。我想将所有数据帧附加到一个单一的dask.dataframe以供进一步分析。

到目前为止我一直在使用dask.delayed对象来加载、转换和附加所有工作正常的数据(参见下面的示例)。然而,对于未来的工作,我想将加载的字典存储在dask.bag using dask.persist().

我设法将数据加载到dask.bag,产生一个字典列表或列表pandas.DataFrame我可以在调用后在本地使用compute()。当我尝试转动dask.bag into a dask.dataframe using to_delayed()但是,我遇到了一个错误(见下文)。

感觉我在这里错过了一些相当简单的东西,或者也许是我的方法dask.bag是错的?

下面的示例显示了我使用简化函数的方法并引发了相同的错误。任何有关如何解决此问题的建议都将受到赞赏。

import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag

print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2

def make_dict(n=1):
    return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])

k = [1,2,3]

# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected

# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)

df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails

# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [   A  B
# 0  0  0]

我最终想使用分布式调度程序做什么:

b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())

在 bag 的情况下,延迟对象指向元素列表,因此您有一个 pandas 数据帧列表的列表,这不完全是您想要的。两个建议

  1. 只要坚持使用 dask.delayed 即可。看起来很适合你
  2. Use the Bag.to_dataframe方法,它需要一包字典,并自行进行数据帧转换
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 dask.delayed 和 pandas.DataFrame 将字典的 dask.bag 转换为 dask.dataframe 的相关文章

随机推荐