子类化multiprocessing.Process
:
但是我无法取回值,如何以这种方式使用队列?
流程需要一个Queue()
接收结果...如何子类化的示例multiprocessing.Process
接下来...
from multiprocessing import Process, Queue
class Processor(Process):
def __init__(self, queue, idx, **kwargs):
super(Processor, self).__init__()
self.queue = queue
self.idx = idx
self.kwargs = kwargs
def run(self):
"""Build some CPU-intensive tasks to run via multiprocessing here."""
hash(frozenset(self.kwargs.items())) # Shameless usage of CPU for no gain...
## Return some information back through multiprocessing.Queue
## NOTE: self.name is an attribute of multiprocessing.Process
self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name))
if __name__ == "__main__":
NUMBER_OF_PROCESSES = 5
## Create a list to hold running Processor object instances...
processes = list()
q = Queue() # Build a single queue to send to all process objects...
for i in range(0, NUMBER_OF_PROCESSES):
p=Processor(queue=q, idx=i)
p.start()
processes.append(p)
# Incorporating ideas from this answer, below...
# https://stackoverflow.com/a/42137966/667301
[proc.join() for proc in processes]
while not q.empty():
print("RESULT: {0}".format(q.get())) # get results from the queue...
在我的机器上,这会导致...
$ python test.py
RESULT: Process idx=0 is called 'Processor-1'
RESULT: Process idx=4 is called 'Processor-5'
RESULT: Process idx=3 is called 'Processor-4'
RESULT: Process idx=1 is called 'Processor-2'
RESULT: Process idx=2 is called 'Processor-3'
$
# Using `multiprocessing.Pool`:
FWIW,我发现子类化的一个缺点multiprocessing.Process
是你无法利用所有内在的优点multiprocessing.Pool
; Pool
如果你不这样做的话,会给你一个非常好的APIneed您的生产者和消费者代码通过队列相互通信。
只需一些创造性的返回值,您就可以做很多事情...在下面的示例中,我使用dict()
封装输入和输出值pool_job()
...
from multiprocessing import Pool
def pool_job(input_val=0):
# FYI, multiprocessing.Pool can't guarantee that it keeps inputs ordered correctly
# dict format is {input: output}...
return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}
pool = Pool(5) # Use 5 multiprocessing processes to handle jobs...
results = pool.map(pool_job, xrange(0, 12)) # map xrange(0, 12) into pool_job()
print results
这导致:
[
{'pool_job(input_val=0)': 0},
{'pool_job(input_val=1)': 12},
{'pool_job(input_val=2)': 24},
{'pool_job(input_val=3)': 36},
{'pool_job(input_val=4)': 48},
{'pool_job(input_val=5)': 60},
{'pool_job(input_val=6)': 72},
{'pool_job(input_val=7)': 84},
{'pool_job(input_val=8)': 96},
{'pool_job(input_val=9)': 108},
{'pool_job(input_val=10)': 120},
{'pool_job(input_val=11)': 132}
]
显然还有很多其他方面需要改进pool_job()
,例如错误处理,但这说明了要点。供参考,这个答案 https://stackoverflow.com/a/20888445/667301提供了另一个如何使用的示例multiprocessing.Pool
.