如何在异步代码中处理 CPU 密集型任务

2024-04-20

我正在做一些需要异步方法的繁重处理。我的一个方法返回一个字典列表,在将其添加到另一个可等待对象之前需要对其进行大量处理。 IE。

def cpu_bound_task_here(record):
    ```some complicated preprocessing of record```
    return record

在好心人给出以下答案后,我的代码现在被卡住了。

async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()

while True:
    try:
        records = await obj.receive()
        if not records:
            continue

        record_count += len(records)
        

因此,上述函数的作用是异步流式传输值,并在无限期地推送到 Redis 之前进行一些繁重的处理。我做了必要的改变,但现在我陷入了困境。


正如该输出告诉您的那样,run_in_executor返回一个Future。您需要等待它才能得到结果。

record = await loop.run_in_executor(
    None, something_cpu_bound_task_here, record
)

请注意,任何参数something_cpu_bound_task_here需要传递给run_in_executor.

此外,正如您所提到的,这是一个 CPU 密集型任务,您需要确保您使用的是concurrent.futures.ProcessPoolExecutor。除非你打过电话loop.set_default_executor某处,默认值是一个实例ThreadPoolExecutor.

with ProcessPoolExecutor() as executor:
    for record in records:
        record = await loop.run_in_executor(
            executor, something_cpu_bound_task_here, record
        )

最后,您的 while 循环有效地同步运行。你需要等待未来,然后等待obj.add在继续处理下一个项目之前records。您可能想稍微重组您的代码并使用类似的东西gather以允许一些并发。

async def process_record(record, obj, loop, executor):
    record = await loop.run_in_executor(
        executor, something_cpu_bound_task_here, record
    )
    await obj.add(record)

async def fun():
    loop = asyncio.get_running_loop()
    records = await receive()
    with ProcessPoolExecutor() as executor:
        await asyncio.gather(
            *[process_record(record, obj, loop, executor) for record in records]
        )
        

我不知道如何处理obj因为您的示例中没有定义这一点,但我相信您可以弄清楚。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在异步代码中处理 CPU 密集型任务 的相关文章

