Multiprocessing.Queue 当足够大时会挂起进程

2024-02-04

今天,我偶然发现了一些令人沮丧的行为multiprocessing.Queues.

这是我的代码:

import multiprocessing


def make_queue(size):
    ret = multiprocessing.Queue()
    for i in range(size):
        ret.put(i)
    return ret


test_queue = make_queue(3575)
print(test_queue.qsize())

当我运行此代码时,进程正常退出,退出代码为 0。

但是,当我将队列大小增加到 3576 或以上时,它会挂起。当我通过 Ctrl-C 向它发送 SIGINT 时,它会在此处引发错误:

Exception ignored in atexit callback: <function _exit_function at 0x7f91104f9360>
Traceback (most recent call last):
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 360, in _exit_function
    _run_finalizers()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/queues.py", line 199, in _finalize_join
    thread.join()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt: 

谁能解释一下这种行为吗?事实上,我已经对尺寸进行了实验,从大约 40 个不同尺寸的样本中,任何低于或等于 3575 的尺寸都可以正常工作,任何高于 3575 的尺寸都会挂起该过程。我认为这可能与队列大小(以字节为单位)有关,因为如果我插入i*i或一些随机字符串代替i,阈值发生变化。请注意,除非multiprocessing.Queue在后台做了一些可疑的事情,除了主进程之外,我没有创建任何其他进程。另外,添加test_queue.close()对结果没有影响。


我了解您链接的 SO 帖子中发生的情况。这对我来说很有意义。队列的大小有限

不完全是,队列只是管道的高级实现,允许同时多个读取器和写入器。这些底层管道的大小有限,在实现队列时被抽象出来。

但是,我仍然不明白为什么会这样,为什么有人会设计队列以这种方式运行?这是没有意义的,特别是如果只涉及一个进程的话

它更多的是一种设计结果比它是一个设计choice,稍后我将详细讨论这一点。至于为什么即使您通过单个进程传输数据,队列也会以这种方式运行,这是因为队列根本无法知道哪个进程将消耗项目。如果您只想从单个进程中放置和检索项目,那么您不应该使用multiprocessing.Queue无论如何,因为它是专门为进程间通信创建的(考虑使用threading.Queue反而)。

另外,正如您所说,如果队列足够小,仍然存在进程不会挂起/睡眠的异常情况。也许数据存储在一个简单的缓冲区中,直到它变得太大?

缓冲区是通过一个collections.deque默认情况下,它们具有无限大小(仅受计算机拥有的物理内存的限制)。当后台“供给器”线程尝试将数据从缓冲区刷新到管道时(在此之后数据实际上被放入“队列”中),就会出现问题。当管道变满时,所有将数据放入管道中的调用(通过方法send https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.send和同等的send_bytes https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.send_bytes)将挂起,直到管道的另一端删除一些数据(当您调用时,这会在内部发生queue.get())。当这种死锁发生时,供给线程无法再响应主线程设置为 true 的退出条件(它通过在缓冲区末尾放置一个哨兵来通知线程在队列为垃圾时退出)集)。

这很重要,因为即使供给线程是守护线程,默认情况下主线程也会尝试加入它,以便队列可以正常关闭。并且由于供给线程在尝试清空缓冲区时被卡住,因此在有人清空缓冲区之前它永远不会加入queue.get()足够的时间可以清空缓冲区。

这也解释了为什么您的案例会发生异常。基本上,如果放入队列的项目大小小于或等于管道允许的最大大小,则供给线程可以立即清空缓冲区,而无需等待其他人执行queue.get()和死锁(所以即使你不这样做,进程也不会挂起queue.get()如果您放入队列的数据量很小)。这本质上就是answer https://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em/31665635#31665635我链接到试图解释。

如果这一切仍然太抽象,那么您可以使用类似版本的代码来复制馈线线程如何被阻塞以及机器上管道的最大尺寸是多少。希望这有助于澄清一些事情:

import multiprocessing


def make_pipe(size):
    """
    Function to check the maximum data a pipe can store. Try inputting a high value for arg size.
    """

    w, r, = multiprocessing.Pipe()
    for i in range(1, size):
        print(i)
        w.send(i)  # If the argument size is high enough, this will eventually deadlock and the process will hang
    return w, r


w, r = make_pipe(10000)
print('done')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Multiprocessing.Queue 当足够大时会挂起进程 的相关文章

随机推荐