使用 asyncio 并行化生成器

2024-02-14

我的应用程序从慢速 I/O 源读取数据,进行一些处理,然后将其写入本地文件。我已经用生成器实现了这个,如下所示:

import time

def io_task(x):
    print("requesting data for input %s" % x)
    time.sleep(1)   # this simulates a blocking I/O task
    return 2*x

def producer(xs):
    for x in xs:
        yield io_task(x)

def consumer(xs):
    with open('output.txt', 'w') as fp:
        for x in xs:
            print("writing %s" % x)
            fp.write(str(x) + '\n')

data = [1,2,3,4,5]
consumer(producer(data))

现在我想在 asyncio 的帮助下并行化这个任务,但我似乎不知道如何做。对我来说,主要问题是通过生成器直接将数据从生产者提供给消费者,同时让 asyncio 发出多个并行请求io_task(x)。还有,这整个async def vs. @asyncio.coroutine事情让我很困惑。

有人可以告诉我如何构建一个使用的最小工作示例吗asyncio从这个示例代码?

(注:这是not可以打电话给io_task(),缓冲结果然后将它们写入文件。我需要一个适用于可能超出主内存的大型数据集的解决方案,这就是我到目前为止一直使用生成器的原因。然而,可以安全地假设消费者总是比所有生产者加起来更快)


从 python 3.6 开始异步发电机 https://www.python.org/dev/peps/pep-0525/,只需进行很少的更改即可使您的代码与 asyncio 兼容。

The io_task函数变成协程:

async def io_task(x):
    await asyncio.sleep(1)
    return 2*x

The producer发电机变成异步发电机:

async def producer(xs):
    for x in xs:
        yield await io_task(x)

The consumer函数成为协程并使用aiofiles https://pypi.python.org/pypi/aiofiles、异步上下文管理和异步迭代:

async def consumer(xs):
    async with aiofiles.open('output.txt', 'w') as fp:
        async for x in xs:
            await fp.write(str(x) + '\n')

主协程在事件循环中运行:

data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()

另外,您可以考虑使用音频流 https://github.com/vxgmichel/aiostream在生产者和消费者之间管道化一些处理操作。


编辑:不同的 I/O 任务可以通过使用轻松地在生产者端同时运行已完成 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed:

async def producer(xs):
    coros = [io_task(x) for x in xs]
    for future in asyncio.as_completed(coros):
        yield await future
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 asyncio 并行化生成器 的相关文章

