Python 多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗?

2023-12-28

首先让我向您展示我当前的设置:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(param):
   process(another_module.global_variable[param])


def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result

这里我使用在创建进程池之前更新的共享变量,其中包含大量数据,这确实提高了我的速度,所以现在看起来没有被腌制。该变量也属于导入模块的范围(如果它很重要)。

当我尝试创建这样的设置时:

another_module.global_variable = []

p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module_global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result  

p“记住”全局共享列表是空的,并且当从调度程序内部调用时拒绝使用新数据。


现在问题是:使用上面的第一个设置在 8 个核心上处理约 600 个数据对象,我的并行计算运行 8 秒,而单线程运行 12 秒。

我是这样想的:只要多处理pickles数据,并且每次都需要重新创建进程,我就需要pickle函数big_function(),所以我在这方面浪费了时间。使用全局变量部分解决了数据的情况(但我仍然需要在每次更新时重新创建池)。

我可以用以下实例做什么big_function()(这取决于其他模块、numpy 等的许多其他函数)?我可以创建吗os.cpu_count()它的副本一劳永逸,并以某种方式将新数据输入其中并接收结果,重用工人?


只是为了讨论“记住”问题:

another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    another_module_global_variable = huge_list
    params = range(len(another_module.global_variable))
    multiprocessing_result = list(p.imap_unordered(big_function, params))
    return multiprocessing_result 

问题似乎出在您创建时Pool实例。

这是为什么?

这是因为当你创建实例时Pool,它确实设置了工作线程的数量(默认情况下等于 CPU 核心的数量),并且它们都在那时启动(分叉)。这意味着工人拥有父母全球状态的副本(并且another_module.global_variable除其他事项外),并且使用写时复制策略,当您更新another_module.global_variable你在父母的过程中改变它。工人对旧值有参考。这就是为什么你有问题。

以下是几个链接,可以为您提供更多解释:this https://stackoverflow.com/a/42149043 and this https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods.

这是一个小片段,您可以在其中切换全局变量值更改的行和启动进程的行,并检查子进程中打印的内容。

from __future__ import print_function
import multiprocessing as mp

glob = dict()
glob[0] = [1, 2, 3]


def printer(a):
    print(globals())
    print(a, glob[0])


if __name__ == '__main__':
    p = mp.Process(target=printer, args=(1,))
    p.start()
    glob[0] = 'test'
    p.join()

这是Python2.7代码,但它也适用于Python3.6。

这个问题的解决方案是什么?

好吧,回到第一个解决方案。您更新导入模块变量的值,然后创建进程池。


现在真正的问题是缺乏加速。

这是有趣的部分文档 https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled关于如何腌制函数:

请注意,函数(内置函数和用户定义函数)是通过“完全 合格的”名称引用,而不是值。这意味着只有 函数名称与模块名称一起被腌制 函数是在中定义的。既不是函数的代码,也不是它的任何代码 函数属性被腌制。因此定义模块必须是 可在 unpickling 环境中导入,并且该模块必须包含 命名对象,否则将引发异常。

这意味着您的函数酸洗不应该是一个浪费时间的过程,或者至少不是一个浪费时间的过程。导致缺乏加速的原因是,对于您传递给的列表中的约 600 个数据对象imap_unordered调用时,您将它们中的每一个传递给一个工作进程。再次,底层实现multiprocessing.Pool可能是这个问题的原因。

如果你深入multiprocessing.Pool实施后,你会看到两个Threads using Queue正在处理父进程和所有子(工作)进程之间的通信。因此,所有进程都不断需要函数参数并不断返回响应,最终导致父进程非常繁忙。这就是为什么“大量”时间花在“分派”工作上,将数据传入或传出工作进程。

对此该怎么办?

尝试随时增加工作进程中进程的数据对象数量。在您的示例中,您一个接一个地传递数据对象,并且您可以确保每个工作进程在任何时候都只处理一个数据对象。为什么不增加传递给工作进程的数据对象的数量?这样,您可以使每个进程更加繁忙,处理 10 个、20 个甚至更多的数据对象。据我所见,imap_unordered has an chunksize争论。它设置为1默认情况下。尝试增加它。像这样的事情:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(params):
   results = []
   for p in params:
       results.append(process(another_module.global_variable[p]))
   return results

def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params, chunksize=10))

    return multiprocessing_result

几个建议:

  1. 我看到你创造了params作为索引列表,您可以使用它来选择特定的数据对象big_function。您可以创建代表第一个和最后一个索引的元组并将它们传递给big_function。这可能是增加工作量的一种方式。这是我上面提出的方法的另一种方法。
  2. 除非你明确喜欢Pool(processes=os.cpu_count()),可以省略。默认情况下它需要 CPU 核心数。

