跨进程共享多处理同步原语

2024-04-20

(Python 3.4、Linux)。

我有一个主进程“P”,它分叉 8 个进程(“C1”到“C8”)。我想创建multiprocessing.Barrier确保所有 8 个子进程在某个时刻保持同步。

如果我在父进程中定义同步原语,那么一切都会正常工作,这样当我分叉子进程时,它就会被正确继承:

import multiprocessing as mp
barrier = mp.Barrier(8)

def f():
  # do something
  barrier.wait()
  # do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

但就我而言,我不知道创建所需的详细信息Barrier对象直到子进程启动之后(我不知道我想要作为其传递的参数action范围)。因此,我想创建Barrier在一个子进程中,但我不知道如何使其可供其他子进程使用。以下当然行不通,因为 8Barrier子进程中的对象彼此完全独立:

import multiprocessing as mp

def f():
  global barrier
  # do something
  barrier = mp.Barrier(8)
  barrier.wait()
  # do more stuff

def main():
  for i in range(8):
    p = mp.Process(target = f)
    p.start()

if __name__ == '__main__':
  main()

我想创造barrier在一个子进程中并将其传递给其他子进程multiprocessing.Queue (or if Queue不接受Barrier对象,使用multiprocessing.Manager().Barrier)。然而,即使这有效,我也不知道如何确保实际上只有一个进程put将同步原语(7 个副本)放入队列,而其他同步原语只会get他们。 (当然,我可以在父进程中创建另一个同步原语来做到这一点,但是我也可以重构我的代码来创建原始的Barrier毕竟在父进程中。)


下面是一个示例,说明如何通过创建multiprocessing.managers.BaseManager在一个孩子中,然后从所有其他孩子连接到该经理。请注意,它需要传递一个multiprocessing.Lock从父级到所有子级以实现同步目的,您提到您希望避免这种情况。不过,我不确定是否还有其他选择。

import multiprocessing as mp
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

def f(lock):
  # do something
  with lock:
      try:
          MyManager.register('get_barrier')
          m = MyManager(address=('localhost', 5555), authkey=b'akey')
          m.connect()
          b = m.get_barrier()
          print("Got the barrier from the manager")
      except OSError as e:
          # We are the first. Create the manager, register
          # a mp.Barrier instance with it, and start it up.
          print("Creating the manager...")
          b = mp.Barrier(8)
          MyManager.register('get_barrier', callable=lambda:b)
          m = MyManager(address=('localhost', 5555), authkey=b'akey')
          m.start()
  b.wait()
  print("Done!")
  # do more stuff

def main():
    lock = mp.Lock()
    for i in range(8):
        p = mp.Process(target=f, args=(lock,))
        p.start()

if __name__ == '__main__':
  main()

Output:

Creating the manager...
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Got the barrier from the manager
Done!
Done!
Done!
Done!
Done!
Done!
Done!
Done!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

跨进程共享多处理同步原语 的相关文章

随机推荐