我正在对模拟的时间序列进行分析。基本上,它为每个时间步骤执行相同的任务。由于时间步数非常多,并且每个时间步的分析都是独立的,因此我想创建一个可以多处理另一个函数的函数。后者将有参数,并返回结果。
使用共享字典和 lib并发.futures,我设法编写了以下内容:
import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# function : function that is running in parallel
# param_list : list of items
# group_size : size of the groups
# Nworkers : number of group/items running in the same time
# **param_fixed : passing parameters
manager = mlp.Manager()
dic = manager.dict()
executor = Cfut.ProcessPoolExecutor(Nworkers)
futures = [executor.submit(function, param, dic, *args)
for param in grouper(param_list, group_size)]
Cfut.wait(futures)
return [dic[i] for i in sorted(dic.keys())]
通常,我可以这样使用它:
def read_file(files, dictionnary):
for file in files:
i = int(file[4:9])
#print(str(i))
if 'bz2' in file:
os.system('bunzip2 ' + file)
file = file[:-4]
dictionnary[i] = np.loadtxt(file)
os.system('bzip2 ' + file)
Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))
或者像这样:
def autocorr(x):
result = np.correlate(x, x, mode='full')
return result[result.size//2:]
def find_lambda_finger(indexes, dic, Deviation):
for i in indexes :
#print(str(i))
# Beach = Deviation[i,:] - np.mean(Deviation[i,:])
dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)
args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)
基本上,它正在发挥作用。但效果不佳。有时会崩溃。有时它实际上启动了与 Nworkers 数量相同的 python 进程,有时当我指定时,一次只有 2 或 3 个进程在运行Nworkers = 15
.
例如,我遇到的一个经典错误在我提出的以下主题中进行了描述:在多处理之后调用 matplotlib 有时会导致错误:主线程不在主循环中 https://stackoverflow.com/questions/53045307/calling-matplotlib-after-multiprocessing-sometimes-results-in-error-main-threa
什么是更Pythonic的方式来实现我想要的?如何改进该功能的控制?如何控制更多正在运行的python进程的数量?