您对共享字典的用法与以下内容相同,并且可以重写为:
temp = shared_object['nested']
temp.update({'second_key': 'second_value'})
当您请求的值时nested
key来自共享字典,它存储在当前进程的内存空间中。这意味着您对此字典所做的任何更改都不会反映在管理器内存储的共享字典中,因为管理器在检索字典的子集后无法知道您对字典做了什么。因此,您需要做的就是通知经理您对嵌套字典进行了更改。您可以通过两种方式执行此操作:
使用更新后的值显式调用管理器对象
这可能是这里最简单(就性能而言也是最快)的修复,在对嵌套字典进行任何更改后,显式将键的值设置为更改后的字典:
temp = shared_object['nested']
temp.update({'second_key': 'second_value'})
shared_object['nested'] = temp
这样做可以让管理者知道被管理的对象发生了变化。
为嵌套字典创建另一个托管对象(手动与自动)
实现此目的的另一种方法是为嵌套字典创建托管对象并存储它。但是,请记住,如果对象的嵌套级别非常深,这很快就会变得不方便。这是因为对于每个嵌套字典,您需要创建另一个托管对象。此外,如果您创建的子进程是守护进程(使用时默认为multiprocessing.Pool
)并且他们需要在共享字典中添加自己的嵌套对象。这是因为要创建托管对象,您需要启动管理器进程,而守护进程无法生成自己的进程。
但是,如果您让共享字典足够负责,在有人尝试在其中存储嵌套字典时自动创建托管对象,那么这两个问题都可以很快得到解决。这消除了手动执行此操作的负担,下面给出了如何实现这一点的示例(它适用于列表和字典):
from multiprocessing.managers import SyncManager, MakeProxyType, ListProxy, State
from multiprocessing import Process, Lock
from collections import UserDict
from collections import UserList
class ManagerList(UserList):
def __check_state(self):
global manager
global lock
# Managers are not thread-safe, protect starting one from within another with a lock
with lock:
if manager._state.value != State.STARTED:
manager.start(initializer=init)
def __setitem__(self, key, value):
global manager
self.__check_state()
if isinstance(value, list):
value = manager.list(value)
elif isinstance(value, dict):
value = manager.dict(value)
return super().__setitem__(key, value)
class ManagerDict(UserDict):
def __check_state(self):
global manager
global lock
# Managers are not thread-safe, protect starting one from within another with a lock
with lock:
if manager._state.value != State.STARTED:
manager.start(initializer=init)
def __setitem__(self, key, value):
global manager
self.__check_state()
if isinstance(value, list):
value = manager.list(value)
elif isinstance(value, dict):
value = manager.dict(value)
return super().__setitem__(key, value)
ManagerDictProxy = MakeProxyType('DictProxy', (
'__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
'__setitem__', 'clear', 'copy', 'get', 'items',
'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
))
ManagerDictProxy._method_to_typeid_ = {
'__iter__': 'Iterator',
}
SyncManager.register('list', ManagerList, ListProxy)
SyncManager.register('dict', ManagerDict, ManagerDictProxy)
def init():
global manager
global lock
lock = Lock()
manager = SyncManager()
def worker(shared_object):
print(f'Before defining the nested dictionary: {shared_object}')
shared_object['nested'] = {
'first_key': 'first_value',
}
print(f'After defining the nested dictionary: {shared_object["nested"]}')
shared_object['nested'].update(
{
'second_key': 'second_value'
}
)
print(f'After updating the nested dictionary: {shared_object["nested"]}')
if __name__ == "__main__":
manager = SyncManager()
manager.start(initializer=init)
d = manager.dict() # It works with manager.list() too
p = Process(target=worker, args=(d, ))
p.start()
p.join()
这使用了__setitem__
dunder 方法检查分配给字典的值是否是另一个列表或字典,如果是,它会为它们创建一个托管对象并存储它。现在对这些嵌套字典/列表所做的任何更改也将反映在原始字典中。这适用于任意数量的嵌套级别。
但是,请记住,只有当数据类型为list
or dict
(或其子类)。此外,它还为每个嵌套级别创建一个管理器进程。因此,使用第一种方法总是比使用这种方法更快,尽管这种方法更方便。
Output
Before defining the nested dictionary: {}
After defining the nested dictionary: {'first_key': 'first_value'}
After updating the nested dictionary: {'first_key': 'first_value', 'second_key': 'second_value'}