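How do I write a Python multiprocessing script that uses two queues:
- one as a working queue, which starts with some data and, depending on conditions in the function being parallelized, receives more tasks on the fly;
- another that collects the results and is used to write them out once processing has finished?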
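Basically, I need to put more tasks on the working queue depending on what I find in the initial items. The example I post below is silly (I could transform the item as needed and put it straight into the output queue), but its mechanics are clear and reflect part of the concept I need to develop.
Here is my attempt: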
```python
import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get()  # I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2)  # If I like it, I do something with it and keep the result.
    else:
        working_queue.put(item+1)  # If something is missing, I do something with it and leave the result in the working queue

if __name__ == '__main__':
    static_input = range(100)
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]  # I am running as many processes as my machine has CPUs (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print(result)  # alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.
```
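This neither terminates nor prints any results.
At the end of the whole process, I want to make sure that the working queue is empty and that all the parallel functions have finished writing to the output queue before I iterate over the latter to take out the results. Do you have any suggestions on how to make this work?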
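Two things in the attempt explain the hang. Each worker calls `get()` once, handles a single item and returns, so most queued items are never processed. The final loop then blocks: `iter(output_q.get, None)` keeps calling `output_q.get()` until it returns `None`, and no process ever puts `None` on the output queue. A minimal single-process sketch of the sentinel idea (the helper name `drain` is hypothetical):

```python
import multiprocessing as mp

def drain(q):
    """Collect items from q until the None sentinel is read."""
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as it returns the sentinel (None here).
    return list(iter(q.get, None))

if __name__ == "__main__":
    q = mp.Queue()
    for x in [0, 4, 16]:
        q.put(x)
    q.put(None)      # without this line, drain(q) would block forever
    print(drain(q))  # [0, 4, 16]
```

In a multi-process setup, someone still has to decide when to put the sentinel, which is exactly the "when is all the work done?" question.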
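The code below achieves the desired result. It follows the suggestions made by @tawmas.
This code makes it possible to use multiple cores for a job in which the queue that feeds the workers must be updatable by the workers themselves during processing: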
```python
import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        # Stop as soon as the working queue looks empty. This is not a real
        # 'poison pill' (a sentinel value put on the queue), and empty() is
        # documented as unreliable for multiprocessing queues, so this check
        # can race with other processes that are still putting items.
        if working_queue.empty():
            break
        picked = working_queue.get()
        if picked % 2 == 0:
            output_queue.put(picked)
        else:
            working_queue.put(picked + 1)
    return

if __name__ == '__main__':
    static_input = range(100)
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty():
            break
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue: it tells whether every item was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)
```
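A sketch of an alternative that avoids polling `empty()`, assuming Python 3. It swaps in `multiprocessing.JoinableQueue`: each worker calls `task_done()` for every item it takes, and the parent's `join()` waits until every item, including re-queued ones, has been handled. This relies on the assumption that a worker re-queues follow-up work *before* calling `task_done()` on the current item, so the unfinished-task count never reaches zero early. The parent then sends a real poison pill (one `None` per worker). Each worker also puts `None` on the output queue when it exits, so the parent can drain all results before joining the workers, as the `multiprocessing` programming guidelines recommend. The names `run` and `results.pkl` are hypothetical; the result is a plain list, so `pickle.dump` works on it.

```python
import multiprocessing as mp
import pickle

def worker(working_queue, output_queue):
    """Consume items until the poison pill (None) arrives."""
    while True:
        picked = working_queue.get()      # block instead of polling empty()
        if picked is None:                # poison pill: no more work will come
            working_queue.task_done()
            break
        if picked % 2 == 0:
            output_queue.put(picked)      # finished item goes to the results
        else:
            # Re-queue BEFORE task_done() so the unfinished-task count
            # cannot drop to zero while follow-up work is still pending.
            working_queue.put(picked + 1)
        working_queue.task_done()
    output_queue.put(None)                # tell the parent this worker is done

def run(inputs, n_workers):
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for item in inputs:
        working_q.put(item)

    processes = [mp.Process(target=worker, args=(working_q, output_q))
                 for _ in range(n_workers)]
    for proc in processes:
        proc.start()

    working_q.join()                      # every item, including re-queued ones, is done
    for _ in processes:
        working_q.put(None)               # one poison pill per worker

    # Drain the output queue BEFORE joining: a process that has put data
    # on a queue may not terminate until that data has been consumed.
    results, finished = [], 0
    while finished < n_workers:
        result = output_q.get()
        if result is None:
            finished += 1
        else:
            results.append(result)

    for proc in processes:
        proc.join()
    return sorted(results)

if __name__ == "__main__":
    results_bank = run(range(100), mp.cpu_count())
    print(len(results_bank))              # one result per input item
    with open("results.pkl", "wb") as f:  # hypothetical output file
        pickle.dump(results_bank, f)
```

Using `mp.cpu_count()` workers is a reasonable default for CPU-bound work like this; for trivially cheap items, the queue traffic will dominate whatever the worker count.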