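pd.DataFrame.groupby.apply gives us a lot of flexibility. Unlike agg/filter/transform, it lets you reshape each subgroup into any shape; in your case, from 538 x 122 to N_categories x 122. That flexibility has a cost: the function is applied to each group one at a time, with no vectorization.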
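Here is a small sketch of the apply versus transform difference. It uses hypothetical toy data (two groups of 4 rows x 3 columns, standing in for the real 538 x 122 groups). apply can return a frame of a different shape for each group. transform must return something shaped like the group it received:

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: 2 groups ("ROW0", "ROW1"), each 4 rows x 3 columns,
# holding categorical codes 0-2 (a tiny stand-in for the 538 x 122 groups).
rng = np.random.default_rng(0)
block = pd.DataFrame(rng.integers(0, 3, size=(4, 3)))
data = pd.concat([block, block], keys=["ROW0", "ROW1"])
by_keys = data.groupby(level=0)


def proportions(g):
    # Per column, the share of each category: an (n_categories x 3) frame,
    # i.e. a different shape than the (4 x 3) group it was given.
    return g.apply(lambda s: s.value_counts()) / len(g.index)


# apply may reshape each group freely; results are stacked under the group key.
applied = by_keys.apply(proportions)

# transform must return a result with the same shape as each group.
transformed = by_keys.transform("mean")

print(applied.shape, transformed.shape)
```

The reshaping freedom is exactly why apply cannot be vectorized the way the built-in aggregations are. pandas has to call the Python function once per group.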
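I still think the way to solve this is multiprocessing. The pickle error you ran into is most likely because you defined some functions inside multi_processing_function. The rule is that you must move all of those functions to the top level of the module. pickle serializes a function as a reference to its module and name, so the worker processes can only find functions defined at module level. The multiprocessing code further down follows this rule.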
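You can check the top-level rule quickly by pickling functions directly, because multiprocessing.Pool pickles the function it sends to the workers. This is a minimal sketch, and top_level_func, make_nested_func and is_picklable are illustrative names. Run as a script, it should report that the module-level function and a functools.partial wrapping it pickle fine, while the nested function and the lambda do not:

```python
import pickle
from functools import partial


def top_level_func(x):
    # Module-level: pickle records it by module and name, so a worker can import it.
    return x + 1


def make_nested_func():
    def nested_func(x):
        # Local to make_nested_func: there is no importable name to record.
        return x + 1

    return nested_func


def is_picklable(obj):
    # Return True if pickle can serialize obj, False if it raises.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(top_level_func))           # module-level function
print(is_picklable(partial(top_level_func)))  # partial of a module-level function
print(is_picklable(make_nested_func()))       # nested function
print(is_picklable(lambda x: x + 1))          # lambda
```

The partial case matters in practice. It lets you bind extra arguments to a top-level worker function without defining a new, unpicklable closure.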
import pandas as pd
import numpy as np
# simulate your data with int 0 - 9 for categorical values
df = pd.DataFrame(np.random.choice(np.arange(10), size=(538, 122)))
# simulate your groupby operation: instead of going as far as your 8700 sub-groups, use 800 groups for illustration
sim_keys = ['ROW' + str(x) for x in np.arange(800)]
big_data = pd.concat([df] * 800, axis=0, keys=sim_keys)
big_data.shape
Out[337]: (430400, 122)
# Without multiprocessing
# ===================================================
by_keys = big_data.groupby(level=0)
sample_group = list(by_keys)[0][1]
sample_group.shape
def your_func(g):
    # share of each category value in each column of one group
    return g.apply(lambda s: s.value_counts()) / len(g.index)
def test_no_multiprocessing(gb, apply_func):
    return gb.apply(apply_func)
%time result_no_multiprocessing = test_no_multiprocessing(by_keys, your_func)
CPU times: user 1min 26s, sys: 4.03 s, total: 1min 30s
Wall time: 1min 27s
That is quite slow. Let's use the multiprocessing module:
# multiprocessing for pandas dataframe apply
# ===================================================
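# to avoid a pickle error, functions sent to the worker processes must be defined at the TOP level; if we move the function 'process' into 'test_with_multiprocessing', it raises a pickle error
def process(df, apply_func):
    return df.groupby(level=0).apply(apply_func)
def test_with_multiprocessing(big_data, apply_func):
    import multiprocessing as mp
    from functools import partial
    # split the row positions into 8 chunks (each chunk holds exactly 100 whole groups here)
    chunk_positions = np.array_split(np.arange(len(big_data)), 8)
    split_dfs = [big_data.iloc[pos] for pos in chunk_positions]
    # wrap 'process' so it takes just df as input; a partial of top-level functions is still picklable
    process_chunk = partial(process, apply_func=apply_func)
    with mp.Pool(processes=8) as p:
        # apply to each chunk in parallel
        df_pool_results = p.map(process_chunk, split_dfs)
    # combine together
    result = pd.concat(df_pool_results, axis=0)
    return result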
%time result_with_multiprocessing = test_with_multiprocessing(big_data, your_func)
CPU times: user 984 ms, sys: 3.46 s, total: 4.44 s
Wall time: 22.3 s
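Now it is much faster: wall time drops from about 87 s to 22.3 s, roughly 3.9 times faster. The CPU times reported by %time only count the parent process, not the worker processes. The drop from 1min 30s to 4.44 s therefore mostly reflects work moved to the workers, not work saved. Splitting the data and recombining the results adds some overhead. On an 8-core processor, a realistic expectation is a speedup of about 4 to 6 times over the single-process version, rather than 8.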
Finally, check that the two results are identical.
import pandas.testing as pdt
pdt.assert_frame_equal(result_no_multiprocessing, result_with_multiprocessing)
The test passes cleanly.
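One caveat: the chunking above splits the frame by row position, not by group. It works here only because 430400 rows / 8 = 53800 rows, which is exactly 100 complete groups of 538 rows per chunk. With uneven group sizes, a group straddling two chunks would be processed as two partial groups, and the combined result would be wrong.

The sketch below hands out whole groups instead. It assumes, as in the simulated data, that the group key is index level 0. split_by_group and parallel_groupby_apply are hypothetical helper names, not pandas API:

```python
import multiprocessing as mp

import numpy as np
import pandas as pd


def your_func(g):
    # Same per-group function as above: category shares per column.
    return g.apply(lambda s: s.value_counts()) / len(g.index)


def process(df):
    # Top-level, so Pool can pickle it.
    return df.groupby(level=0).apply(your_func)


def split_by_group(data, n_chunks):
    # Hand out whole level-0 groups to chunks so that no group is cut in two.
    # Keys are sorted so the concatenated result follows groupby's sorted order.
    keys = np.sort(np.asarray(data.index.get_level_values(0).unique()))
    key_chunks = np.array_split(keys, n_chunks)
    # Drop empty chunks (when there are fewer groups than chunks).
    return [data.loc[list(chunk)] for chunk in key_chunks if len(chunk) > 0]


def parallel_groupby_apply(data, n_procs=4):
    chunks = split_by_group(data, n_procs)
    with mp.Pool(processes=n_procs) as pool:
        parts = pool.map(process, chunks)
    return pd.concat(parts, axis=0)
```

Call parallel_groupby_apply(big_data, n_procs=8) from a top-level script. On platforms that start workers with spawn, such as Windows and macOS, put the call under an `if __name__ == "__main__":` guard. Because the keys are sorted before splitting, the concatenated result comes out in the same order as the single-process groupby, so the same assert_frame_equal check still applies.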