Python跨进程共享数据/对象

2023-05-16

1. 跨进程共享方式

在multiprocess库中,跨进程对象共享有三种方式:

(1)第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象

(2)另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;

(3)最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在 父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这 些对象的操作都是反映到了同一个对象。

2. Shared Memory模型

shared memory模型能共享ctypes当中的类型,通过RawValue,RawArray等包装类提供。通过查看multiprocess的源码可以看到支持的类型有:

Value、Array、Lock等,

def Event() -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
def Pool(
    processes: Optional[int] = ...,
    initializer: Optional[Callable[..., Any]] = ...,
    initargs: Iterable[Any] = ...,
    maxtasksperchild: Optional[int] = ...,
) -> pool.Pool: ...

# Functions Array and Value are copied from context.pyi.
# See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291
# for rationale
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ...
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...

共享的使用方法示例如下:

import multiprocessing


def func(num):
    num.value=11.11  # 子进程改变数值的值,主进程跟着改变


if __name__=="__main__":
    # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)
    num=multiprocessing.Value("d", 10.0) 
    p=multiprocessing.Process(target=func,args=(num,))
    p.start()
    p.join()

    print(num.value)

3. Server Process模型

这个模式支持跨进程共享所有对象,也即是想要共享 “自定义对象”,只能使用这个方式!

server process模型中,有一个manager进程(就是那个server进程),负责管理实际的对象,真正的对象也是在manager进程的内存空间中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object)。这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

(1)Manager支持类型

通过查看源码可以发现,Manage() 支持的类型有:

