我了解您链接的 SO 帖子中发生的情况。这对我来说很有意义。队列的大小有限
不完全是,队列只是管道的高级实现,允许同时多个读取器和写入器。这些底层管道的大小有限,在实现队列时被抽象出来。
但是,我仍然不明白为什么会这样,为什么有人会设计队列以这种方式运行?这是没有意义的,特别是如果只涉及一个进程的话
它更多的是一种设计结果比它是一个设计choice,稍后我将详细讨论这一点。至于为什么即使您通过单个进程传输数据,队列也会以这种方式运行,这是因为队列根本无法知道哪个进程将消耗项目。如果您只想从单个进程中放置和检索项目,那么您不应该使用multiprocessing.Queue
无论如何,因为它是专门为进程间通信创建的(考虑使用threading.Queue
反而)。
另外,正如您所说,如果队列足够小,仍然存在进程不会挂起/睡眠的异常情况。也许数据存储在一个简单的缓冲区中,直到它变得太大?
缓冲区是通过一个collections.deque
默认情况下,它们具有无限大小(仅受计算机拥有的物理内存的限制)。当后台“供给器”线程尝试将数据从缓冲区刷新到管道时(在此之后数据实际上被放入“队列”中),就会出现问题。当管道变满时,所有将数据放入管道中的调用(通过方法send https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.send和同等的send_bytes https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.send_bytes)将挂起,直到管道的另一端删除一些数据(当您调用时,这会在内部发生queue.get()
)。当这种死锁发生时,供给线程无法再响应主线程设置为 true 的退出条件(它通过在缓冲区末尾放置一个哨兵来通知线程在队列为垃圾时退出)集)。
这很重要,因为即使供给线程是守护线程,默认情况下主线程也会尝试加入它,以便队列可以正常关闭。并且由于供给线程在尝试清空缓冲区时被卡住,因此在有人清空缓冲区之前它永远不会加入queue.get()
足够的时间可以清空缓冲区。
这也解释了为什么您的案例会发生异常。基本上,如果放入队列的项目大小小于或等于管道允许的最大大小,则供给线程可以立即清空缓冲区,而无需等待其他人执行queue.get()
和死锁(所以即使你不这样做,进程也不会挂起queue.get()
如果您放入队列的数据量很小)。这本质上就是answer https://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em/31665635#31665635我链接到试图解释。
如果这一切仍然太抽象,那么您可以使用类似版本的代码来复制馈线线程如何被阻塞以及机器上管道的最大尺寸是多少。希望这有助于澄清一些事情:
import multiprocessing
def make_pipe(size):
"""
Function to check the maximum data a pipe can store. Try inputting a high value for arg size.
"""
w, r, = multiprocessing.Pipe()
for i in range(1, size):
print(i)
w.send(i) # If the argument size is high enough, this will eventually deadlock and the process will hang
return w, r
w, r = make_pipe(10000)
print('done')