如何正确使用run_in_executor?

2024-02-24

我尝试使用run_in_executor并有一些问题。这是代码(基本上是从文档复制粘贴)

import asyncio
import concurrent.futures


def cpu_bound(val):
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    print(f'Start task: {val}')
    sum(i * i for i in range(10 ** 7))
    print(f'End task: {val}')


async def async_task(val):
    print(f'Start async task: {val}')
    while True:
        print(f'Tick: {val}')
        await asyncio.sleep(1)


async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    for i in range(5):
        loop.create_task(async_task(i))

    # 1. Run in the default loop's executor:
    # for i in range(10):
    #     loop.run_in_executor(
    #         None, cpu_bound, i)
    # print('default thread pool')

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    #     for i in range(10):
    #         loop.run_in_executor(
    #             pool, cpu_bound, i)
    #     print('custom thread pool')

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor(max_workers = 10) as pool:
        for i in range(10):
            loop.run_in_executor(
                pool, cpu_bound, i)
        print('custom process pool')

    while True:
        await asyncio.sleep(1)


asyncio.run(main())

Case 1: run_in_executor where executor is None: async_task的执行时间与cpu_bound的执行。

在其他情况下async_task的将在之后执行cpu_bound完成了。 我想当我们使用ProcessPoolExecutor任务不应该阻塞循环。我哪里错了?


在其他情况下async_task的将在之后执行cpu_bound完成了。我想当我们使用ProcessPoolExecutor任务不应该阻塞循环。我哪里错了?

问题是with XXXPoolExecutor()结束时关闭泳池with堵塞。池关闭会等待挂起的任务完成,这会阻塞事件循环并且与 asyncio 不兼容。由于您的第一个变体不涉及with声明,不存在这个问题。

解决方案很简单,删除with语句并创建一次池(例如在顶层或在main()),并且只是use它在函数中。如果需要,您可以通过调用显式关闭池pool.shutdown() after asyncio.run()已完成。

另请注意,您永远不会awaiting返回的期货loop.run_in_executor。这是一个错误,asyncio 可能会警告您;您可能应该将返回的值收集在列表中并使用类似的内容等待它们results = await asyncio.gather(*tasks)。这不仅会收集结果,还会确保脱线程函数中发生的异常正确传播到您的代码而不是被丢弃。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何正确使用run_in_executor? 的相关文章

