20220811 -
0. 引言
在进行机器学习的相关实验中,当使用sklearn的时候,通常可以通过n_jobs=-1
这个参数实现某些算法的并行化,例如集成学习的方法,或者是参数搜索的函数,通过查看相关的文档,或者直接去看这个代码,可以发现sklearn底层实现这部分功能是利用了joblib这个库,具体他是怎么实现的,我没有去深究,就是大致看了看比人的代码。
但是这里我遇到了另外的问题,我需要实现一种嵌套式的并行化代码。
例如github上有人针对类似的问题提出了这部分说法[1],不过最后的时候,我看好像不了了之了。不过好像也看到了例如dask这种开源的机器学习并行库方法等。
1. 问题具体说明
我的问题大致上是和[1]的需求一样的,我采用了一种集成分类器,他的开源代码通过n_jobs
制定了并行化的方式,而且这部分代码其实是没什么问题的。例如10个基分类器,通过进程信息可以看到10个进程在同时跑,同时工作。
但是我这里有一个另外的需求,我需要对这个10个分类器,分别求取最优的参数集合,那么就需要一个类似网格搜索的代码,但是当你在这个分类器进行训练的时候,将这部分代码加入进去之后,会发现,即使在网格搜索的部分加入了n_jobs=-1
,也依然是10个进程在跑。
从本质上来说,也就是嵌套式的并行化跑。内部的程序并没有发生作用。
每个东西跑上挺久,反正最后也能出结果,倒是问题也不大。但是,如果数据量比较大,光等结果的时候可能就什么也干不了,就挺尴尬。
所以就搜了搜,这部分内容,其实说实话,搜了挺久,好像大部分都仅仅是针对外围部分的信息,而不是里面。
2. 解决方式
实际上,我最后都不知道他是怎么解决的,因为有人跟我又同样的需求,他自己提出了问题,然后自己提出了解决方案。
from joblib import Parallel, delayed, parallel_backend
import numpy as np
def parallel_in_parallel_test(i):
a = np.ones((1000,1000))
for j in range(2000):
a *= np.random.randn(1000,1000)
return a.sum()
def parallel_in_parallel_wrapper(j, n_threads=4):
with parallel_backend("loky", inner_max_num_threads=n_threads):
out2 = Parallel(n_jobs=n_threads)(delayed(parallel_in_parallel_test)(i) for i in range(100))
return np.array(out2).sum()
out = Parallel(n_jobs=3)(delayed(parallel_in_parallel_wrapper)(j, n_threads=4) for j in range(100))
他的代码意思就是,并没有12个进程再跑。反而只有外面的4个在跑。
不过他自己也不知道为什么这样改了能够成功。
其实我自己思考了思考,出现依然是4个在跑的原因,本质上就是内部的设置,收到了外部的影响,当然具体是什么,并不好说, 但是肯定是收到了影响的。所以他修改了这个内部的环境,就修正了。
参考
[1]Nested Parallelism
[2]Joblib nested Parallel execution not making use of available cores