在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁)

2024-01-14

我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API。 该 API 具有使用 python 多处理库的端点。

端点为 CPU 密集型任务生成多个进程以并行执行它们。 以下是高级代码逻辑概述:

import multiprocessing as mp

class Compute:
    
    def single_compute(self, single_comp_data):
        # Computational Task CPU BOUND
        global queue
        queue.put(self.compute(single_comp_data))

    def multi_compute(self, task_ids):
        # Prepare for Compuation
        output = {}
        processes = []
        global queue
        queue = mp.Queue()
        
        # Start Test Objs Computation
        for tid in task_ids:
            # Load  task data here, to make use of object in memory cache
            single_comp_data = self.load_data_from_cache(tid)
            p = mp.Process(target=self.single_compute, args=single_comp_data)
            p.start()
            processes.append(p)

        # Collect Parallel Computation
        for p in processes:
            result = queue.get()
            output[result["tid"]]= result
            p.join()

        return output

下面是简单的 API 代码:

from fastapi import FastAPI, Response
import json


app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()

@app.get("/compute")
def compute(task_ids):
    result = comp.multi_compute(task_ids)
    return Response(content=json.dumps(result, default=str), media_type="application/json")

当像这样与多个工作人员一起运行时:

uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2

我收到这个 python 错误

TypeError: can't pickle _thread.lock objects

只有 1 个工作进程就可以了。该程序在 UNIX/LINUX 操作系统上运行。

有人可以向我解释一下为什么这里的多个 uvicorn 进程不可能分叉一个新进程,以及为什么我会遇到这个锁?

最终应该实现的目标很简单:

uvicorn 进程会产生多个其他进程(子进程 通过 fork)以及该 uvicorn 进程的内存副本。执行CPU密集型任务。


类型错误:无法 pickle _thread.lock 对象

源于您传递到子流程中的任何数据

p = mp.Process(target=self.single_compute, args=single_comp_data)

包含一个不可腌制的对象。

所有 args/kwargs 发送到multiprocessing子流程(无论是通过流程,还是通过Pool) 必须是可腌制的,同样,函数 run 的返回值也必须是可腌制的,以便可以将其发送回父进程。

如果您在 UNIX 上并使用fork多处理的启动方法(这是 Linux 上的默认设置,但不是 macOS 上的默认设置),您还可以利用写时复制内存语义,通过使数据可用来避免“向下”复制到子进程,例如通过实例状态、全局变量……,在生成子进程之前,并让它通过引用获取它,而不是将数据本身作为参数向下传递。

这个例子正在使用imap_unordered为了性能(假设不需要按顺序处理 id),并将返回一个将输入 ID 映射到它创建的结果的字典。

class Compute:
    _cache = {}  # could be an instance variable too but whatever

    def get_data(self, id):
        if id not in self._cache:
            self._cache[id] = get_data_from_somewhere(id)
        return self._cache[id]

    def compute_item(self, id):
        data = self.get_data(id)
        result = 42  # ... do heavy computation here ...
        return (id, result)

    def compute_result(self, ids) -> dict:
        for id in ids:
             self.get_data(id)  # populate in parent process
        with multiprocessing.Pool() as p:
             return dict(p.imap_unordered(self.compute_item, ids))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁) 的相关文章