def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
def Condition(self, lock: Any = ...) -> threading.Condition: ...
def Event(self) -> threading.Event: ...
def Lock(self) -> threading.Lock: ...
def Namespace(self) -> _Namespace: ...
def Queue(self, maxsize: int = ...) -> queue.Queue[Any]: ...
def RLock(self) -> threading.RLock: ...
def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...
def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...
def Value(self, typecode: Any, value: _T) -> ValueProxy[_T]: ...
def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...
def list(self, sequence

使用示例如下:

import multiprocessing


def func(dict_in,list_in):
    # 跨进程共享, 子进程修改,主进程跟着改变
    dict_in["index1"]="xxx"

    list_in.append("xx")
    list_in.append("yy")


if __name__=="__main__":
with multiprocessing.Manager() as mg:
    # 创建主进程与子进程之间共享的dict/list
    mydict=multiprocessing.Manager().dict()
    mylist=multiprocessing.Manager().list(range(5))
    p=multiprocessing.Process(target=func,args=(mydict,mylist))
    p.start()
    p.join()
    
    print(mylist)
    print(mydict)

(2)共享自定义类

很多场景下,Manager自带的类并不能满足我们的需求,这时候就需要用到Manager对自定义类的支持。Server Process模型共享自定义对象的实现流程如下:

(1) 基于multiprocessing.managers 重写MyManager,类内部啥都不用实现:

class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass

(2) 注册自定义类,如:RedisService、MySQLService,

# RedisService/MySQLService是自定义类, 类内部分别包含Redis连接和MySQL连接,类定义此处省略
MyManager.register("RedisService", RedisService)
 MyManager.register("MySQLService", MySQLService)

(3) 构造MyManager的实例,并由它创建多进程共享的自定义对象,

manager = MyManager()
manager.start()

# 创建共享对象
self.redis_service = manager.RedisService(settings)
self.mysql_service = manager.MySQLService(settings)

(4) 该对象以参数形式传入到子进程中,子进程直接使用。

全流程的代码示例如下:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import managers


class MyManager(managers.BaseManager):
    """
    自定义Manager
    """
    # Pass is really enough. Nothing needs to be done here.
    pass

def proc_worker(redis_service, mysql_service, task_id):
    """
    TODO::工作进程
    """
    
    # 此处可直接使用进程池共享的redis和mongo服务
    rst = redis_service.get(task_id)
    rst = mysql_service.save_result(rst)
    ...

class ServerExecutor:
    """
    调度执行器
    """
    def __init__(self, settings):
        # 配置信息, 用于创建redis链接和mysql链接
        self.settings = settings

        # 在Manager中注册自定义类(RedisService/MySQLService是我的自定义类, 类内部分别包含Redis连接和MySQL连接)
        MyManager.register("RedisService", RedisService)
        MyManager.register("MySQLService", MySQLService)
        manager = MyManager()
        manager.start()

        # 创建共享对象
        self.redis_service = manager.RedisService(settings)
        self.mysql_service = manager.MySQLService(settings)

        # 这里不仅可以是ProcessPoolExecutor,也可以是多进程Process或者进程池Pool,各自用法略有不同
        self.executor = ProcessPoolExecutor(settings.executor_num)

    def submit(self, task_id):
        """
        提交任务
        """
        future = self.executor.submit(proc_work,
                                      self.redis_service,
                                      self.mysql_service,
                                      task_id)
        return future


# demo
...
executor = ServerExecutor(settings)
future = executor.submit(task_id)
future.add_done_callback(task_done_callback)
...

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python跨进程共享数据/对象 的相关文章

随机推荐

  • 【排序算法】:基数排序

    定义 基数排序 xff08 radix sort xff09 属于 分配式排序 xff08 distribution sort xff09 xff0c 又称 桶子法 xff08 bucket sort xff09 或bin sort xff
  • 【排序算法】:九大排序算法总结

    直接插入排序 参考博客 xff1a 排序算法 xff1a 直接插入排序 时间复杂度 xff1a 平均情况 xff1a O N 2 最好情况 xff1a O N 最坏情况 xff1a O N 2 空间复杂度 xff1a O 1 稳定性 xff
  • 使用Marlin1.1.x固件进行多电机控制(4轴联动+5个挤出机)

    有必要发一个博客 xff0c 这几天做实验没啥进展 xff0c 烦 写东西清理思路 xff0c 整理思维 这篇博文为了交流 xff0c 为两种读者 xff0c 第一种是做3d打印机想要给机器增加5个挤出机的人 xff0c 第二种是遇到运动控
  • 单下划线和双下划线(私有变量和公有变量)

    单下划线 开始的成员变量叫做保护变量 xff0c 意思是只有类对象和子类对象自己能访问到这些变量 xff1b 双下划线 开始的是私有成员 xff0c 意思是只有类对象自己能访问 xff0c 连子类对象也不能访问到这个数据 以单下划线开头 x
  • STM32寄存器的简介、地址查找,与直接操作寄存器

    什么是寄存器 提到单片机 xff0c 就不得不提到寄存器 根据百度百科介绍 xff0c 寄存器是中央处理器内的组成部分 寄存器是有限存贮容量的高速存贮部件 xff0c 它们可用来暂存指令 数据和地址 简单来说 xff0c 寄存器就是存放东西
  • 卡尔曼滤波五个公式推导

    一 两个方程 1 xff0c 系统的状态方程 xff08 预测方程 xff09 x k 61 A k
  • 实时操作系统RTOS(一)——资源整理

    文章目录 1 COS2 FreeRTOS3 NuttX参考资料 1 COS Micrium官网 C全家桶的githubMicrium书籍 xff08 C OS III C OS II C TCP IP C USB xff09 Micrium
  • 路由器设置之后不能连网,动态ip显示为0

    刚从学习毕业 xff0c 那么独居的日子里 xff0c 网络自然是不可或缺的 xff0c 于是博主在等到安装人员上门之后 xff0c 他竟然提出无耻的要求 配置路由器要收费 xff0c 于是乎作为一名计算机从业者 xff0c 自然不能让他把
  • 链表的倒序

    1 用函数实现 xff1a 定义3个临时变量 xff0c p1储存当前节点 xff0c p2储存后一个节点 xff0c p3储存重新指向的那个节点 步骤 xff1a 1 p1指向当前节点 xff0c p2指向后一个节点 xff0c 将p2的
  • Python入门:使用PyCharm调试Python程序

    Python入门 xff1a 使用PyCharm调试Python程序 面向Python初学者 PyCharm集成运行环境 在了解Python编程之前 xff0c 我们需要先弄明白如何编写运行代码 所以非常有必要先讲解一下Python的集成开
  • Debian9.5安装g++

    看了网上很多文章 gcc安装很轻松 xff0c 但是g 43 43 的安装过程就曲折多了 写此博客记录一下Debian9 5操作系统下g 43 43 安装过程 首先替换apt源为阿里云的源 xff1a debian 配置 debian 7
  • CMakeLists.txt文件中添加OpenCV库依赖项

    CMakeLists txt文件中添加OpenCV库依赖项 cmake needs this line cmake minimum required VERSION 2 8 Define project name project openc
  • 物理地址和逻辑地址

    1 物理地址和逻辑地址 物理地址 xff1a 加载到内存地址寄存器中的地址 xff0c 内存单元的真正地址 在前端总线上传输的内存地址都是物理内存地址 xff0c 编号从0开始一直到可用物理内存的最高端 这些数字被北桥 Nortbridge
  • 树莓派无opencv时进行视频实时处理

    用树莓派 xff0c 想要进行图像处理 xff0c 但网上的很多教程都用opencv xff0c opencv在树莓派上安装很麻烦 xff0c 那怎样进行图像处理呢 xff1f 代码如下 xff1a from picamera array
  • make: warning: Clock skew detected. Your build may be incomplete.

    问题现象 xff1a make warning Clock skew detected Your build may be incomplete 问题分析 xff1a 根据报警提示 xff0c 应该问题出现时钟问题 问题原因 xff1a 当
  • 关于Java之IO流音乐拼接小项目

    需求 xff1a 做一个音乐串烧 分析 xff1a 1 有n个音乐 xff0c 找到高潮部分 xff0c 2 获取高潮部分的流对象 3 把这部分对象保存成一个mp3 4 把它们拼接起来 以下为源码供大家分享 xff1a 方法一 xff1a
  • pixhawk源码下载与编译

    今天和同学在一起讨论发现自己还在看的pixhawk源码的版本好低啊 xff0c 就下个最新的吧 想起有些人还不会下载源码 xff0c 以及用什么工具查看 源码的下载我用的是git shell工具下的 xff0c px4的github网址是h
  • pixhawk开发环境

    Windows 7 64bit 软件安装 首先 xff0c 需要安装一些软件 xff0c CMake 32位的Java jdk以及PX4 Toolchain Installer CMake的话笔者使用的是CMake 3 3 2 win32
  • Pixhawk-信息流浅解析

    根深方能叶茂 在等待的日子里 xff0c 刻苦读书 xff0c 谦卑做人 xff0c 养得深根 xff0c 日后才能枝叶茂盛 Better 根爷 之前我们已经谈到系统框架 xff0c 之前谈到了定制自己功能的两部 xff1a 添加模块和修改
  • Python跨进程共享数据/对象

    1 跨进程共享方式 在multiprocess库中 xff0c 跨进程对象共享有三种方式 xff1a xff08 1 xff09 第一种仅适用于原生机器类型 xff0c 即python ctypes当中的类型 xff0c 这种在mp库的文档