asyncio as_yielded 来自异步生成器

2024-02-25

我希望能够从许多异步协程中产生收益。异步的as_completed有点接近我正在寻找的东西(即我希望任何协程能够随时返回调用者然后继续),但这似乎只允许常规协程具有单个返回。

这是我到目前为止所拥有的:

import asyncio


async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_


async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return


async def main():
    runs = [test(i) for i in range(3)]

    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

更换runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)]并为for i in asyncio.as_completed(runs)迭代每个产量就是我所追求的。

这是否可以用 Python 表达,是否有任何第三方可以为您提供比协程流程标准库更多的选择?

Thanks


您可以使用aiostream.stream.merge http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge:

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

在一个安全环境 http://aiostream.readthedocs.io/en/latest/core.html#aiostream.core.Stream.stream确保迭代后正确清理生成器:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

或者使用使其更紧凑pipes http://aiostream.readthedocs.io/en/latest/operators.html#pipe-lining:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

更多示例在文档 http://aiostream.readthedocs.io/en/latest/examples.html.


处理@nirvana-msu 评论

通过准备相应的源,可以识别产生给定值的生成器:

async def main():
    runs = [test_gen(i) for i in range(3)]
    sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

asyncio as_yielded 来自异步生成器 的相关文章

随机推荐

  • TFS 构建 PowerShell 步骤中的 Robocopy 报告失败但没有错误

    我的 powershell 脚本运行时日志文件中没有报告错误 但 TFS 2015 构建步骤报告错误 我需要执行特殊回电吗 这是一种新的样式构建 而不是基于 XAML 的构建 该脚本没有什么特别的 它调用 robocopy 并且成功发生 这
  • 默认情况下启用 WIX 的 MSI 日志记录

    我目前正在为我的软件编写安装程序 并且真的很想记录安装 我正在使用维克斯 但是 我见过记录安装的唯一方法是更改 reg 作为全局设置 并使用 l v 或类似的东西将其添加为命令行中的命令 我想做的是 安装程序运行后立即记录该安装程序 而不修
  • Werkzeug 引发 BrokenFilesystemWarning

    当我将表单数据发送到 Flask 应用程序时 出现以下错误 它说它将使用 UTF 8 编码 但区域设置已经是 UTF 8 这个错误是什么意思 home virtualenvs project local lib python2 7 site
  • While循环:UnboundLocalError:赋值前引用的局部变量

    我正在使用 python 3 5 因此 我尝试创建一个函数 将 x 和 y 作为正浮点输入 然后计算并返回 R x N y 其中 N 是最大整数 因此 x gt N y 我做了这个功能 def floatme x y N 1 while x
  • 将类型结构的通用列表绑定到中继器

    我在尝试将通用列表绑定到中继器时遇到了一些问题 泛型列表中使用的类型实际上是一个结构体 我在下面构建了一个基本示例 struct Fruit public string FruitName public string Price strin
  • MySQL 崩溃(“某些指针可能无效并导致转储中止”)

    我有一个名为 tweets 的 5GB MySQL 数据库 我需要从中访问 searchresults 表 但是 当我对其执行查询或创建转储时 MySQL 服务器 在 Windows 10 上运行 始终崩溃 并在同一行出现相同的错误 例如
  • Hyperledger Fabric 中的对等通道创建失败

    我正在尝试与一个订购者和 2 个对等方建立一个示例超级账本结构环境 我没有使用 docker 方法 而是运行实际的可执行文件本身 Orderer 和 2 个对等节点已成功启动 但是 通道创建失败并出现以下错误 任何帮助将不胜感激 订购者窗口
  • 比较 Cloud Functions 中的两个 Firestore 时间戳

    我正在 Firestore 中编写更新函数 我想比较两个Timestamp 我尝试了多种方法但没有成功 你能指出我比较两个的正确方法吗Timestamp在消防库里 exports updateFunction functions fires
  • Java监听端口

    我们想要捕获通过端口 7777 到达系统的数据 public static void main String args try final ServerSocket serverSocket new ServerSocket 7777 ne
  • 在 F# 中将字符串相乘

    我有一个问题我不太确定 我的问题如下 let myFunc text string times int 我想要这个函数做的是将字符串放在一起的次数与指定的次数相同times范围 if input check 3我想要输出字符串 check
  • 如何生成x的前20次方?

    所以 我有 X 一个 300 1 向量 我想要 1 X X X X X X X X X 300 20 矩阵 我该怎么做 X 2 1 X X X X X X ans 2 4 8 1 1 1 这可行 但我无法面对将整个内容打出来 我肯定不需要写
  • Jackson 2.2.3 中属性的 getter 定义相互冲突

    为了简单起见 这里是一个简单的类 class GetterMethodsObject int id 10 public int getId return id JsonIgnore public boolean isId return tr
  • 使用相互或循环(循环)导入时会发生什么?

    在 Python 中 当两个模块尝试执行操作时会发生什么import彼此 更一般地说 如果多个模块尝试import在一个循环中 See also What can I do about ImportError Cannot import n
  • LINQ to Entities 查询不支持转换为十进制

    我有一个数据库表事务 transactionID LocalAmount 其中 Localmount 属性的数据类型是float 在用户界面上我试图返回SUM按钮单击事件中一行中的列 Localamount 我用过decimal代替floa
  • 什么是“单位”? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 在单元测试的背景下 什么是 单元 我通常将其定义为单一代码执行路径通过单一方法 根据经验法则 测试一个方法所需的单元测试数量等于或大于
  • 透明 UINavigationBar 下的 UIWebView

    我有一个 UIWebView 我想将其放在半透明的 UINavigationBar 下 通常 当我将 UIScrollView 放在半透明的 UINavigationBar 下时 我会设置其 contentOffset 以便所有内容最初都会
  • API 端点返回“此请求的授权已被拒绝”。发送不记名令牌时

    我已按照教程使用 C 中的 OAuth 保护 Web API 我正在做一些测试 到目前为止我已经能够成功地从 token 我正在使用名为 Advanced REST Client 的 Chrome 扩展来测试它 access token t
  • 是否可以向 networkx 中的图形对象添加无向和有向边?

    我正在致力于实现一种算法来确定数据集的图形结构 数据集的变量之间可以有无向或有向边 我可以用 Python 创建自己的图形对象 但我很好奇 Networkx 是否具有此功能 据我所知 Networkx 只有一个 Graph 对象 仅无向边
  • Windows 身份验证和 Angular 7 应用程序

    我开发了内联网应用 后端 ASP NET WEB API 2 所有控制器都有授权属性 前端 Angular 7 产品构建后 我将生成的脚本移至后端项目
  • asyncio as_yielded 来自异步生成器

    我希望能够从许多异步协程中产生收益 异步的as completed有点接近我正在寻找的东西 即我希望任何协程能够随时返回调用者然后继续 但这似乎只允许常规协程具有单个返回 这是我到目前为止所拥有的 import asyncio async