I want to use `multiprocessing.Value` + `multiprocessing.Lock` to share a counter between separate processes. For example:
    import itertools as it
    import multiprocessing

    def func(x, val, lock):
        for i in range(x):
            i ** 2
        with lock:
            val.value += 1
            print('counter incremented to:', val.value)
    if __name__ == '__main__':
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()
        with multiprocessing.Pool() as pool:
            pool.starmap(func, ((i, v, lock) for i in range(25)))
        print(v.value)
This raises the following exception:

    RuntimeError: Synchronized objects should only be shared
    between processes through inheritance
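The error makes sense once you notice that `pool.starmap()` has to pickle every argument tuple to ship it to a worker process, and pickling a `Synchronized` wrapper outside of process spawning is exactly what that check forbids. A minimal sketch of the failure (just pickling the `Value` directly, which is what the pool does under the hood):

```python
import multiprocessing
import pickle

v = multiprocessing.Value('i', 0)

# Pool.starmap() must pickle each argument to send it to a worker;
# pickling a Synchronized object outside of spawning raises RuntimeError.
try:
    pickle.dumps(v)
except RuntimeError as e:
    print(e)
```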
What confuses me most is that a related (though not identical) pattern works fine with `multiprocessing.Process()`:
    if __name__ == '__main__':
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()
        procs = [multiprocessing.Process(target=func, args=(i, v, lock))
                 for i in range(25)]
        for p in procs: p.start()
        for p in procs: p.join()
Now, I recognize that these are two distinctly different things:

- the first example uses a number of worker processes equal to `cpu_count()`, and splits an iterable, `range(25)`, between them
- the second example creates 25 worker processes and tasks, each with one input
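To make the first point concrete, here is a small sketch (the function name `whoami` is just for illustration) showing that a fixed set of pool workers handles all 25 tasks between them:

```python
import multiprocessing

def whoami(_):
    # Report which worker process handled this task.
    return multiprocessing.current_process().name

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        names = set(pool.map(whoami, range(25)))
    # 25 tasks, but at most 4 distinct worker processes did the work.
    print(len(names) <= 4)  # True
```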
That said: how can I share instances like these with `pool.starmap()` (or `pool.map()`) in this way?

I've seen similar questions here, here, and here, but those approaches don't seem suited to `.map()`/`.starmap()`, regardless of whether `Value` uses `ctypes.c_int`.
I realize that this approach technically works:
    def func(x):
        for i in range(x):
            i ** 2
        with lock:
            v.value += 1
            print('counter incremented to:', v.value)

    v = None
    lock = None

    def set_global_counter_and_lock():
        """Egh ... """
        global v, lock
        if not any((v, lock)):
            v = multiprocessing.Value('i', 0)
            lock = multiprocessing.Lock()
    if __name__ == '__main__':
        # Each worker process will call `initializer()` when it starts.
        with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
            pool.map(func, range(25))
Is this really the best-practice way of going about this?
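For comparison, a variant I've also considered (the names `init_shared`, `_v`, and `_lock` are mine) passes the shared objects through `initargs` instead of constructing them inside the initializer. The idea, as I understand it, is that `initargs` are delivered to each worker as it starts up, which satisfies the "through inheritance" restriction:

```python
import multiprocessing

_v = None
_lock = None

def init_shared(val, lock):
    # Runs once in each worker at startup; stash the inherited objects.
    global _v, _lock
    _v, _lock = val, lock

def func2(x):
    for i in range(x):
        i ** 2
    with _lock:
        _v.value += 1

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    # initargs are handed to each worker process as it is created,
    # so no pickling of a Synchronized object happens mid-task.
    with multiprocessing.Pool(initializer=init_shared,
                              initargs=(v, lock)) as pool:
        pool.map(func2, range(25))
    print(v.value)  # 25
```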