随机推荐

  • PHP Mailer 返回 PHP 致命错误:未捕获 PHPMailer\PHPMailer\Exception:SMTP 错误:无法进行身份验证

    我看到了很多类似的问题 比如我的问题 但从来没有一个对我有帮助的 我正在尝试 PHPMailerhttps github com PHPMailer PHPMailer https github com PHPMailer PHPMaile
  • 使用 JQuery .ajax,使用 jquery 'json' 与 'text' 时不会调用 Success 方法

    我有一个 ASP Net 网页尝试使用 jquery axax 方法从 asmx web 服务检索数据 当dataType text 时 ajax方法正确调用成功方法 但是当使用 json 的dataType时 我无法让它返回 谁能看到我缺
  • 当存在 UTF-8 字符编码时,显示引号的问号图标

    由于某种原因 我添加后 在我的头标签之间 某些符号 即引号 显示为中间带有问号的菱形图标 就好像没有找到该符号一样 有人知道怎么回事吗 您的文本编辑器正在以某种非 UTF 8 很可能是 CP1252 的编码保存文件 包括引号 将文件实际转换
  • Java 中所有 UTF-8 字符的维吉尼亚密码

    我有一个简单的函数 用于通过 Java 中的 Vigen re 加密字符串 我省略了解密 因为这只是计算新值的行中的 而不是 但此功能仅适用于普通字母 A Z 如何更改该函数以使其支持小写字母以及大写字母和所有其他 UTF 8 字符 pub
  • 将 Homebrew 安装的 Qt 添加到 Mac 上的 Qt Creator

    我通过安装 Qt5 和 Qt Creatorhomebrew https brew sh 由于它们是彼此独立安装的 因此 Qt 不会自动添加到 Qt Creator 中的已知 Qt 安装列表中 我以为添加安装很简单 但是 由于自制程序将所有
  • 为什么要把Reader的构造函数参数定义为函数呢?

    在学习Reader Monad时 我发现它的定义是 newtype Reader r a Reader runReader r gt a instance Monad Reader r where return a Reader gt a
  • Express js:将多个路由文件合并为单个文件

    在我的express js应用程序中 我按以下方式组织我的路线 paths comment js 处理所有评论路由 var express require express var router express Router var comm
  • 将 DAO 记录集转换为断开连接的 ADO 记录集 dbDecimal 问题

    在 MS Access VBA 2007 中 我编写了以下函数 将 DAO 记录集转换为断开连接的内存中 ADO 记录集 问题是我在 DAO dbDecimal 字段上遇到数据类型转换问题 当我尝试将 DAO 记录集中的数据插入到新创建的
  • 使用 geom_smooth() 和“gam”相当于 span

    这可能是一个非常基本的问题 但我还没有找到答案 有没有相当于span论据中的geom smooth函数时method gam 我对 GAM 不太熟悉 所以如果有任何意见 我将不胜感激 我想为 n gt 1 000 的数据添加更灵活 更摆动
  • 声明图像字段的正确方法,sqlalchemy

    我正在尝试使用 Flask 构建和应用程序 我看过一些教程和书籍以及一些代码 1 它们解释了如何为数据库设置声明整数和字符串 然而 他们没有解释如何存储图像 我现在很困惑 虽然存储图像的自然方法是在数据库中 但阅读 Flask 站点上的一些
  • .NET 中机器的域名?

    一定有一种简单的方法可以做到这一点 我不敢相信没有 我扫描了网络 发现有 20 种不同的方法来查找当前用户所在的域 但没有一种方法可以获取当前计算机的域 或工作组 在非托管 c 中 这是通过以下方式检索的 WKSTA INFO 100 bu
  • Python CSV 模块 - 引号丢失

    我有一个 CSV 文件 其中包含这样的数据 15 I 2 41301888 BYRNESS RAW BYRNESS VILLAGE NORTHUMBERLAND ENG 11 I 3 41350101 2 2935 2 2008 01 09
  • OpenCV:何时使用 GridAdaptedFeatureDetector?

    我正在尝试制作一个基于描述符的检测器 我正在使用 OpenCV 我发现有很多特征类型和描述符类型 以及匹配器类型 更多我还看到可以有诸如网格或金字塔之类的组合类型作为特征类型 我还没有找到对它们的很好的解释 除了金字塔 它说这很好 对于本质
  • 容器“Android 依赖项”引用不存在的库 appcompat_v7.jar”

    我知道这个问题已被问过很多次 但不幸的是找不到任何解决方案 所以这里是 The container Android Dependencies references non existing library C Users Zain ul a
  • 替代RelativeLayout中的weightSum?

    我有四个TextView我想要在水平线上均匀分布的项目 这意味着一行中的所有空间必须均匀地被TextView项目 像这样 以前 我使用 LinearLayout 来实现此目的 我将weightSum 设置为4 并将layout weight
  • 单个构建步骤的 TeamCity 构建日志

    当 teamcity 执行 MSBuild 步骤时 构建日志具有可折叠 可扩展的层次结构 我有一个很大的构建步骤 它运行一个 powershell 脚本 生成的构建日志很大 需要很长时间才能加载 有没有一种方法可以让 teamcity 将单
  • 使用python sklearn增量训练随机森林模型

    我使用下面的代码来保存随机森林模型 我正在使用 cPickle 保存训练后的模型 当我看到新数据时 我可以增量训练模型吗 目前 训练集大约有2年的数据 有没有办法再训练两年并将其 某种程度上 附加到现有保存的模型中 rf RandomFor
  • 如何在没有 SSRS 服务器的情况下使用报表查看器控件执行 .rdl 报表?

    我正在尝试设置一个网页 用户可以在其中选择要运行的 rdl 文件 它将打开报表查看器控件 ASPX 加载报表定义文件 运行它并显示报表 到目前为止 我发现 本地 报告只能接受来自代码的数据源 因此您必须手动执行数据库代码 这将很痛苦 因为报
  • 为什么这个简单的 C# 试用不起作用 [重复]

    这个问题在这里已经有答案了 这会产生条纹而不是点 为什么 我正在尝试绘制单个像素 还尝试了另一种方法 使用 fillrectangle 它也没有给出所需的结果 得到的是条形而不是点 protected override void OnPai
  • 如何在异步代码中处理 CPU 密集型任务

    我正在做一些需要异步方法的繁重处理 我的一个方法返回一个字典列表 在将其添加到另一个可等待对象之前需要对其进行大量处理 IE def cpu bound task here record some complicated preproces