随机推荐

  • 将模块导入 Pyscript

    当我们编写 Python 代码时 我们通常使用导入的包和模块 例如 我们在编码时可能会这样写 import numpy import requests from bs4 import BeautifulSoup 当我们尝试将 python
  • 在 git 中,如何仅从更改的行中删除 Windows 行结尾?

    有时 当我尝试向开源项目贡献代码时 该项目尚未格式化并且包含 UNIX 和 Window 行结尾 我的 智能 IDE 会以某种方式检测每个文件使用哪种类型的结尾 如果它检测到 Windows 行结尾 那么我的所有更改都将具有 Windows
  • 更改 Sysem.Variants.VarToWideStr 的区域设置格式

    我的应用程序上的第三方组件 FastReports 广泛使用 System Variants VarToWideStr 函数 这很好 只是它忽略了我需要该应用程序使用的区域设置 Example FormatSettings ShortDat
  • Spark 跨接收器的结构化流一致性

    我想在以下情况下更好地理解 Spark 2 2 结构化流的一致性模型 一个来源 Kinesis 从此源向 2 个不同接收器进行 2 次查询 一个用于存档目的的文件接收器 S3 另一个用于处理数据的接收器 数据库或文件 尚未决定 我想了解跨接
  • 覆盖 django 的模型相关管理器

    我如何才能超越关系经理 例如 user entry set django db models fields related RelatedManager 但我需要自己的经理 我尝试这段代码 但这不起作用 class EntryManager
  • 为什么 QObject ::findChildren 返回具有公共基类的子级?

    我使用 QObject 作为复合模式的基类 假设我有一个父类 File 在一个人为的示例中 我向其中添加不同类型的子类 HeaderSection 和 PageSection File HeaderSection 和 PageSection
  • 简单表达式缺少参数类型

    遵循播放 websocket 示例 http www playframework com documentation 2 3 x ScalaWebSockets我遇到了一个奇怪的问题 文档中的以下示例正在运行 Future successf
  • 弹出并刷新视图控制器

    我有三个视图控制器 当我到达第三个视图控制器时 我使用 poptorootviewcontroller 弹出到我的第一个视图控制器 但是当我在第三个视图控制器中使用 popviewcontroller 我想返回到我的第二个视图控制器 时 它
  • 为什么 PowerShell 无法识别带引号的参数?

    当您直接调用脚本 在 PowerShell 控制台或 ISE 中 或通过另一个 PowerShell 实例调用脚本时 为什么 PowerShell 对带引号的参数的处理方式有所不同 这是脚本 TestQuotes ps1 param str
  • scala 中的非最终单例对象有什么意义?

    我一直以为objectScala 中的声明将被编译为final类 因为它们是由有效的匿名类实现的 自从final与非最终类相比 类更容易被 JVM 优化 我认为最终性有好处并且没有成本 所以所有object实施将是最终的 但我一定错过了一些
  • PySpark 使用临时 AWS 令牌进行 s3 身份验证的问题

    我已经设置了本地 PySpark 但是每次我尝试使用 s3a 协议读取文件 s3 时 它都会返回 403 AccessDenied 错误 我尝试连接的账户仅支持 AWS ShouldRole 它为我提供了临时 Access key Secr
  • 如何在不使用 LoadBalancer 类型的情况下发布 Kubernetes 服务(在 GCP 上)

    我想避免使用type LoadBalancer 对于某个 Kubernetes Service 但仍然能够将其发布到互联网上 我正在使用 Google Cloud Platform GCP 来运行当前在单个节点上运行的 Kubernetes
  • android中的fontFamily和typeFace有什么区别?

    android 中的 fontFamily 和 typeFace 有什么区别 当然 我阅读了android开发者网站中的所有描述 但我还不清楚 根据两个词的一般含义 应该是相同的意思 但在android xml属性 textview 中 它
  • 全局覆盖 Emacs 中的键绑定

    如何设置全局覆盖并优先于该键的所有其他绑定的键绑定 我想覆盖所有主要 次要模式映射并确保我的绑定始终有效 这当然行不通 global set key C i some function 它适用于text mode 但是当我使用lisp mo
  • 从 WCF 客户端使用非 wcf SOAP 错误(定义了 SOAP 错误)

    我有一个从 WCF 客户端调用的非 WCF 服务器 我需要访问已注册的肥皂故障 以防服务器抛出它 它包含我需要用户的反馈 我使用了来自的例子如何从 WCF 客户端访问 SOAP 1 1 错误详细信息 无错误契约 https stackove
  • 使用 Java 进行 MongoDB ISODate 字段搜索

    我在使用 Java 搜索 mongodb 中的 ISODate 字段时遇到问题 我想找到完全匹配的日期 以下是我查询第一个集合并获取 ISODate 字段 Timestamp 的方法 一旦我得到这个日期 我想搜索具有相同 时间戳 值的另一个
  • FxCop 和代码分析规则

    我最近开始在 Visual Studio 中的项目中使用代码分析 我创建了一个自定义规则集 用于我的所有项目 其中包括两个 Microsoft 定义的规则集 我一直致力于将 FxCop 集成到 CI 构建过程中 以使未通过所有规则的构建失败
  • 我如何使用 Rugged 来创建和提交文件,就像从命令行一样?

    我正在尝试使用坚固的东西来做一些非常简单的事情 创建并提交一个文件 使存储库处于与执行相同的状态 git init echo blah blah blah gt blah txt git add blah txt git commit m
  • 围绕三角形画布旋转圆

    我想使用画布围绕三角形旋转一个圆 前面有这段代码 但这是中间的圆 还有一个旋转的矩形 我希望圆旋转并在中间有一个三角形 有人可以帮忙吗 这是我的JS代码 var canvas document getElementById canvas v
  • 如何正确使用run_in_executor?

    我尝试使用run in executor并有一些问题 这是代码 基本上是从文档复制粘贴 import asyncio import concurrent futures def cpu bound val CPU bound operati