我们先看一下程序的结尾。
多处理模块使用atexit
打电话multiprocessing.util._exit_function
当你的程序结束时。
如果您删除g2.next()
,你的程序很快结束。
The _exit_function
最终打电话Pool._terminate_pool
。主线程改变了状态pool._task_handler._state
from RUN
to TERMINATE
。与此同时pool._task_handler
线程正在循环Pool._handle_tasks
当达到条件时退出
if thread._state:
debug('task handler found thread._state != RUN')
break
(参见/usr/lib/python2.6/multiprocessing/pool.py)
这就是阻止任务处理程序完全消耗生成器的原因,g()
。如果你看进去Pool._handle_tasks
你会看到的
for i, task in enumerate(taskseq):
...
try:
put(task)
except IOError:
debug('could not put task on queue')
break
这是消耗您的生成器的代码。 (taskseq
不完全是你的发电机,但作为taskseq
被消耗了,你的发电机也被消耗了。)
相反,当您调用g2.next()
主线程调用IMapIterator.next
,并在到达时等待self._cond.wait(timeout)
.
主线程正在等待而不是
呼叫_exit_function
是允许任务处理程序线程正常运行的原因,这意味着完全消耗生成器put
中的任务worker
s' inqueue
in the Pool._handle_tasks
功能。
底线是所有Pool
映射函数消耗给定的整个可迭代对象。如果您想分块使用生成器,您可以这样做:
import multiprocessing as mp
import itertools
import time
def g():
for el in xrange(50):
print el
yield el
def f(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
pool = mp.Pool(processes=4) # start 4 worker processes
go = g()
result = []
N = 11
while True:
g2 = pool.map(f, itertools.islice(go, N))
if g2:
result.extend(g2)
time.sleep(1)
else:
break
print(result)