对于答案的长度或可能潜入的任何拼写错误,我们深表歉意。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗? 的相关文章

  • 如何修复 Apache mod_wsgi 的 Python 版本不匹配问题?

    我收到此错误 Thu Jul 12 14 31 36 2012 error python init Python version mismatch expected 2 6 7 found 2 6 8 当尝试启动 Apache 服务器时 在
  • 如何忽略传递给函数的意外关键字参数?

    假设我有一些功能 f def f a None print a 现在 如果我有一本字典 比如dct a Foo 我可以打电话f dct 并得到结果Foo打印 但是 假设我有一本字典dct2 a Foo b Bar 如果我打电话f dct2
  • 重新索引错误没有意义

    I have DataFrames大小在 100k 到 2m 之间 我正在处理这个问题的框架是如此之大 但请注意 我必须对其他框架执行相同的操作 gt gt gt len data 357451 现在这个文件是通过编译许多文件创建的 所以它
  • 使用 Python 在 Google Cloud Storage 存储桶中创建/上传新文件

    如何使用 Python 和可用的客户端库在 Google Cloud Storage 中创建新的空文件 或者如何使用 blob 函数 upload from filename 将新文件上传到选定的存储桶 要初始化 blob 对象 我们应该在
  • pip 安装失败,SSL 证书验证失败 (_ssl.c:833)

    我无法通过 pip install 安装任何外部 python 模块 我已经正确安装了 python 但如果我使用 pip install 它会显示此错误 这是我运行后的代码pip install pytesseract C Users 1
  • Python sqlite3参数化删除表

    我在 python 中删除 sqlite3 表时遇到问题 我正在使用标准sqlite3模块 self conn sqlite3 connect sql drop table self conn execute sql u table nam
  • PyTorch:加速数据加载

    我正在使用 dendnet121 从 Kaggle 数据集进行猫 狗检测 我启用了cuda 看起来训练速度非常快 然而 数据加载 或者可能是处理 似乎非常慢 有一些方法可以加快速度吗 我尝试玩女巫批量大小 但没有提供太多帮助 我还将 num
  • 如何为 C 分配的 numpy 数组注册析构函数?

    我想在 C C 中为 numpy 数组分配数字 并将它们作为 numpy 数组传递给 python 我可以做的PyArray SimpleNewFromData http docs scipy org doc numpy reference
  • 如何使用 xlrd 将新列和行添加到 .xls 文件

    如何向 xlrd 中的工作表添加新列和 或行 我有一个使用 open workbook 读取的 xls 文件 我需要在第一张表中添加一个新列 bouncebacks 然后在该表中添加新行 但我在 xlrd 文档中找不到任何显示如何添加新行和
  • 使用 3d 对象作为 3d 散点图中的标记 - Python

    使用下面的代码 我尝试模拟一个用罐头制成的碗 我希望每个标记都是一个罐头 最好的方法是什么 我真的很感激任何建议 谢谢 import pylab import numpy as np from math import pi sin cos
  • 使用 python 只读取 Excel 中的可见行

    我想只读取 python 中 Excel 工作表中的可见行 输入 Excel表 所以当我过滤时 作为 python 中的输出 在本例中我将仅获得可见数据 1 行 这是我的代码 from openpyxl import load workbo
  • pandas to_sql sqlalchemy 与 secure_transport 的连接

    我正在尝试将数据发送到具有 require secure transport ON 的服务器上的 mysql 数据库 当我尝试使用以下代码连接到它时 import pandas as pd import pymysql from sqlal
  • Python,多线程,获取网页,下载网页

    我想在一个站点批量下载网页 我的 urls txt 文件中有 5000000 个 url 链接 大约有300M 如何让多线程链接这些网址并下载这些网页 或者如何批量下载这些网页 我的想法 with open urls txt r as f
  • Python Camelot无边框表格提取问题

    我正在努力从 pdf 文件中提取一些无边框表格 如下图所示 我已经安装了 python camelot 如图所示here https github com socialcopsdev camelot并且仅适用于有边框的表格 请参阅以下详细信
  • 使用 statsmodels.formula.api 中的 ols - 如何删除常数项?

    我正在遵循第一个例子statsmodels教程 http statsmodels sourceforge net devel http statsmodels sourceforge net devel 如何指定在 ols 中不使用常数项进
  • 如何使用 pygame.mixer 重复音乐?

    我创建了以下使用 pygame mixer 播放 mp3 音乐的代码 然而 音乐不会重复 有什么想法可以让音乐重复播放吗 这是代码 playlist list playlist append put music here mp3 playl
  • 如何可视化多维数据上的 kmeans 聚类

    我在 mnist 数据集上使用 kmeans 聚类算法 并希望可视化聚类后的图 到目前为止我做了这个 from mnist import MNIST mndata MNIST Datasets X train y train mndata
  • 如何限制scrapy请求对象?

    所以我有一个蜘蛛 我认为它正在泄漏内存 结果当我检查 telnet 控制台 gt gt gt prefs 时 它只是从链接丰富的页面中抓取了太多链接 有时它会超过 100 000 个 现在我已经一遍又一遍地浏览文档和谷歌 但我找不到一种方法
  • Flask 扩展未在 app.extensions 中注册

    我想访问在我的 Flask 应用程序上注册的一些扩展 我尝试使用app extensions 但我初始化的一些扩展不在字典中 from flask import current app current app extensions get
  • 在至少 7 天内连续三天登录该产品的用户

    我有一个用于用户参与的数据框 df 如下所示 time stamp user id 2013 01 01 10 05 23 1 2013 01 03 16 35 23 1 2013 01 06 11 06 35 1 2013 01 10 1

随机推荐