只是为了讨论“记住”问题:
another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())
def dispatcher():
another_module_global_variable = huge_list
params = range(len(another_module.global_variable))
multiprocessing_result = list(p.imap_unordered(big_function, params))
return multiprocessing_result
问题似乎出在您创建时Pool
实例。
这是为什么?
这是因为当你创建实例时Pool
,它确实设置了工作线程的数量(默认情况下等于 CPU 核心的数量),并且它们都在那时启动(分叉)。这意味着工人拥有父母全球状态的副本(并且another_module.global_variable
除其他事项外),并且使用写时复制策略,当您更新another_module.global_variable
你在父母的过程中改变它。工人对旧值有参考。这就是为什么你有问题。
以下是几个链接,可以为您提供更多解释:this https://stackoverflow.com/a/42149043 and this https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods.
这是一个小片段,您可以在其中切换全局变量值更改的行和启动进程的行,并检查子进程中打印的内容。
from __future__ import print_function
import multiprocessing as mp
glob = dict()
glob[0] = [1, 2, 3]
def printer(a):
print(globals())
print(a, glob[0])
if __name__ == '__main__':
p = mp.Process(target=printer, args=(1,))
p.start()
glob[0] = 'test'
p.join()
这是Python2.7代码,但它也适用于Python3.6。
这个问题的解决方案是什么?
好吧,回到第一个解决方案。您更新导入模块变量的值,然后创建进程池。
现在真正的问题是缺乏加速。
这是有趣的部分文档 https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled关于如何腌制函数:
请注意,函数(内置函数和用户定义函数)是通过“完全
合格的”名称引用,而不是值。这意味着只有
函数名称与模块名称一起被腌制
函数是在中定义的。既不是函数的代码,也不是它的任何代码
函数属性被腌制。因此定义模块必须是
可在 unpickling 环境中导入,并且该模块必须包含
命名对象,否则将引发异常。
这意味着您的函数酸洗不应该是一个浪费时间的过程,或者至少不是一个浪费时间的过程。导致缺乏加速的原因是,对于您传递给的列表中的约 600 个数据对象imap_unordered
调用时,您将它们中的每一个传递给一个工作进程。再次,底层实现multiprocessing.Pool
可能是这个问题的原因。
如果你深入multiprocessing.Pool
实施后,你会看到两个Threads
using Queue
正在处理父进程和所有子(工作)进程之间的通信。因此,所有进程都不断需要函数参数并不断返回响应,最终导致父进程非常繁忙。这就是为什么“大量”时间花在“分派”工作上,将数据传入或传出工作进程。
对此该怎么办?
尝试随时增加工作进程中进程的数据对象数量。在您的示例中,您一个接一个地传递数据对象,并且您可以确保每个工作进程在任何时候都只处理一个数据对象。为什么不增加传递给工作进程的数据对象的数量?这样,您可以使每个进程更加繁忙,处理 10 个、20 个甚至更多的数据对象。据我所见,imap_unordered
has an chunksize
争论。它设置为1
默认情况下。尝试增加它。像这样的事情:
import multiprocessing.pool
from contextlib import closing
import os
def big_function(params):
results = []
for p in params:
results.append(process(another_module.global_variable[p]))
return results
def dispatcher():
# sharing read-only global variable taking benefit from Unix
# which follows policy copy-on-update
# https://stackoverflow.com/questions/19366259/
another_module.global_variable = huge_list
# send indices
params = range(len(another_module.global_variable))
with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
multiprocessing_result = list(p.imap_unordered(big_function, params, chunksize=10))
return multiprocessing_result
几个建议:
- 我看到你创造了
params
作为索引列表,您可以使用它来选择特定的数据对象big_function
。您可以创建代表第一个和最后一个索引的元组并将它们传递给big_function
。这可能是增加工作量的一种方式。这是我上面提出的方法的另一种方法。
- 除非你明确喜欢
Pool(processes=os.cpu_count())
,可以省略。默认情况下它需要 CPU 核心数。
对于答案的长度或可能潜入的任何拼写错误,我们深表歉意。