来自一个多处理管理器的多个队列

2024-01-07

我正在编写一个将使用 python 的多处理和线程模块的脚本。 为了您的理解,我生成了与可用核心一样多的进程,并且在每个进程中我启动了例如25 个线程。 每个线程消耗一个input_queue并产生output_queue。 对于我使用的队列对象multiprocessing.Queue.

在我的第一次测试之后,我遇到了死锁,因为负责馈送和刷新队列的线程挂起。过了一段时间我发现我可以使用Queue().cancel_join_thread()解决这个问题。

但由于数据丢失的可能性,我想使用:multiprocessing.Manager().Queue()

现在实际问题: 为每个队列使用一个管理器对象是否更好?或者我应该创建一个管理器并从同一个管理器对象获取两个查询?

# One manager for all queues
import multiprocessing

manager = multiprocessing.Manager()
input_queue = manager.Queue()
output_queue = manager.Queue()

...Magic...

# As much managers as queues
manager_in = multiprocessing.Manager()
queue_in = manager_in.Queue()

manager_out = multiprocessing.Manager()
queue_out = manager_out.Queue()

...Magic...

感谢您的帮助。


无需使用两个单独的Manager对象。正如您已经看到的Managerobject 允许在多个进程之间共享对象;来自docs https://docs.python.org/2/library/multiprocessing.html#managers:

管理器提供了一种创建可在不同流程之间共享的数据的方法。管理器对象控制管理共享对象的服务器进程。其他进程可以使用代理访问共享对象。

因此,如果您有两个不同的队列,您仍然可以使用相同的管理器。如果它对某人有帮助,这里是一个简单的示例,使用两个队列和一个管理器:

from multiprocessing import Manager, Process
import time


class Worker(Process):
    """
    Simple worker.
    """

     def __init__(self, name, in_queue, out_queue):
        super(Worker, self).__init__()
        self.name = name
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            # grab work; do something to it (+1); then put the result on the output queue
            work = self.in_queue.get()
            print("{} got {}".format(self.name, work))
            work += 1

            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)

            # put the transformed work on the queue
            print("{} puts {}".format(self.name, work))
            self.out_queue.put(work)


if __name__ == "__main__":
    # construct the queues
    manager = Manager()
    inq = manager.Queue()
    outq = manager.Queue()

    # construct the workers
    workers = [Worker(str(name), inq, outq) for name in range(3)]
    for worker in workers:
        worker.start()

    # add data to the queue for processing
    work_len = 10
    for x in range(work_len):
        inq.put(x)

    while outq.qsize() != work_len:
        # waiting for workers to finish
        print("Waiting for workers. Out queue size {}".format(outq.qsize()))
        time.sleep(1)

    # clean up
    for worker in workers:
        worker.terminate()

    # print the outputs
    while not outq.empty():
        print(outq.get())

使用两个管理器,如下所示:

# construct the queues
manager1 = Manager()
inq = manager1.Queue()
manager2 = Manager()
outq = manager2.Queue()

有效,但没有必要。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

来自一个多处理管理器的多个队列 的相关文章

随机推荐