如何将 python 字典与多处理同步

2024-05-05

我正在使用 Python 2.6 和用于多线程的多处理模块。现在我想要一个同步字典(其中我真正需要的唯一原子操作是值上的 += 运算符)。

我应该用 multiprocessing.sharedctypes.synchronized() 调用包装字典吗?或者是另一种方法?


Intro

似乎有很多扶手椅建议,但没有可行的示例。这里列出的答案都没有建议使用多处理,这有点令人失望和不安。作为Python爱好者,我们应该支持我们的内置库,虽然并行处理和同步从来都不是一件小事,但我相信通过适当的设计它可以变得微不足道。这在现代多核架构中变得极其重要,怎么强调都不为过!也就是说,我对多处理库还很不满意,因为它仍处于起步阶段,存在很多陷阱、错误,并且面向函数式编程(我讨厌这一点)。目前我还是比较喜欢Pyro https://pythonhosted.org/Pyro4/由于多处理的严重限制,无法在服务器运行时共享新创建的对象,因此模块(远远领先于它的时代)超过了多处理。管理器对象的“注册”类方法只会在管理器(或其服务器)启动之前实际注册对象。废话不多说,更多代码:

服务器.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


syncdict = {}
def get_dict():
    return syncdict

if __name__ == "__main__":
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    raw_input("Press any key to kill server".center(50, "-"))
    manager.shutdown()

在上面的代码示例中,Server.py 使用多处理的 SyncManager,它可以提供同步共享对象。该代码无法在解释器中运行,因为多处理库对于如何为每个注册对象查找“可调用”非常敏感。运行 Server.py 将启动一个自定义的 SyncManager,它共享 syncdict 字典以供多个进程使用,并且可以连接到同一台计算机上的客户端,或者如果在环回以外的 IP 地址上运行,则可以连接到其他计算机。在本例中,服务器在端口 5000 上的环回 (127.0.0.1) 上运行。在操作syncdict 时,使用 authkey 参数可以使用安全连接。当按下任意键时,管理器将关闭。

客户端.py

from multiprocessing.managers import SyncManager
import sys, time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
         #if the key doesn't exist create it
         if not syncdict.has_key(key):
             syncdict.update([(key, 0)])
         #increment key value every sleep seconds
         #then print syncdict
         while True:
              syncdict.update([(key, syncdict.get(key) + inc)])
              time.sleep(sleep)
              print "%s" % (syncdict)
    except KeyboardInterrupt:
         print "Killed client"

客户端还必须创建一个自定义的 SyncManager,注册“syncdict”,这次无需传入可调用函数来检索共享字典。然后,它使用自定义的 SycnManager 进行连接,使用端口 5000 上的环回 IP 地址 (127.0.0.1) 以及与 Server.py 中启动的管理器建立安全连接的身份验证密钥。它通过调用管理器上注册的可调用函数来检索共享字典syncdict。它会提示用户执行以下操作:

  1. 要操作的syncdict中的键
  2. 每个周期增加键访问的值的量
  3. 每个周期的睡眠时间(以秒为单位)

然后客户端检查该密钥是否存在。如果没有,它会在syncdict上创建密钥。然后,客户端进入“无限”循环,其中按增量更新键的值,休眠指定的数量,并打印同步字典以重复此过程,直到发生键盘中断 (Ctrl+C)。

烦人的问题

  1. 管理器的注册方法必须在管理器启动之前调用,否则即使对管理器的 dir 调用将显示它确实具有已注册的方法,您也会收到异常。
  2. 字典的所有操作都必须使用方法而不是字典赋值来完成(syncdict["blast"] = 2 将由于多处理共享自定义对象的方式而惨败)
  3. 使用 SyncManager 的 dict 方法可以缓解恼人的问题 #2,但烦人的问题 #1 会阻止 SyncManager.dict() 返回的代理被注册和共享。 (SyncManager.dict() 只能在管理器启动后调用,而 register 只能在管理器启动之前工作,因此 SyncManager.dict() 仅在进行函数式编程并将代理作为参数传递给进程时有用,例如文档示例确实如此)
  4. 服务器和客户端都必须注册,尽管直观上看起来客户端在连接到管理器后才能弄清楚(请将其添加到您的多处理开发人员的愿望清单中)

Closing

