如何从并行进程中运行的函数中检索值?

2024-03-07

Multiprocessing 模块对于 Python 初学者来说相当令人困惑,特别是对于那些刚刚从 MATLAB 迁移并因并行计算工具箱而变得懒惰的人。我有以下函数,运行时间约为 80 秒,我想通过使用 Python 的多处理模块来缩短这个时间。

from time import time

xmax   = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end  = time()
tt   = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

这会按预期输出:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

由于循环的任何迭代都不依赖于其他迭代,因此我尝试采用此方法服务器进程 https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes从官方文档中可以在单独的进程中扫描范围的块。最后我想出了 vartec 的答案这个问题 https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce并可以编写以下代码。我还根据 Darkonaut 对当前问题的回答更新了代码。

from time import time 
import multiprocessing as mp

def chunker (rng, t): # this functions makes t chunks out of rng
    L  = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h  = rng[0]-1
    chunks = []
    for i in range(0, t):
        c  = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:                
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x']=manager.list()
    return_dict['y']=manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)
    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i],return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    end = time()
    tt   = end-start #total time
    print('Each iteration took: ', tt/xmax)
    print('Total time:          ', tt)
    print(return_dict['x'])
    print(return_dict['y'])

这大大将运行时间缩短至约 17 秒。但是,我的共享变量无法检索任何值。请帮我找出代码的哪一部分出了问题。

我得到的输出是:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[]
[]

我从中期望:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[0, 1, 2]
[-15, -3, 11]

您的示例中的问题是对标准可变结构的修改Manager.dict不会被传播。我首先向您展示如何与经理一起修复它,只是为了向您展示更好的选择。

multiprocessing.Manager有点重,因为它使用一个单独的进程只是为了Manager处理共享对象需要使用锁来保证数据一致性。如果您在一台机器上运行它,有更好的选择multiprocessing.Pool,以防您不必运行自定义Process课程,如果有必要的话,multiprocessing.Process和...一起multiprocessing.Queue将是常见的做法。

引用部分来自多处理docs. https://docs.python.org/3.7/library/multiprocessing.html#managers


Manager

如果标准(非代理)列表或字典对象包含在引用对象中,则对这些可变值的修改将不会通过管理器传播,因为代理无法知道其中包含的值何时被修改。但是,将值存储在容器代理中(这会触发setitem在代理对象上)确实通过管理器传播,因此为了有效地修改这样的项目,可以将修改后的值重新分配给容器代理...

在你的情况下,这看起来像:

def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

The lock这里将是一个manager.Lock例如,您必须作为参数传递,因为整个(现在)锁定操作本身并不是原子的。 (Here https://stackoverflow.com/a/52453247/9059420是一个更简单的例子Manager使用锁)

在大多数用例中,这种方法可能不如使用嵌套代理对象方便,但也演示了对同步的一定程度的控制。

由于 Python 3.6 代理对象是可嵌套的:

版本 3.6 中的更改:共享对象可以嵌套。例如,共享容器对象(例如共享列表)可以包含其他共享对象,这些对象都将由 SyncManager 管理和同步。

从Python 3.6开始你可以填写你的manager.dict在开始多处理之前manager.list作为值,然后直接附加到工作线程中,而无需重新分配。

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

EDIT:

这是完整的示例Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)

Pool

最常见的是multiprocessing.Pool就会这么做。由于您希望将迭代分布在一个范围内,因此您在示例中面临额外的挑战。 你的chunker即使每个进程都有大约相同的工作要做,函数也无法划分范围:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

对于下面的代码,请获取代码片段mp_utils.py从我的回答来看here https://stackoverflow.com/a/52637805/9059420,它尽可能为块范围提供两个函数。

