Python Multiprocessing.Pool 惰性迭代

2024-01-16

我想知道 python 的 Multiprocessing.Pool 类与 map、imap 和 map_async 一起使用的方式。我的特殊问题是,我想映射一个创建内存密集型对象的迭代器,并且不希望所有这些对象同时生成到内存中。我想看看各种 map() 函数是否会耗尽我的迭代器,或者仅在子进程缓慢前进时智能地调用 next() 函数,因此我编写了一些测试:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

等等还有map、imap 和map_async。然而,这是最明显的例子,因为在 g2 上简单地调用一次 next() 就会打印出生成器 g() 中的所有元素,而如果 imap “懒惰”地执行此操作,我希望它只调用 go.next () 一次,因此仅打印出“1”。

有人可以澄清正在发生的事情,以及是否有某种方法可以让进程池根据需要“延迟”评估迭代器?

Thanks,

Gabe


我们先看一下程序的结尾。

多处理模块使用atexit打电话multiprocessing.util._exit_function当你的程序结束时。

如果您删除g2.next(),你的程序很快结束。

The _exit_function最终打电话Pool._terminate_pool。主线程改变了状态pool._task_handler._state from RUN to TERMINATE。与此同时pool._task_handler线程正在循环Pool._handle_tasks当达到条件时退出

            if thread._state:
                debug('task handler found thread._state != RUN')
                break

(参见/usr/lib/python2.6/multiprocessing/pool.py)

这就是阻止任务处理程序完全消耗生成器的原因,g()。如果你看进去Pool._handle_tasks你会看到的

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break

这是消耗您的生成器的代码。 (taskseq不完全是你的发电机,但作为taskseq被消耗了,你的发电机也被消耗了。)

相反,当您调用g2.next()主线程调用IMapIterator.next,并在到达时等待self._cond.wait(timeout).

主线程正在等待而不是 呼叫_exit_function是允许任务处理程序线程正常运行的原因,这意味着完全消耗生成器put中的任务workers' inqueue in the Pool._handle_tasks功能。

底线是所有Pool映射函数消耗给定的整个可迭代对象。如果您想分块使用生成器,您可以这样做:

import multiprocessing as mp
import itertools
import time


def g():
    for el in xrange(50):
        print el
        yield el


def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python Multiprocessing.Pool 惰性迭代 的相关文章

