我在具有不同参数的循环中运行相同的模拟。每个模拟都使用 pandas DataFrame (data
) 只能读取,不能修改。使用ipyparallel
(IPython并行),我可以在模拟开始之前将此DataFrame放入我视图中每个引擎的全局变量空间中:
view['data'] = data
然后,引擎可以访问 DataFrame 以进行在其上运行的所有模拟。复制数据的过程(如果是 pickled,data
是 40MB) 只需几秒钟。然而,如果模拟数量增加,内存使用量就会变得非常大。我想这个共享数据正在被复制对于每个任务而不仅仅是对于每个引擎。与引擎共享客户端静态只读数据的最佳实践是什么?每个引擎复制一次是可以接受的,但理想情况下每个主机只需复制一次(我在主机 1 上有 4 个引擎,在主机 2 上有 8 个引擎)。
这是我的代码:
from ipyparallel import Client
import pandas as pd
rc = Client()
view = rc[:] # use all engines
view.scatter('id', rc.ids, flatten=True) # So we can track which engine performed what task
def do_simulation(tweaks):
""" Run simulation with specified tweaks """
# Do sim stuff using the global data DataFrame
return results, id, tweaks
if __name__ == '__main__':
data = pd.read_sql("SELECT * FROM my_table", engine)
threads = [] # store list of tweaks dicts
for i in range(4):
for j in range(5):
for k in range(6):
threads.append(dict(i=i, j=j, k=k)
# Set up globals for each engine. This is the read-only DataFrame
view['data'] = data
ar = view.map_async(do_simulation, threads)
# Our async results should pop up over time. Let's measure our progress:
for idx, (results, id, tweaks) in enumerate(ar):
print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id)
# Store results as a pickle for the future
pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j'])
# Save our results to a pickle file
pd.to_pickle(results, out_file_path + pfile)
print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time)
如果模拟计数很小(~50),那么需要一段时间才能开始,但我开始看到进度打印语句。奇怪的是,多个任务将被分配给同一个引擎,并且在该引擎的所有分配的任务完成之前我看不到响应。我希望看到来自enumerate(ar)
每次单个模拟任务完成时。
如果模拟计数很大(~1000),则需要很长时间才能开始,我看到所有引擎上的 CPU 都在加速,但直到很长一段时间(~40 分钟)才看到进度打印语句,当我do查看进度,似乎有一大块(>100)任务进入了同一个引擎,并等待该引擎完成,然后才提供一些进度。当那台发动机完成后,我看到了ar
对象每 4 秒提供新响应 - 这可能是写入输出 pickle 文件的时间延迟。
最后,host1 还运行 ipycontroller 任务,并且它的内存使用量疯狂增加(Python 任务显示使用 >6GB RAM,内核任务显示使用 3GB)。 host2 引擎根本没有显示太多内存使用情况。什么会导致内存激增?