我希望您和我一样喜欢这个相当彻底且稍微耗时的答案。我很难弄清楚为什么我在使用 Pyro 让它变得轻而易举的多处理模块上如此费力,现在多亏了这个答案,我已经一针见血了。我希望这对 python 社区如何改进多处理模块有用,因为我确实相信它有很大的前景,但在其起步阶段还没有达到可能的程度。尽管描述了烦人的问题,我认为这仍然是一个相当可行的替代方案,而且非常简单。您还可以使用 SyncManager.dict() 并将其作为参数传递给 Processes,按照文档显示的方式,这可能是一个更简单的解决方案,具体取决于您的要求,这对我来说感觉不自然。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 python 字典与多处理同步 的相关文章

  • 如何手动计算分类交叉熵?

    当我手动计算二元交叉熵时 我应用 sigmoid 来获取概率 然后使用交叉熵公式并平均结果 logits tf constant 1 1 0 1 2 labels tf constant 0 0 1 1 1 probs tf nn sigm
  • 使用特定的类/函数预加载 Jupyter Notebook

    我想预加载一个笔记本 其中包含我在另一个文件中定义的特定类 函数 更具体地说 我想用 python 来做到这一点 比如加载一个配置文件 包含所有相关的类 函数 目前 我正在使用 python 生成笔记本并在服务器上自动启动它们 因为不同的
  • 如何用python脚本控制TP LINK路由器

    我想知道是否有一个工具可以让我连接到路由器并关闭它 然后从 python 脚本重新启动它 我知道如果我写 import os os system ssh l root 192 168 2 1 我可以通过 python 连接到我的路由器 但是
  • 安装了 32 位的 Python,显示为 64 位

    我需要运行 32 位版本的 Python 我认为这就是我在我的机器上运行的 因为这是我下载的安装程序 当我重新运行安装程序时 它会将当前安装的 Python 版本称为 Python 3 5 32 位 然而当我跑步时platform arch
  • 处理 Python 行为测试框架中的异常

    我一直在考虑从鼻子转向行为测试 摩卡 柴等已经宠坏了我 到目前为止一切都很好 但除了以下之外 我似乎无法找出任何测试异常的方法 then It throws a KeyError exception def step impl contex
  • Python getstatusoutput 替换不返回完整输出

    我发现了这个很棒的替代品getstatusoutput Python 2 中的函数在 Unix 和 Windows 上同样有效 不过我觉得这个方法有问题output被构建 它只返回输出的最后一行 但我不明白为什么 任何帮助都是极好的 def
  • 跟踪 pypi 依赖项 - 谁在使用我的包

    无论如何 是否可以通过 pip 或 PyPi 来识别哪些项目 在 Pypi 上发布 可能正在使用我的包 也在 PyPi 上发布 我想确定每个包的用户群以及可能尝试积极与他们互动 预先感谢您的任何答案 即使我想做的事情是不可能的 这实际上是不
  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • 使用Python请求登录Google帐户

    在多个登录页面上 需要谷歌登录才能继续 我想用requestspython 中的库以便让我自己登录 通常这很容易使用requests库 但是我无法让它工作 我不确定这是否是由于 Google 做出的一些限制 也许我需要使用他们的 API 或
  • 使用字典映射数据帧索引

    为什么不df index map dict 工作就像df column name map dict 这是尝试使用index map的一个小例子 import pandas as pd df pd DataFrame one A 10 B 2
  • 立体太阳图 matplotlib 极坐标图 python

    我正在尝试创建一个与以下类似的简单的立体太阳路径图 http wiki naturalfrequent com wiki Sun Path Diagram http wiki naturalfrequency com wiki Sun Pa
  • 使用 xlrd 打开 BytesIO (xlsx)

    我正在使用 Django 需要读取上传的 xlsx 文件的工作表和单元格 使用 xlrd 应该可以 但因为文件必须保留在内存中并且可能不会保存到我不知道如何继续的位置 本例中的起点是一个带有上传输入和提交按钮的网页 提交后 文件被捕获req
  • 如何在不丢失注释和格式的情况下更新 YAML 文件 / Python 中的 YAML 自动重构

    我想在 Python 中更新 YAML 文件值 而不丢失 Python 中的格式和注释 例如我想改造 YAML 文件 value 456 nice value to value 6 nice value 界面类似于 y yaml load
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • Pandas 将多行列数据帧转换为单行多列数据帧

    我的数据框如下 code df Car measurements Before After amb temp 30 268212 26 627491 engine temp 41 812730 39 254255 engine eff 15
  • 根据列 value_counts 过滤数据框(pandas)

    我是第一次尝试熊猫 我有一个包含两列的数据框 user id and string 每个 user id 可能有多个字符串 因此会多次出现在数据帧中 我想从中导出另一个数据框 一个只有那些user ids列出至少有 2 个或更多string
  • Django-tables2 列总计

    我正在尝试使用此总结列中的所有值文档 https github com bradleyayers django tables2 blob master docs pages column headers and footers rst 但页
  • cv2.VideoWriter:请求一个元组作为 Size 参数,然后拒绝它

    我正在使用 OpenCV 4 0 和 Python 3 7 创建延时视频 构造 VideoWriter 对象时 文档表示 Size 参数应该是一个元组 当我给它一个元组时 它拒绝它 当我尝试用其他东西替换它时 它不会接受它 因为它说参数不是
  • 使用 z = f(x, y) 形式的 B 样条方法来拟合 z = f(x)

    作为一个潜在的解决方案这个问题 https stackoverflow com questions 76476327 how to avoid creating many binary switching variables in gekk
  • 使用随机放置的 NaN 创建示例 numpy 数组

    出于测试目的 我想创建一个M by Nnumpy 数组与c随机放置的 NaN import numpy as np M 10 N 5 c 15 A np random randn M N A mask np nan 我在创建时遇到问题mas

随机推荐