Python跨进程共享数据/对象

2023-05-16

1. 跨进程共享方式

在multiprocess库中,跨进程对象共享有三种方式:

(1)第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象

(2)另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;

(3)最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在 父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这 些对象的操作都是反映到了同一个对象。

2. Shared Memory模型

shared memory模型能共享ctypes当中的类型,通过RawValue,RawArray等包装类提供。通过查看multiprocess的源码可以看到支持的类型有:

Value、Array、Lock等,

def Event() -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
def Pool(
    processes: Optional[int] = ...,
    initializer: Optional[Callable[..., Any]] = ...,
    initargs: Iterable[Any] = ...,
    maxtasksperchild: Optional[int] = ...,
) -> pool.Pool: ...

# Functions Array and Value are copied from context.pyi.
# See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291
# for rationale
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ...
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...

共享的使用方法示例如下:

import multiprocessing


def func(num):
    num.value=11.11  # 子进程改变数值的值,主进程跟着改变


if __name__=="__main__":
    # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)
    num=multiprocessing.Value("d", 10.0) 
    p=multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()

    print(num.value)

3. Server Process模型

这个模式支持跨进程共享所有对象,也即是想要共享 “自定义对象”,只能使用这个方式!

server process模型中,有一个manager进程(就是那个server进程),负责管理实际的对象,真正的对象也是在manager进程的内存空间中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object)。这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

(1)Manager支持类型

通过查看源码可以发现,Manage() 支持的类型有:

