A short summary
As of CY2023, the technique described in this answer is quite dated. These days, you can use pebble, mpire, or concurrent.futures.ProcessPoolExecutor()...

Regardless of which python concurrency tool you use, the answer to the OP's question below is still valid. A ProcessPoolExecutor() does not need a Pipe() or Queue() to communicate tasks / results.
Original answer

- A Pipe() can only have two endpoints.
- A Queue() can have multiple producers and consumers.

When to use them

- If you need more than two points to communicate, use a Queue().
- If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
Performance benchmarking

Let's assume you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue()... FYI, I threw in results for SimpleQueue() and JoinableQueue() as a bonus.

- JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn't even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each is at the bottom of this answer...
# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2
$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds
# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0
(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds
# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpenning@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
In summary:

- Under Python 2.7, a Pipe() is about 300% faster than a Queue(). Don't even think about JoinableQueue() unless you really must have the benefits.
- Under python 3.x, Pipe() still has a (roughly 20%) edge over the Queue()s, but the performance gap between Pipe() and Queue() is not as dramatic as it was in Python 2.7. The various Queue() implementations are within roughly 15% of one another. Also, my tests use integer data. Some people commented that they found performance differences depending on the datatypes they used with multiprocessing.

Bottom line for python 3.x: YMMV... consider running your own tests with your own datatypes (i.e. integers / strings / objects) to draw conclusions about the platforms and use cases you care about.

I should also mention that my python3.x performance tests were inconsistent and varied somewhat. I ran multiple tests over several minutes to get the best result for each case. I suspect these variations are related to running my python3 tests under VMWare / virtualization; however, the virtualization diagnosis is speculation.
*** A response to a comment about test techniques ***

In the comments, @JJC said:

> A fairer comparison would be running N workers, each communicating with the main thread via a point-to-point pipe, compared against the performance of running N workers all pulling from a single point-to-multipoint queue.

Originally, this answer only considered the performance of one worker and one producer; that is the baseline use case for Pipe(). Your comment requires adding different tests for multiple worker processes. While that is a valid observation for common Queue() use cases, it could easily explode the test matrix along a completely new axis (i.e. adding tests with varying numbers of worker processes).
Bonus material 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary under many conditions, but infrequently fails with certain inputs.

Normally we get clues about the failure when the entire python process crashes; however, you don't get unsolicited crash tracebacks printed to the console when the multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():
import traceback

def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print("FATAL: reader({0}) exited while multiprocessing".format(args))
        traceback.print_exc()
Now, when you find a crash, you see something like:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
Source code:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in range(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_simplequeue.py
"""
from multiprocessing import Process, SimpleQueue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))