随机推荐

  • 检测连接到 Wifi 的 Android 设备

    我想制作一个连接到 Wifi 网络的 Android 应用程序 假设网络 SSID ABC 假设它已连接到 Wifi ABC 连接到 ABC 后 我希望我的应用程序显示连接到同一 wifi ABC 网络的所有 Android 设备的 ip
  • 使用 Parallel.For 和 EPPlus 创建 Excel 工作表

    我正在使用EPPlus http epplus codeplex com 库来创建包含许多工作表的 Excel 工作簿 我想知道并行构建工作表是否安全 如果库支持这种行为 我在 有限的 文档中找不到提及 package new ExcelP
  • 在 Visual Studio 2013 中的托管单元测试上使用混合模式调试

    我在 Visual Studio 2013 测试框架中有一个 C 单元测试 它练习 CLI 和本机代码 我想在执行 C 单元测试时研究代码的本机部分 但是 运行 测试 gt 调试 gt 所有测试 会运行托管调试器 因此不会命中本机代码中的断
  • 不同内核的线程如何访问同一全局内存地址?

    如果一个线程束中的许多线程想要读取全局内存中的某个地址 那么该数据就会被广播 对吗 如果 warp 中的许多线程想要写入全局内存中的某个地址 则存在序列化 但无法预测顺序 对吗 但是 第一个问题 如果不同扭曲 不同块中的许多线程想要写入全局
  • 设置要在 PowerShell 导出 csv 中使用的日期格式?

    我正在尝试将数据库表导出为文本 CSV ish 以供以后批量插入 采用 ISO 格式 yyyy mm dd 的日期会少很多麻烦 我相信 我最终说服了 SQL Server Express 在导入时采用英式格式 尽管无论我做什么 灰色的服务器
  • GitHub Copilot 命令不起作用并显示错误

    我安装 GitHub Copilot 只是为了测试 但是 这些命令都不起作用 例如 如果我尝试按 CTRL Enter 则会收到以下错误消息 未找到命令 github copilot generate 我正在尝试使用 JS 文件 我安装了最
  • c# HttpWebRequest 不向代理服务器发送默认凭据

    我正在使用鱿鱼代理服务器在将请求传递到公共网络之前对客户端进行身份验证 我还没设置HttpWebRequest Proxy对象 因此我假设 Web 请求将采用默认窗口凭据并传递到代理服务器 我也已将用户条目添加到鱿鱼代理 但在发出请求时出现
  • 使用 UCWA API 进行聊天机器人?

    UCWA 能否用于 Skype For Business 本地服务器上的企业聊天机器人应用程序 我找不到太多与此相关的文档 使用 UCWA 实现聊天机器人绝对是可能的 但您必须经历一些挑战 这主要是为了让 UCWA 模拟的 App 始终在线
  • jQuery 手风琴展开所有 div

    当页面加载或事件发生时是否可以展开所有组件 谢谢 只需使用这个 accordion ui accordion content show
  • Base 64 编码有何用途?

    我时常听到人们谈论 base 64 编码 它是干什么用的 当您想要通过网络传输一些二进制数据时 通常不会仅通过以原始格式在网络上传输位和字节来实现 为什么 因为有些媒体是为流文本而设计的 你永远不知道 某些协议可能会将你的二进制数据解释为控
  • Vue 组件和 AJAX 加载 HTML 内容

    我有一个 Vue 组件 它基本上是复杂 HTML 标记的简写 初始加载时 一切正常 我正在使用 AJAX 将更多这些组件加载到页面上 问题是该组件在使用 AJAX 加载后 不想编译成 HTML 我只得到未渲染的 Vue 组件 如下所示
  • 在 asp.net webform 应用程序中选择启用 ajax 的 WCF 服务时有哪些优点和缺点?

    我刚刚经历了我的第一次ajax enabled WCF service在样本中asp net webform应用程序 如果我的网络应用程序中有 10 15 个页面 其中涉及add edit view and delete操作 是否有可能使它
  • UIPickerView 导致崩溃

    每当我尝试在应用程序中选择 UIPickerView 时 它就会崩溃 我已经实现了所有委托方法 但收到此错误 2013 01 15 13 57 56 176 tracker 16142 c07 Assertion failure in UI
  • 我应该如何编辑查询以提高性能,同时保留现有结构?

    我想提高查询的性能 如下所示 里面有一个索引isl ref and isl date字段 但由于我使用 gt 运算符并且使用 因此无法使用索引 1 1440 增加一分钟isl date场地 我应该如何编辑查询以提高性能 同时保留现有结构 S
  • 如何在 Eclipse LogCat 查看器中过滤掉标记名

    我有一个 Android 应用程序会 发送垃圾邮件 LogCat 我想删除它的 logcat 条目以使输出更具可读性 是否可以有一个过滤器来删除特定标记名称的 LogCat 条目 或者一种有效的搜索模式 是的 创建一个过滤器 其中 按日志标
  • Terraform /AWS aws_servicecatalog_portfolio

    我正在尝试通过 Terraform 部署服务目录 当我尝试通过代码部署服务目录产品时 Service catalog product resource aws servicecatalog product linuxDesktop name
  • ExtJS 7.3 中没有可用的 ext-locale 包

    由于某些奇怪的原因 我收到此错误 无法满足 ext locale 的要求 错误 以下内容 版本无法满足 ERR 应用程序 ext locale 否 匹配 ERR 无法解决包要求 根据官方说明 我将需求添加到了 app json classi
  • 如何通过 printf 打印二进制数[重复]

    这个问题在这里已经有答案了 可能的重复 有 printf 转换器可以以二进制格式打印吗 https stackoverflow com questions 111928 is there a printf converter to prin
  • 使用 Robospice 和 Retrofit 将图像上传到 Google appengine

    我正在尝试使用 Robospice 和 Retrofit 将图像上传到我的 Google appengine blobstore 我可以获取 GAE 提供的上传 URL 但是当我尝试将带有图像的 URL 作为 Multipart POST
  • 在 Uvicorn 中与多个工作线程一起使用多重处理(线程锁)

    我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API 该 API 具有使用 python 多处理库的端点 端点为 CPU 密集型任务生成多个进程以并行执行它们 以下是高级代码逻辑概述 import multiproces