您在这里询问了一大堆不同的事情,所以我会尽力涵盖所有内容:
您传递给的函数callback
一旦工作进程返回结果,就会在主进程(而不是工作进程)中执行。它是在一个线程中执行的Pool
对象在内部创建。该线程消耗来自 a 的对象result_queue
,用于获取所有工作进程的结果。线程从队列中取出结果后,执行callback
。当回调正在执行时,无法从队列中提取其他结果,因此回调快速完成非常重要。以你的例子为例,一旦有一个电话X
or Y
你通过apply_async
完成后,结果将被放入result_queue
由工作进程执行,然后结果处理线程将从result_queue
, 和你的callback
将被执行。
其次,我怀疑您没有看到示例代码发生任何事情的原因是因为您的所有工作函数调用都失败了。如果工作函数失败,callback
永远不会被执行。除非您尝试从AsyncResult https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult调用返回的对象apply_async
。但是,由于您没有保存任何这些对象,因此您永远不会知道发生了故障。如果我是你,我会尝试使用pool.apply
当您进行测试时,以便您在错误发生时立即看到它们。
工作人员可能失败的原因(至少在您提供的示例代码中)是因为X
and Y
被定义为另一个函数内的函数。multiprocessing
通过在主进程中对它们进行 pickle 并在工作进程中取消它们来将函数和对象传递给工作进程。在其他函数内部定义的函数是不可picklable的,这意味着multiprocessing
将无法在工作进程中成功取消它们。要解决此问题,请在模块的顶层定义这两个函数,而不是嵌入到模块中dirwalker
功能。
你绝对应该继续打电话Z
from X
and Y
,不在results
。那样,Z
可以在所有工作进程中同时运行,而不必在主进程中一次运行一个调用。请记住,你的callback
函数应该尽可能快,这样就不会阻碍处理结果。执行中Z
在那里会减慢速度。
下面是一些与您正在执行的操作类似的简单示例代码,希望能让您了解代码应该是什么样子:
import multiprocessing as mp
import os
# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f):
fileinfo = Z(f)
return fileinfo
# Y() reads other types of files and does the same thing
def Y(f):
fileinfo = Z(f)
return fileinfo
# helper function
def Z(arr):
return arr + "zzz"
def dirwalker(directory):
ahlala = []
# results() is the callback function
def results(r):
ahlala.append(r) # or .append, haven't yet decided
for _,_,files in os.walk(directory):
pool = mp.Pool(mp.cpu_count())
for f in files:
if len(f) > 5: # Just an arbitrary thing to split up the list with
pool.apply_async(X, args=(f,), callback=results) # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
else:
pool.apply_async(Y, args=(f,), callback=results)
pool.close()
pool.join()
return ahlala
if __name__ == "__main__":
print(dirwalker("/usr/bin"))
Output:
['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]
Edit:
您可以使用以下命令创建在父进程和子进程之间共享的 dict 对象multiprocessing.Manager
class:
pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
if len(f) > 5:
pool.apply_async(X, args=(f, helper_dict), callback=results)
else:
pool.apply_async(Y, args=(f, helper_dict), callback=results)
然后使X
and Y
采用第二个参数称为helper_dict
(或者任何你想要的名字),一切就都准备好了。
需要注意的是,这是通过创建一个包含普通字典的服务器进程来实现的,并且所有其他进程都通过代理对象与该字典进行通信。因此,每次读取或写入字典时,您都在进行 IPC。这使得它比真正的听写慢很多。