我正在使用通过 uvicorn 提供的 FastAPI 构建一个 API。
该 API 具有使用 python 多处理库的端点。
端点为 CPU 密集型任务生成多个进程以并行执行它们。
以下是高级代码逻辑概述:
import multiprocessing as mp
class Compute:
def single_compute(self, single_comp_data):
# Computational Task CPU BOUND
global queue
queue.put(self.compute(single_comp_data))
def multi_compute(self, task_ids):
# Prepare for Compuation
output = {}
processes = []
global queue
queue = mp.Queue()
# Start Test Objs Computation
for tid in task_ids:
# Load task data here, to make use of object in memory cache
single_comp_data = self.load_data_from_cache(tid)
p = mp.Process(target=self.single_compute, args=single_comp_data)
p.start()
processes.append(p)
# Collect Parallel Computation
for p in processes:
result = queue.get()
output[result["tid"]]= result
p.join()
return output
下面是简单的 API 代码:
from fastapi import FastAPI, Response
import json
app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()
@app.get("/compute")
def compute(task_ids):
result = comp.multi_compute(task_ids)
return Response(content=json.dumps(result, default=str), media_type="application/json")
当像这样与多个工作人员一起运行时:
uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2
我收到这个 python 错误
TypeError: can't pickle _thread.lock objects
只有 1 个工作进程就可以了。该程序在 UNIX/LINUX 操作系统上运行。
有人可以向我解释一下为什么这里的多个 uvicorn 进程不可能分叉一个新进程,以及为什么我会遇到这个锁?
最终应该实现的目标很简单:
uvicorn 进程会产生多个其他进程(子进程
通过 fork)以及该 uvicorn 进程的内存副本。执行CPU密集型任务。