map_partitions
您可以使用以下命令将函数应用于数据框的所有分区map_partitions
功能。
df.map_partitions(func, columns=...)
请注意, func 一次只会给出数据集的一部分,而不是像 with 那样给出整个数据集pandas apply
(如果你想做并行性,你可能不希望这样做。)
map
/ apply
您可以将函数按行映射到系列中map
df.mycolumn.map(func)
您可以使用以下命令在数据框中按行映射函数apply
df.apply(func, axis=1)
线程与进程
从版本 0.6.0 开始dask.dataframes
与线程并行。自定义 Python 函数不会从基于线程的并行性中获得太多好处。你可以尝试流程
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
但要避免apply
然而,你真的应该避免apply
在 Pandas 和 Dask 中使用自定义 Python 函数。这通常是性能不佳的根源。如果您找到一种以矢量化方式执行操作的方法,那么您的 Pandas 代码可能会快 100 倍,并且您根本不需要 dask.dataframe。
考虑numba
对于您的特定问题,您可能会考虑numba http://numba.pydata.org/。这会显着提高您的表现。
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
免责声明,我在生产这两种产品的公司工作numba
and dask
并雇用了许多pandas
开发商。