Dask dataframe:“set_index”可以将单个索引放入多个分区吗?

2024-01-29

根据经验,似乎每当你set_index在 Dask 数据帧上,Dask 始终将具有相同索引的行放入单个分区中,即使这会导致分区严重不平衡。

这是一个演示:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

然而,我在任何地方都找不到这种行为的保证。

我曾尝试自己筛选代码,但放弃了。我相信这些相互关联的函数之一可能包含答案:

  • set_index https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L118
  • set_partitions https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L210
  • rearrange_by_column https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L398
  • rearrange_by_column_tasks https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L534
  • SimpleShuffleLayer https://github.com/dask/dask/blob/cf82bb07d7070f0950b1cd7b33ff77fd384406f8/dask/layers.py#L364

当你set_index,是否单个索引永远不能位于两个不同的分区中?如果不是,那么该财产在什么条件下成立?


赏金:我将向来自信誉良好的来源的答案授予赏金。例如,引用实现来表明该属性必须成立。


单个索引是否永远不能位于两个不同的分区中?

不,这当然是允许的。达斯克甚至打算让这种情况发生。然而,由于一个bug https://github.com/dask/dask/issues/8437 in set_index,所有数据仍将位于一个分区中。

一个极端的例子(除了一个之外,每一行都是相同的值):

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

如您所见,Dask 的目的是0要在多个分区之间分割。然而,当洗牌真正发生时,所有的0s 仍然最终位于一个分区中:

In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]: 
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)

这是因为code https://github.com/dask/dask/blob/a5aecac8313fea30c5503f534c71f325b1775b9c/dask/dataframe/shuffle.py#L796决定一行属于哪个输出分区不考虑重复项divisions。治疗divisions作为一个系列,它使用searchsorted https://pandas.pydata.org/docs/reference/api/pandas.Series.searchsorted.html with side="right",这就是为什么所有数据总是在最后一个分区中结束的原因。

问题解决后我会更新这个答案。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dask dataframe:“set_index”可以将单个索引放入多个分区吗? 的相关文章

随机推荐