multiprocessing.Queue 作为池工作程序的参数中止工作程序的执行

2023-11-29

我实际上发现很难相信我遇到了我遇到的问题,这似乎是 python 多处理模块中的一个大错误......无论如何,我遇到的问题是每当我通过将 multiprocessing.Queue 作为参数传递给 multiprocessing.Pool 工作线程,池工作线程永远不会执行其代码。即使在一个非常简单的测试中,我也能够重现这个错误,该测试是在 python 中找到的示例代码的稍微修改版本docs.

这是队列示例代码的原始版本:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

这是我对队列示例代码的修改版本:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

我所做的只是将 p 设置为大小为 1 的进程池,而不是 multiprocessing.Process 对象,结果是代码永远挂在 print 语句上,因为没有任何内容写入队列!当然,我以原始形式对其进行了测试,效果很好。我的操作系统是 Windows 10,Python 版本是 3.5.x,有人知道为什么会发生这种情况吗?

更新:仍然不知道为什么这个示例代码适用于 multiprocessing.Process 而不是 multiprocessing.Pool 但我发现了一个解决办法我对(亚历克斯·马泰利的回答)很满意。显然,您可以创建一个 multiprocessing.Queues 的全局列表并传递每个进程和索引以供使用,我将避免使用托管队列,因为它们速度较慢。感谢访客向我展示了链接。


Problem

你打电话时apply_async它返回一个AsyncResult对象并将工作负载分配给单独的线程(另请参阅这个答案)。该线程遇到的问题是Queue对象不能是pickled因此所请求的工作无法分发(并最终执行)。我们可以通过调用来看到这一点AsyncResult.get:

r = p.apply_async(f,args=(q,))
r.get()

这引发了RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

然而这RuntimeError仅当您请求结果时才会在主线程中引发,因为它实际上发生在不同的线程中(因此需要一种传输方式)。

那么当你这样做时会发生什么

p.apply_async(f,args=(q,))

是目标函数f永远不会被调用,因为它的参数之一(q)不能被腌制。所以q从未收到物品并保持空状态,因此调用q.get在主线程中永远阻塞。

Solution

With apply_async您不必手动管理结果队列,但它们很容易以以下形式提供给您AsyncResult对象。因此,您可以修改代码以简单地从目标函数返回:

from multiprocessing import Queue, Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

multiprocessing.Queue 作为池工作程序的参数中止工作程序的执行 的相关文章

随机推荐