随机推荐

  • 如何从命令行获取文件句柄?

    我有一个以文件句柄作为参数的子例程 如何从命令行指定的文件路径创建文件句柄 我不想自己对这个文件进行任何处理 我只想将其传递给另一个子例程 该子例程返回一个包含文件中所有解析数据的哈希数组 我正在使用的命令行输入如下所示 getfile p
  • 如何在 Eclipse 中集成 SBT scala

    如何在eclipse中集成SBT scala 我也在关注这篇文章 我想在 scala 中添加现有项目 如何编译它以及如何在eclipse中使用build sbt SBT 集成测试设置 https stackoverflow com ques
  • Template.instance() 和 this 之间的区别

    Template instance 和这个有什么区别 使用其中之一有优势吗 Template name onRendered function var template Template instance var instance this
  • Redis sub/pub 和 php/nodejs

    开始开发一个新项目 使用 redis 作为 sub pub 系统来显示 mysql 数据库的结果 因此 如果有更新 我想将这些更新从 mysql 发布到我的网页 我的问题是 哪种选择更好 选项1 我应该通过nodejs 和socket io
  • 使用 DateTime.TryParse 检查字符串是否为有效日期

    我在用DateTime TryParse 函数检查特定字符串是否是有效的日期时间 不依赖于任何区域性 令我惊讶的是 该函数返回true对于 1 1 1 1 等偶数字符串 我怎么解决这个问题 Update 这是否意味着 如果我想检查特定字符串
  • 从 javascript 显示 Android 键盘

    我希望能够在导航到页面后在移动浏览器 例如 Android 上显示键盘 我已经看到了一些解决方法 用javascript在手机上显示虚拟键盘 https stackoverflow com questions 6837543 show vi
  • 以下示例中给出的结构有何不幸之处?

    15 6 2 初始化基数和成员 N4713 节在第 11 项之后有以下示例 struct A A default OK A int v v v OK const int v 42 OK A a1 error ill formed bindi
  • `more.com` 返回“内存不足”。

    环境详情 x64 Win7 SP1 企业版 Windows PowerShell v5 0 没有加载任何配置文件 我的本地电源外壳 questions tagged powershell会话正在返回 内存不足 当我尝试执行时help or
  • 如何对随时间缓慢变化的数据进行建模?

    假设我得到了大量 200 万行 数据 这些数据应该是静态且不变的 应该是 这些数据每月重新发布一次 有哪些方法可以用于 1 了解哪些数据点逐月发生变化以及 2 使用给定时间点的数据 解决方案1 天真地保存每个数据快照 并按日期注释 差异意识
  • 在从独立相机捕获的图像上绘制文本(时间戳)

    我的代码如下 单击即可打开相机 拍照 从相机获取照片 然后放入图像视图中 不过 我想拍摄图像并在图像上应用文本 某种时间戳 最好是图像的时间戳 或者只是系统日期时间 并保存为 jpeg 如果有人能帮助我那就太好了 public class
  • 如何存储网络应用程序的配置设置?

    我有一些站点元数据我希望可以更改 例如 在我的应用程序中 如果系统管理员不想使用站点的 库存 部分 他 她可以将其关闭 并且它将从主站点消失 所以我在想 也许我可以在数据库中创建一个名为 元 的表 并在那里插入值 或元组 然后 如果模块被关
  • 将“@daily-co/daily-js”导入 SvelteKit 应用程序会引发“全局未定义”错误

    我尝试过的 我尝试通过解决它if browser 进一步来说 if browser let DailyIframe await import daily co daily js 在load函数里面
  • 如何读写MP3到数据库

    如何从Sql数据库读取MP3 在 sql 中 我已将文件存储为二进制格式 现在我想检索存储在 sql 中的 Mp3 文件并显示在我的 aspx 页面中 如何 请帮忙 以最简单的形式 这就是您获取原始字节的方式 在不知道您想要它做什么的情况下
  • 分段错误:11 - Xcode 6.3

    无法存档 我的应用程序在模拟器和多个设备上运行良好 Xcode 6 3 2 基于 swift 但是当我尝试存档它时出现错误Command failed due to signal Segmentation fault 11 其他人面临同样的
  • 视图索引 (Oracle)

    假设我有两张桌子 tab a and tab b 我创建了一个如下所示的视图 create view join tabs as select col x as col z from tab a union select col y as c
  • Objective C 中什么是非空?

    有人可以详细说明为什么吗nonnulliOS 9 中引入 例如 NSArray method instancetype array is now instancetype nonnull array 参考 https developer a
  • C 的 std::vector 替代品 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我想知道是否有替代方案C 中的 std 向量 我发现这个实现 http codingrecipes com implementation
  • (Flutter) 具有有限内容的无限滚动 `ListView.builder`

    1 问题 我该如何做我的ListView builder能够滚动到顶部和底部的空白区域吗 例如 我有一个自定义小部件列表 我希望用户能够通过滚动到列表中最上面的卡片 位于屏幕顶部 更接近他的拇指 而Flutter用空背景渲染顶部空间 2 到
  • 使用 MVC Web API 发布对象数组

    我有一个基本的后期操作 适用于单个对象RecordIem 我想做的是执行相同的操作 但通过使用相同格式发布一组请求来批量执行 例如 public HttpResponseMessage Post RecordItem request var
  • Python Multiprocessing.Pool 惰性迭代

    我想知道 python 的 Multiprocessing Pool 类与 map imap 和 map async 一起使用的方式 我的特殊问题是 我想映射一个创建内存密集型对象的迭代器 并且不希望所有这些对象同时生成到内存中 我想看看各