在 python3 中合并异步迭代

2023-11-27

有没有一个好的方法,或者一个支持良好的库,用于在 python3 中合并异步迭代器?

期望的行为与在reactivex中合并可观察量的行为基本相同。

也就是说,在正常情况下,如果我合并两个异步迭代器,我希望生成的异步迭代器按时间顺序产生结果。迭代器之一中的错误应该会使合并的迭代器脱轨。

Merging Observables

(来源:http://reactivex.io/documentation/operators/merge.html)

这是我最好的尝试,但似乎可能有一个标准解决方案:

async def drain(stream, q, sentinal=None):
    try:
        async for item in stream:
            await q.put(item)
        if sentinal:
            await q.put(sentinal)
    except BaseException as e:
        await q.put(e)


async def merge(*streams):

    q = asyncio.Queue()
    sentinal = namedtuple("QueueClosed", ["truthy"])(True)

    futures = {
        asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
    }

    remaining = len(streams)
    while remaining > 0:
        result = await q.get()
        if result is sentinal:
            remaining -= 1
            continue
        if isinstance(result, BaseException):
            raise result
        yield result


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    loop = asyncio.get_event_loop()

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    loop.run_until_complete(go())

您可以使用aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

更多示例在文档和这个answer.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 python3 中合并异步迭代 的相关文章

随机推荐

  • python3用单反斜杠替换双反斜杠[重复]

    这个问题在这里已经有答案了 我需要更换 with 在python3中是一个复杂的字符串 我知道这个问题已经被问过好几次了 但大多数时候都是针对简单的字符串 因此没有一个 接受的 答案真正适用于复杂的字符串 这也是不同的 from this
  • 如何通过在运行时选择单元测试来运行 CPPUnit 中的单元测试子集?

    我使用 CppUnit 作为单元测试框架 是否可以选择测试用例的子集在运行时执行 CppUnit 中是否提供了过滤选项来适应这种情况 您可能在 main 中调用的 TestRunner run 方法实际上具有可选参数 run std str
  • Javascript正则表达式-exec无限循环

    我正在尝试使用正则表达式获取链接文本 可能有几个链接可能与该模式匹配 我想获得最远的一个直到第四个 这是我的JS代码 var level 1 while match a href a
  • excel中24小时以上的时间表示

    有没有办法在 Excel 中表示时间值 其中小时数可以为 24 及以上 例如 第二天凌晨 1 点的时间为 25 00 00 公共交通调度中的常见表示 它不能只是纯文本 因为我希望能够对它们执行计算 例如平均值 标准差 或绘制图表 情况已经是
  • 为什么这个主要测试这么慢?

    这段代码取自 Haskell Road to Logic Math andProgramming 一书 它实现了埃拉托斯特尼筛法并解决了欧拉计划问题 10 sieve Integer gt Integer sieve 0 xs sieve
  • 矢量作为 C++ 中的数据成员

    在 C 中 如何将 101 个元素向量作为数据成员包含在我的类中 我正在执行以下操作 但它似乎不起作用 private std vector lt bool gt integers 101 我已经包含了矢量标头 提前致谢 class myC
  • 将数组分割成块

    假设我有一个如下所示的 Javascript 数组 Element 1 Element 2 Element 3 with close to a hundred elements 什么方法适合将数组分成许多更小的数组 比如说最多 10 个元素
  • JavaScript/jQuery VIN 验证器

    有人创建过 VIN 验证器吗 我正在尝试创建一个文本框 用户将在其中输入车辆识别号 然后 JS jQuery 将验证其是否正确 以防用户输入错误的数字 我对 JS jQuery 很陌生 并且找到了一些示例 但当然无法让它们正常工作 任何有任
  • string::c_str 查询

    调用 string c str 返回的指针指向哪里 在下面的代码片段中 我以为我会给出分段错误 但它给了我正确的输出 如果 string c str 返回的指针指向字符串对象内的内部位置 那么当函数返回并调用对象析构函数时 我应该获得无效的
  • Magento resize() 图像质量:脏白色背景

    我有一位客户对其产品缩略图在 Magento 上的呈现方式非常不满意 这种狡猾的外观在两个方面很明显 有一个肮脏的白色背景 有非常浅灰色的水平线 其次 颜色损失非常轻微 失去对比度和饱和度 我已经删除了所有压缩 将所有质量设置为 100 刷
  • 是否可以通过 WebKit、FireBug 或 IE8 开发工具等调试器来调试动态加载 JavaScript?

    From 我最近的问题 我已经创建了一些用于动态加载部分视图的 JavaScript 函数 但我无法调试任何动态加载 JavaScript 因为所有加载的 JavaScript 都将由 eval 函数进行评估 我找到了一种创建新 JavaS
  • libusb_claim_interface 在 Mac OS X Mountain Lion 上失败

    我已经搜索了几个小时但没有结果 我到处都看到过libusb detach kernel driverMac OS X 不支持 但我还没有找到它的补丁或替代方案 libusb claim interface返回这个 libusb 0 8633
  • Django表单中,自定义SelectField和SelectMultipleField

    我现在每天都使用 Django 已经三个月了 它真的很棒 快速 Web 应用程序开发 我还有一件事不能完全按照自己的意愿去做 它是选择字段和选择多个字段 我希望能够将一些参数添加到 Select 的选项中 我终于成功使用 optgroup
  • MySQL 的 my.ini 在 Windows 上位于哪里?

    我已经看过了http dev mysql com doc refman 4 1 en mysql config wizard file location html 如何找到 MySQL my cnf 位置 and http dev mysq
  • 传递二维数组作为参数

    我正在尝试将二维数组传递给接受指向指针的指针的函数 我了解到二维数组并不是指向指针的指针 指向一维数组的指针 当我编译下面的代码时 我收到了这个错误 include
  • 如何在 Chrome 控制台中包含 JavaScript 文件或库?

    是否有更简单 也许是本机 的方法来在 Google Chrome 浏览器中包含外部脚本文件 目前我正在这样做 document head innerHTML appendChild 是一种更原生的方式 var script document
  • 如何将多线程应用于反向传播神经网络训练?

    在我的大学项目中 我正在创建一个神经网络 可以对信用卡交易欺诈与否的可能性进行分类 我正在用反向传播进行训练 我正在用 Java 写这个 我想应用多线程 因为我的电脑是四核i7 花几个小时训练却发现我的大部分核心都闲置 这让我很烦恼 但是我
  • 如何检查解密是否正确?

    我正在开发一个聊天室 为多个用户加密消息 每个用户可能有不同的加密和密钥 密码 因此 用户的密钥不适用于所有消息 返回错误 var message secret message var encrypted CryptoJS AES encr
  • 使用带有导航抽屉的 ViewPager 的操作栏选项卡

    要求 使用带有导航抽屉的 ViewPager 的操作栏选项卡 我可以创建一个导航抽屉示例 单独使用 ViewPager 的操作栏选项卡 但是当我尝试同时使用两者时 我遇到了问题 我可以使用片段创建导航抽屉 并使用片段创建操作栏选项卡 但这两
  • 在 python3 中合并异步迭代

    有没有一个好的方法 或者一个支持良好的库 用于在 python3 中合并异步迭代器 期望的行为与在reactivex中合并可观察量的行为基本相同 也就是说 在正常情况下 如果我合并两个异步迭代器 我希望生成的异步迭代器按时间顺序产生结果 迭