def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
def Condition(self, lock: Any = ...) -> threading.Condition: ...
def Event(self) -> threading.Event: ...
def Lock(self) -> threading.Lock: ...
def Namespace(self) -> _Namespace: ...
def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
def RLock(self) -> threading.RLock: ...
def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
def list(self, sequence

使用示例如下:

import multiprocessing


def func(dict_in,list_in):
    # 跨进程共享, 子进程修改,主进程跟着改变
    dict_in["index1"]="xxx"

    list_in.append("xx")
    list_in.append("yy")


if __name__=="__main__":
with multiprocessing.Manager() as mg:
    # 创建主进程与子进程之间共享的dict/list
    mydict=multiprocessing.Manager().dict()
    mylist=multiprocessing.Manager().list(range(5))
    p=multiprocessing.Process(target=func,args=(mydict,mylist))
    p.start()
    p.join()
    
    print(mylist)
    print(mydict)

(2)共享自定义类

很多场景下,Manager自带的类并不能满足我们的需求,这时候就需要用到Manager对自定义类的支持。Server Process模型共享自定义对象的实现流程如下:

(1) 基于multiprocessing.managers 重写MyManager,类内部啥都不用实现:

class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass

(2) 注册自定义类,如:RedisService、MySQLService,

# RedisService/MySQLService是自定义类, 类内部分别包含Redis连接和MySQL连接,类定义此处省略
MyManager.register("RedisService", RedisService)
 MyManager.register("MySQLService", MySQLService)

(3) 构造MyManager的实例,并由它创建多进程共享的自定义对象,

manager = MyManager()
manager.start()

# 创建共享对象
self.redis_service = manager.RedisService(settings)
self.mysql_service = manager.MySQLService(settings)

(4) 该对象以参数形式传入到子进程中,子进程直接使用。

全流程的代码示例如下:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import managers


class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass

def proc_worker(redis_service, mysql_service, task_id):
    """
    TODO::工作进程
    """
    
    # 此处可直接使用进程池共享的redis和mongo服务
    rst = redis_service.get(task_id)
    rst = mysql_service.save_result(rst)
    ...

class ServerExecutor:
    """
    调度执行器
    """
    def __init__(self, settings):
        # 配置信息, 用于创建redis链接和mysql链接
        self.settings = settings

        # 在Manager中注册自定义类(RedisService/MySQLService是我的自定义类, 类内部分别包含Redis连接和MySQL连接)
        MyManager.register("RedisService", RedisService)
        MyManager.register("MySQLService", MySQLService)
        manager = MyManager()
        manager.start()

        # 创建共享对象
        self.redis_service = manager.RedisService(settings)
        self.mysql_service = manager.MySQLService(settings)

        # 这里不仅可以是ProcessPoolExecutor,也可以是多进程Process或者进程池Pool,各自用法略有不同
        self.executor = ProcessPoolExecutor(settings.executor_num)

    def submit(self, task_id):
        """
        提交任务
        """
        future = self.executor.submit(proc_work,
                                      self.redis_service,
                                      self.mysql_service,
                                      task_id)
        return future


# demo
...
executor = ServerExecutor(settings)
future = executor.submit(task_id)
future.add_done_callback(task_done_callback)
...

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python跨进程共享数据/对象 的相关文章

  • 使用 urllib2 进行 Python 身份验证

    所以我尝试使用 python 从名为 vsearch cisco com 的网站下载文件 python Connects to the Cisco Server and Downloads files at the URL specifie
  • 使用 Pillow 和 Numpy 进行图像推导

    I have two images and 我想导出一个只有红色 Hello 的图像 例如 所以我正在运行一个简单的推导python脚本 from PIL import Image import numpy as np root root
  • 如何使 Django ManyToMany “直通”查询更加高效?

    我使用的是 ManyToManyField 和 through 类 这会在获取事物列表时产生大量查询 我想知道是否有更有效的方法 例如 这里有一些描述书籍及其几位作者的简化类 它们通过角色类 定义 编辑器 插画家 等角色 class Per
  • 如何跳过财务图中的空日期(周末)

    ax plot date dates dates highs lows 我目前正在使用此命令来绘制财务高点和低点Matplotlib http en wikipedia org wiki Matplotlib 效果很好 但如何删除 x 轴上
  • 合并数据框中的值以写入 Excel

    我有一个看起来像的数据框 column1 column2 column3 colum4 column5 1 r n 1 r s 1 r n 2 r s 3 r n 3 2 r n 1 r s 1 r n 4 r s 4 r n 5 3 r
  • 是否可以在 Sphinx 中隐藏 Python 函数参数?

    假设我有以下函数 该函数记录在Numpydoc 风格 https github com numpy numpy blob master doc HOWTO DOCUMENT rst txt 并且文档是自动生成的Sphinx http sph
  • HoughLinesP后如何合并线?

    My task is to find coordinates of lines startX startY endX endY and rectangles 4 lines Here is input file 我使用下一个代码 img c
  • 使用 Python 的 optparse 模块时如何遵守 PEP 257 文档字符串?

    根据PEP 257 http www python org dev peps pep 0257 multi line docstrings命令行脚本的文档字符串应该是它的使用消息 脚本的文档字符串 a 独立程序 应该可用 作为其 使用 消息
  • S3 选择检索 CSV 中的标头

    我尝试使用以下代码从存储在 S 存储桶中的 CSV 中获取记录子集 s3 boto3 client s3 bucket bucket file name file sql stmt SELECT S FROM s3object S LIMI
  • 使用Python进行图像识别[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个想法 就是我想识别图像中的字母 可能是 bmp或 jpg 例如 这是一个包含字母 S 的 bmp 图像 我想做的是使用Pyth
  • 如何在返回的 AJAX 调用上使用 django 模板标签?

    我有一个简单的 AJAX 脚本 它在名为的搜索字段中获取输入的字符串AJAXBox并调用一个视图函数 该函数使用过滤器查询数据库并返回与输入参数匹配的所有 User 对象的查询集 当我使用 django 模板标签迭代查询集时 它不起作用 我
  • 将查询参数添加到 URL

    我正在尝试自动从网站下载数据 我需要将动态参数传递到每天更改的站点 html 的结构是表格而不是表单 如何传递参数并从 url 获取结果 这是我尝试过的 它需要在 python 2 7 中 import urllib url https d
  • 如何在 Python 中将 EXR 文件的 float16 转换为 uint8

    我正在使用 OpenEXR 读取 Python 中的 EXR 文件 我有带有半数据 float16 的 R G 和 B 通道 我尝试使用 Numpy 将数据从 float16 转换为 uint8 0 255 颜色 但没有成功 rCh get
  • 使用 .map() 在 pandas DataFrame 中高效创建附加列

    我正在分析形状与以下示例类似的数据集 我有两种不同类型的数据 abc数据和xyz data abc1 abc2 abc3 xyz1 xyz2 xyz3 0 1 2 2 2 1 2 1 2 1 1 2 1 1 2 2 2 1 2 2 2 3
  • Learning_rate 不是合法参数

    我正在尝试通过实现 GridSearchCV 来测试我的模型 但我似乎无法在 GridSearch 中添加学习率和动量作为参数 每当我尝试通过添加这些代码来执行代码时 我都会收到错误 这是我创建的模型 def define model op
  • 与 GNU Make 等 Python 相关的并行任务并发

    我正在寻找一种方法或者可能是一种哲学方法来如何在 python 中执行类似 GNU Make 的操作 目前 我们使用 makefile 来执行处理 因为 makefile 非常擅长通过更改单个选项 j x 进行并行运行 此外 gnu mak
  • 在Python中将罗马数字转换为整数

    根据 user2486 所说 这是我当前的代码 def romanMap map M 1000 CM 900 D 500 CD 400 C 100 XC 90 L 50 XL 40 X 10 IX 9 V 5 V 4 I 1 return
  • 用户的完整 UNIX 用户名

    想知道您是否知道是否有一种巧妙的方法可以从 shell 获取完整的用户名 示例 如果我的 UNIX 用户名是 froyo 那么我想获取我的全名 在本例中 如系统中注册的那样 froyo Abhishek Pratap Finger 命令可以
  • Python 可以替代 Java 小程序吗?

    除了制作用于物理模拟 如抛射运动 重力等 的教育性 Java 小程序之外 还有其他选择吗 如果你想让它在浏览器中运行 你可以使用PyJamas http pyjs org 这是一个 Python 到 Javascript 的编译器和工具集
  • Matplotlib 渲染日期、图像的问题

    我在使用 conda forge 的 Matplotlib v 3 1 3 和 python 3 7 时遇到问题 我拥有 Matplotlib 所需的所有依赖项 当我输入这段代码时 它应该可以工作 我得到了泼溅艺术 它基于此 YouTube

随机推荐

  • linux中出错处理

    linux中 xff0c 在支持多线程的环境中 xff0c 通常每个线程都有属于自己的errno变量 xff0c 是用来表示特定错误的常量 以下是 lt errno h gt 中定义的所有出错errno常量 define EPERM 1 O
  • linux-capabilities

    导航 返回顶部 1 man capabilities 1 1 Capabilities 功能1 2 全面实施功能需要 在内核2 6 24之前 xff0c 仅满足前两个要求 xff1b 从内核2 6 24开始 xff0c 所有这三个要求都得到
  • Arch-base-vs-iso

    Arch base vs iso Arch base vs iso 通常绝大多数的Linux分发版的iso镜像本身 iso文件都有约2Gb上下 都可以直接启动电脑并运行完整的Linux桌面系统 极少数的Linux发行版仅提供命令行界面 xf
  • Arch系统软件列表

    Arch系统软件列表 1 安装统计 2 安装列表 3 安装说明 4 作为依赖项的安装列表 5 更正 mangaro使用减的方式安装系统 开箱即用的豪华版本 xff0c 大部分人需要的都有了 xff0c 同样包括个别用户不需要的 xff0c
  • 服务器如何设置多用户登录?Windows服务器多界面设置方法

    当你在使用服务器时是否有遇到这样一个问题 xff1f 当你正在服务器里进行工作时 xff0c 突然一个小伙伴在没有告知你的情况下进入了服务器里 xff0c 导致你服务器失去连接了 xff0c 这种情况是非常常见的现象 主要原因就是因为服务器
  • 运算符:&和&&的区别

    amp 运算符有两种用法 xff1a 1 按位与 xff1b 2 逻辑与 amp amp 运算符是短路与运算 逻辑与跟短路与的差别是非常巨大的 xff0c 虽然二者都要求运算符左右两端的布尔值都是true整个表达式的值才是true amp
  • python删除txt文本文件第一行数据

    txt文本单次删除第一行 txt文本单次删除第一行 with open 39 test txt 39 mode 61 39 r 39 encoding 61 39 utf 8 39 as f line 61 f readlines 读取文件
  • Android WebView 网页使用本地字体

    要求在网页里面调用android app中assets目录下的某个字体文件 网页加载通常有两种方式 xff1a 1 loadDataWithBaseURL 2 loadUrl 一 loadDataWithBaseURL 网页中直接使用fil
  • Blazor入门100天 : 身份验证和授权 (4) - 自定义字段

    目录 建立默认带身份验证 Blazor 程序角色 组件 特性 过程逻辑DB 改 Sqlite将自定义字段添加到用户表脚手架拉取IDS文件 本地化资源freesql 生成实体类 freesql 管理ids数据表初始化 Roles freesq
  • 若依前后端分离版,图片上传后无法显示问题

    若依前后端分离版 xff0c 部署时通常会采用Nginx做反向代理 访问图片出现404 Not Found问题 若依在文件上传成功后 xff0c 请求访问后台地址会根据profile进行匹配 xff0c 需要自己配置代理 配置方式一 xff
  • Vue项目使用history模式,打包部署到二级目录

    需求 xff1a Vue项目使用history模式 xff0c 打包部署到服务器上的二级目录 示例 xff1a your host name h5 index html 原理 xff1a 把页面地址所有含有 h5 的url重定向到 xff1
  • Springboot 服务jar的外部指定端口和文件方式

    在命令行中指定启动端口 java jar xxx jar server port 61 9000 在命令行中指定启动端口与配置文件 java jar xxx jar server port 61 8980 spring profiles a
  • 【Android】本地图片选择(打开媒体库,选择图片)

    在此调查中我要实现的是 xff1a 点击Pictures 按钮后 xff0c 获取手机内所有图片 xff0c 选择某一个图片 xff0c 并显示到ImageView中 应用范围 xff1a 图片上传时的图片选择 xff0c 类似 34 浏览
  • 【Android】获取手机中已安装apk文件信息(PackageInfo、ResolveInfo)(应用图片、应用名、包名等)

    众所周知 xff0c 通过PackageManager 可以获取手机端已安装的apk 文件的信息 xff0c 具体代码如下 PackageManager packageManager 61 this getPackageManager Li
  • 【Android】状态栏通知Notification、NotificationManager详解

    在Android系统中 xff0c 发一个状态栏通知还是很方便的 下面我们就来看一下 xff0c 怎么发送状态栏通知 xff0c 状态栏通知又有哪些参数可以设置 xff1f 首先 xff0c 发送一个状态栏通知必须用到两个类 xff1a N
  • 【Android动画】之Tween动画 (渐变、缩放、位移、旋转)

    Android 平台提供了两类动画 一类是Tween动画 xff0c 就是对场景里的对象不断的进行图像变化来产生动画效果 xff08 旋转 平移 放缩和渐变 xff09 第二类就是 Frame动画 xff0c 即顺序的播放事先做好的图像 x
  • 【Android布局】在程序中设置android:gravity 和 android:layout_Gravity属性

    在进行UI布局的时候 xff0c 可能经常会用到 android gravity 和 android layout Gravity 这两个属性 关于这两个属性的区别 xff0c 网上已经有很多人进行了说明 xff0c 这边再简单说一下 资料
  • 【Android环境】SDK Platform Tools component is missing!

    今天没事升级了一下ADT 10 0 1工具 xff0c 然后重启Eclipse 就出现 34 SDK Platform Tools component is missing 34 的提示框 xff01 然后就发现 xff0c 所有Andro
  • 【Android面试】Android面试题集锦 (陆续更新)

    一些常见的Android面试基础题做下总结 xff0c 看看你能做出多少道 1 Intent的几种有关Activity启动的方式有哪些 xff0c 你了解每个含义吗 这里Android123提示大家 xff0c Intent的一些标记有FL
  • Python跨进程共享数据/对象

    1 跨进程共享方式 在multiprocess库中 xff0c 跨进程对象共享有三种方式 xff1a xff08 1 xff09 第一种仅适用于原生机器类型 xff0c 即python ctypes当中的类型 xff0c 这种在mp库的文档