With multiprocessing.Pool your worker函数只需返回结果Pool将负责通过内部队列将结果传输回父进程。这result将是一个列表,因此您必须按照您想要的方式重新排列结果。您的示例可能如下所示:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time   = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time:          {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')

示例输出:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time:          8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0

如果你有多个论点worker您将构建一个包含参数元组和交换的“任务”列表pool.map(...) with pool.starmap(...iterable=tasks)。有关更多详细信息,请参阅文档。


进程和队列

如果你不能使用multiprocessing.Pool由于某种原因,你必须采取 自己处理进程间通信(IPC),通过传递一个multiprocessing.Queue作为你的工人职能的论点 - 进程并让它们将结果排队发送回 父母。

您还必须构建类似池的结构,以便您可以迭代它来启动和加入流程,并且您必须get()结果从队列返回。更多关于Queue.get我写过的用法here https://stackoverflow.com/a/53132779/9059420.

采用这种方法的解决方案可能如下所示:

def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从并行进程中运行的函数中检索值? 的相关文章

  • 如果单元格以文本字符串开头...公式

    我有一个公式 用于检查单元格是否以文本 A 开头返回 拾取 B 代表收集 和 C 代表预付 但它似乎不能正常工作 A 和 C 均返回预付费 LOOKUP LEFT A1 A B C Pick Up Collect Prepaid 我不知道l
  • 为什么使用SignTool进行代码签名时需要指定时间戳服务器?

    时间戳是可选参数 所以有人可以解释带时间戳的exe文件和不带时间戳的exe文件之间的区别吗 如果我跳过此选项会发生什么 如果您跳过时间戳选项 那么当您的证书过期时 exe 将不再具有有效的证书 如果您使用时间戳服务器 那么 exe 将始终具
  • ASP.NET 入口点?

    刚刚创建了一个空白的 ASP NET Web 应用程序 切入点在哪里 我看到 Default aspx 似乎是调用的默认模板 我猜 Site Master 充当布局文件 Global asax 似乎提供了一些用于事件处理的方法存根 然后是
  • 在IOS5中实现SIP功能

    我想构建一个 iPhone 应用程序 它可以选择通过 SIP VoIP 拨打电话 但目前我不知道如何开始 有谁有关于这个主题的一些信息 或者可能有一个我可以用来实现 SIP 功能的演示项目 提前致谢 你前面还有一条漫长而有趣的路 您需要选择
  • Angular2 http.post 被执行两次

    我遇到一个奇怪的问题 Angular2 的 RC1 Http 服务执行 http post 调用两次 我已经调试了我的应用程序 并且我知道这不是点击事件问题 导致核心服务调用的所有调用 public create json Object p
  • 跳过一行GridBagLayout

    我在 JFrame 上使用 GridBagLayout 我希望能够跳过一两行 但将这些行显示为空白 然后在这些行后面有一个按钮 我在文档中找不到任何方法来执行我所描述的操作 有谁知道我可以执行此操作的任何方法吗 发现它比添加空组件干净得多
  • PostgreSQL 使用 JPA 和 Hibernate 抛出“列的类型为 jsonb,但表达式的类型为 bytea”

    这是我的实体类 映射到表中postgres 9 4 我正在尝试将元数据存储为jsonb在数据库中输入 Entity Table name room categories TypeDef name jsonb typeClass JsonBi
  • 如何在 SpringDoc OpenAPI 3 中引用文件?

    我有 Spring Boot 项目 我想在其中记录我的 API 这里是正在处理的 Web 服务的示例 ApiResponses value ApiResponse responseCode 200 content Content media
  • 1° 夏令时 Java 和 JS 表现出不同的行为

    假设巴西利亚 GMT 0300 夏令时于 21 10 2012 00 00 00 此时时钟应提前一小时 Java new Date 2012 1900 9 21 0 0 0 Sun Oct 21 01 00 00 BRST 2012 Chr
  • 如何通过 jQuery onblur 提交表单

    所以我尝试通过 jQuery onblur 提交表单 即一旦焦点离开密码字段 表单就会通过 jQuery 提交 有类似的问题 但这不是我要找的 我尝试使用 document getElementById 但它不起作用 任何帮助表示赞赏 提前
  • 如何从停止的地方开始播放视频

    我正在使用 VideoView 来播放视频 如果我退出应用程序 在返回应用程序 即在 onResume 中 时 它应该从停止的位置播放视频 要获取当前进度 在 onPause 中检查 long progress mVideoView get
  • Laravel Echo 不监听推送事件

    尝试使用 laravel 和 vuejs 创建一种聊天应用程序 发送消息后 我会从 laravel 触发事件 该事件会使用正确的事件类反映在推送器调试控制台上 但根本不会调用来自 vuejs 的监听回调 created window Ech
  • 尝试访问从资产复制到数据\数据\的数据库中的DatabaseHelper时出现空指针异常

    我有一个数据库助手类 代码如下 这个助手的类任务是将数据库从应用程序附带的资产文件夹复制到我的应用程序的 data data 中 以便我可以使用它 一旦我将数据库放入 data data 我能够 我想添加它并执行 CRUD 操作 并且该数据
  • 如何将 Ant 路径转换为文件集?

    我正在编写一个 Ant 脚本来将项目打包到 WAR 文件中 该软件由多个项目组成 它们有自己的源目录 库等 WAR 任务有一个嵌套元素lib我目前正在研究这个问题 我目前有所需库的参考作为Path 包含几个FileSets 我在类路径引用中
  • gwt - 在 RPC 调用中使用 List

    我有一个 RPC 服务 方法如下 public List
  • 在 Android 上使用 pocketsphinx 未检测到关键字

    谁能解释一下如何使用 pocketsphinx 将语音转换为文本 我试试这个 import com example speechtutor SpeechRecognizerRecorder import com example speech
  • 文件构造函数说明

    我无法理解以下文件构造函数 public File String parent String child and public File File parent String child 参数有什么作用parent and child该文件
  • Spring JMS开始根据请求监听jms队列

    Spring提供 JMSListener用于监听来自特定队列的消息的注释 还有一个替代方案实施JmsListenerConfigurer http docs spring io spring docs current spring fram
  • 关闭 IPython Notebook 中的自动保存

    我正在寻找一种方法来关闭 iPython 笔记本中的自动保存 我已经通过 Google Stack Overflow 搜索看到了有关如何打开自动保存的参考资料 但我想要相反的内容 关闭自动保存 如果这是可以永久设置的东西而不是在每个笔记本的
  • 寻求有关标记视频系统上的“相关视频”查询的建议

    好吧 我运行一个小型视频网站 在实际的视频页面上有一条与大多数视频页面 例如 YouTube 类似的 相关视频 目前我所做的就是随机获取其标签之一并查找其他视频相同的标签 毫不奇怪 这不是一个好方法 因为有些标签非常模糊 有些视频被错误标记

随机推荐