A multithreading pool is great for sharing the `y` dataframe among threads (obviating the need for shared memory), but not so great at running CPU-intensive processing in parallel. A multiprocessing pool is great for doing CPU-intensive processing, but not so great at sharing data across processes without coming up with a shared-memory representation of the `y` dataframe.
Here I have rearranged your code so that a multithreading pool is used to create `filtered_y` for each period (which is a CPU-intensive operation, but pandas does release the Global Interpreter Lock for certain operations — hopefully this one). Then only one month's worth of data is passed to the multiprocessing pool, rather than the entire `y` dataframe, where worker function `process_month` processes that month. But since each pool process does not have access to the `y` dataframe, it just returns the indices that need updating along with the replacement values.
import pandas as pd
from multiprocessing import cpu_count
from multiprocessing.pool import Pool, ThreadPool

def process_month(period, filtered_y):
    """
    Returns a list of tuples consisting of (index, value) pairs.
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        # Assumes month_df has a single value column:
        for _, value in month_df.loc[idx:].itertuples():
            up_delta = 200
            down_delta = 200
            up_value = value + up_delta
            down_value = value - down_delta
            if value > up_value:
                results.append((index, 1))
                break
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month's records
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    MAX_THREAD_POOL_SIZE = 100
    thread_pool_size = min(MAX_THREAD_POOL_SIZE, len(periods))
    multiprocessing_pool_size = min(thread_pool_size, cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool, \
            ThreadPool(thread_pool_size) as thread_pool:
        thread_pool.map(process, periods)
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()
A version that uses just a single multiprocessing pool:
import pandas as pd
from multiprocessing import cpu_count
from multiprocessing.pool import Pool

def process_month(period):
    """
    Returns a list of tuples consisting of (index, value) pairs.
    """
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month's records
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        # Assumes month_df has a single value column:
        for _, value in month_df.loc[idx:].itertuples():
            up_delta = 200
            down_delta = 200
            up_value = value + up_delta
            down_value = value - down_delta
            if value > up_value:
                results.append((index, 1))
                break
            if value < down_value:
                results.append((index, 0))
                break
    return results

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    multiprocessing_pool_size = min(len(periods), cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    for results in results_list:
        for index, value in results:
            y.loc[index, "result"] = value
    # Write out new csv file:
    ...

# Required for Windows:
if __name__ == '__main__':
    main()
And now a variation of this that uses a bit more memory but allows the main process to overlap its own processing with the multiprocessing pool. This could be useful if the number of indices that need updating is very large:
...
def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    multiprocessing_pool_size = min(len(periods), cpu_count() - 1)  # save a core for the main process
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        # Process values as soon as they are returned:
        for results in multiprocessing_pool.imap_unordered(process_month, periods):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out new csv file:
    ...
This last version could be superior because it first reads the csv file before submitting tasks to the pool and, depending on the platform and how it caches I/O operations, could result in the worker function not having to do any physical I/O to read in its copy of the file. But then again, that is yet another 10 MB file being read into memory.
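Finally, a note on the `get_loc(..., method='nearest')` lookup inside `process_month`: it maps each `y` timestamp onto the closest timestamp present in the month file, so the inner scan can start there. A tiny illustration with made-up timestamps (note that in pandas 2.0+ the `method=` argument was removed from `get_loc`; the equivalent spelling there is `Index.get_indexer`, used below):

```python
import pandas as pd

idx = pd.DatetimeIndex(["2020-01-01 00:00", "2020-01-01 00:05", "2020-01-01 00:10"])
target = pd.Timestamp("2020-01-01 00:06")

# Nearest-match lookup; equivalent to idx.get_loc(target, method='nearest')
# on pandas < 2.0.
pos = idx.get_indexer([target], method="nearest")[0]
print(idx[pos])  # 2020-01-01 00:05:00 (1 minute away beats 4 minutes away)
```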