随机推荐

  • R-cran 中使用线条、点或类似颜色填充箱线图

    我需要在 R 中为箱线图使用黑白颜色 我想用线条和点对箱线图进行颜色填充 举个例子 我想ggplot2可以做到这一点 但我找不到任何方法来做到这一点 预先感谢您的帮助 我认为这是一个很好的问题 并思考是否可以在 R 基础上做到这一点并获得方
  • 如何使用 Pyinstaller 捆绑 .jar 文件

    你怎么获得py安装程序 http www pyinstaller org将 jar 文件捆绑为使用它们的 python 项目的档案 例如 要制作一个 exe 文件 我正在使用pyjnius http pyjnius readthedocs
  • 如何将 Integer 转换为 int?

    我正在开发一个网络应用程序 其中数据将在客户端和服务器端之间传输 我已经知道 JavaScript int Java int 因为 Java int 不能为 null 对吧 现在这就是我面临的问题 我将 Java int 变量更改为 Int
  • Android Lollipop 中是否可能有一个不共存的“个人资料所有者”应用程序

    Lollipop API 提供了 2 个新功能 配置文件所有者 和 设备所有者 http developer android com about versions android 5 0 html Enterprise http devel
  • Spring-MVC控制器中触发404?

    我如何获得Spring http en wikipedia org wiki Spring Framework3 0控制器触发404 我有一个控制器 RequestMapping value method RequestMethod GET
  • Android 的氛围

    首先 我打算使用气氛 https github com Atmosphere atmosphere在服务器端和客户端都有一个带有 WebView 的 Android 应用程序 我将在其中使用 jQuery 但是 稍后计划推出更 标准 的 A
  • SSRS 包含或类似表达式

    我正在尝试从数据集中的字段创建计算表达式 我需要从一个字段中查找包含 交易所交易 一词的所有内容 并且在我的新字段中包含 ETF 13F 一词 如果没有任何匹配 那么它就只是空白 我已经尝试过喜欢 交易所交易 并包含 交易所交易 函数 并且
  • 使用进程生成器或 apache commons exec 执行外部程序

    我需要执行一个外部应用程序 该应用程序返回大量数据 需要 2 个多小时才能完成 并且连续输出数据 我需要做的是异步执行该程序并将输出捕获到文件中 我尝试使用 java process builder 但是它似乎仅在程序退出或强制终止时才挂起
  • 使用 JSF 验证更改 CSS 样式

    我正在研究在 JSF 服务器端验证后以红色突出显示失败的字段的要求 无法使用 javascript 进行验证 有没有一种方法可以将服务器端验证与 css 样式更改链接起来 您可以使用托管 bean 来执行此操作 public class V
  • Laravel Backpack - 显示关系函数中的特定属性

    我已经注册了Comment模型有一个User参考 像这样 public function user return this gt belongsTo App User 该函数返回一个实例User 这是正确的 但我不知道如何注册User列获取
  • jquery 捕获单词值

    有没有办法用 jquery 或 javascript 捕获单词值 在示例中 搜索 五月行情 当我单击 搜索 或 引号 或任何单词时 我会提醒该单词文本吗 Update 这就是我的意思 http jsfiddle net BE68L http
  • 动态代码生成

    我目前正在开发一个应用程序 您可以用它创建 程序 而无需编写源代码 如果您愿意 只需单击并播放即可 现在的问题是如何从我的数据模型生成可执行程序 有很多种可能性 但我不确定哪一种最适合我 我需要生成包含类和命名空间以及可以成为应用程序一部分
  • 谷歌应用程序脚本桌面IDE [重复]

    这个问题在这里已经有答案了 我喜欢将 Google Sheets 与应用程序脚本一起使用 但在线脚本 IDE 很麻烦 滞后 等 并且没有桌面 IDE 的许多便利 希望谷歌能在某个时候推出桌面IDE 有人知道制作 Google 应用程序脚本的
  • 传单中的javascript地图如何刷新

    我通过使用传单 API 在 javascript 中有一个基本的 geoJson 程序 div style width 100 height 400px div
  • 在 Laravel 8 中捕获 HTTP 客户端错误

    你如何捕捉抛出的错误HTTP客户端 https laravel com docs 8 x http client 例如超时 以便在您可以对错误执行任何操作以避免停止执行之前 它不会在 Laraval 调试器 在调试模式下 中抛出curl 错
  • 表视图中的文本标签太长,会影响正确的详细信息(detailTextLabel)被覆盖或不显示

    我已经为该单元格设置了一个文本 但是 它显示的文本太长 这会影响正确的详细文本被覆盖或不显示 我无法更改它 因为我需要下一个视图控制器中的名称 是否可以使其仅显示文本 后跟 EXAMPLE 电气电子工程 01 gt 传奇 Electrica
  • 如何在插件架构中做到免注册COM

    我们使用清单文件来实现免注册 COM 正如我在这另一个问题 https stackoverflow com questions 465882 generate manifest files for registration free com
  • 在哪里添加 String 原型

    我目前正在 Titanium Studio 中使用 JavaScript CommonJS 并且有一个关于原型设计的问题 假设我想向现有的类添加一个新函数 例如 String prototype trim function return t
  • 运行应用程序中的 Grails 3.0 静态 html

    之前曾就 grails 2 3 4 提出过类似的问题 我觉得很奇怪 我找不到一种方法来做到这一点 因为这对我来说似乎是一个标准用例 我只是想在运行 grails run app 时提供 html 页面 包括它们链接的 css 和 js An
  • 使用 asyncio 并行化生成器

    我的应用程序从慢速 I O 源读取数据 进行一些处理 然后将其写入本地文件 我已经用生成器实现了这个 如下所示 import time def io task x print requesting data for input s x ti