如何在 FastAPI 中进行多处理

2024-04-01

在提供 FastAPI 请求时,我需要对列表中的每个元素执行一项 CPU 密集型任务。我想在多个 CPU 核心上进行此处理。

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准吗multiprocessing模块?到目前为止我发现的所有教程/问题仅涵盖 I/O 密集型任务,例如 Web 请求。


async def终点

你可以使用循环.run_in_executor https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor with 进程池执行器 https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor在单独的进程中启动功能。

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def终点

Since def端点是隐式运行 https://fastapi.tiangolo.com/async/?h=%20threadpool#path-operation-functions在单独的线程中,您可以使用模块的全部功能多重处理 https://docs.python.org/3/library/multiprocessing.html and 并发期货 https://docs.python.org/3/library/concurrent.futures.html。注意里面def功能,await可能无法使用。样品:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

Note: 应该记住,随着请求数量的增加,在端点中创建进程池以及创建大量线程可能会导致响应速度减慢。


即时执行

在单独的进程中执行函数并立即等待结果的最简单、最本机的方法是使用循环.run_in_executor https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor with 进程池执行器 https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor.

如下例所示,可以在应用程序启动时创建池,并且不要忘记在应用程序退出时关闭。池中使用的进程数可以使用以下命令设置最大工人数 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor ProcessPoolExecutor构造函数参数。如果max_workers is None或者没有给出,它将默认为机器上的处理器数量。

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。如果由于某种原因连接丢失,那么结果将无处返回。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI

from calc import cpu_bound_func


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.executor = ProcessPoolExecutor()
    yield
    app.state.executor.shutdown()


app = FastAPI(lifespan=lifespan)


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}

移至后台

通常,CPU 密集型任务在后台执行。 FastAPI提供了运行的能力后台任务 https://fastapi.tiangolo.com/tutorial/background-tasks/待运行after返回一个响应,您可以在其中启动并异步等待 CPU 密集型任务的结果。

例如,在这种情况下,您可以立即返回以下响应"Accepted"(HTTP 代码 202)和一个独特的任务ID,在后台继续计算,客户端稍后可以使用此请求任务的状态ID.

BackgroundTasks提供一些功能,特别是,您可以运行其中的几个功能(包括依赖项)。并且在其中您可以使用依赖项中获得的资源,只有当所有任务完成时才会清理这些资源,而在出现异常时也可以正确处理它们。在此可以更清楚地看到这一点diagram https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-with-yield/#dependencies-with-yield-and-httpexception.

下面是执行最小任务跟踪的示例。假定应用程序正在运行一个实例。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from contextlib import asynccontextmanager
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.executor = ProcessPoolExecutor()
    yield
    app.state.executor.shutdown()

更强大的解决方案

上面的所有示例都非常简单,但是如果您需要一些更强大的系统来进行繁重的分布式计算,那么您可以考虑消息代理RabbitMQ, Kafka, NATS等等。以及像 Celery 这样使用它们的库。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 FastAPI 中进行多处理 的相关文章

随机推荐