async def
终点
你可以使用循环.run_in_executor https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor with 进程池执行器 https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor在单独的进程中启动功能。
@app.post("/async-endpoint")
async def test_endpoint():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_func) # wait result
def
终点
Since def
端点是隐式运行 https://fastapi.tiangolo.com/async/?h=%20threadpool#path-operation-functions在单独的线程中,您可以使用模块的全部功能多重处理 https://docs.python.org/3/library/multiprocessing.html and 并发期货 https://docs.python.org/3/library/concurrent.futures.html。注意里面def
功能,await
可能无法使用。样品:
@app.post("/def-endpoint")
def test_endpoint():
...
with multiprocessing.Pool(3) as p:
result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
...
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(f, [1, 2, 3])
Note: 应该记住,随着请求数量的增加,在端点中创建进程池以及创建大量线程可能会导致响应速度减慢。
即时执行
在单独的进程中执行函数并立即等待结果的最简单、最本机的方法是使用循环.run_in_executor https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor with 进程池执行器 https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor.
如下例所示,可以在应用程序启动时创建池,并且不要忘记在应用程序退出时关闭。池中使用的进程数可以使用以下命令设置最大工人数 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor ProcessPoolExecutor
构造函数参数。如果max_workers
is None
或者没有给出,它将默认为机器上的处理器数量。
这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。如果由于某种原因连接丢失,那么结果将无处返回。
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI
from calc import cpu_bound_func
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.executor = ProcessPoolExecutor()
yield
app.state.executor.shutdown()
app = FastAPI(lifespan=lifespan)
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
@app.get("/{param}")
async def handler(param: int):
res = await run_in_process(cpu_bound_func, param)
return {"result": res}
移至后台
通常,CPU 密集型任务在后台执行。 FastAPI提供了运行的能力后台任务 https://fastapi.tiangolo.com/tutorial/background-tasks/待运行after返回一个响应,您可以在其中启动并异步等待 CPU 密集型任务的结果。
例如,在这种情况下,您可以立即返回以下响应"Accepted"
(HTTP 代码 202)和一个独特的任务ID
,在后台继续计算,客户端稍后可以使用此请求任务的状态ID
.
BackgroundTasks
提供一些功能,特别是,您可以运行其中的几个功能(包括依赖项)。并且在其中您可以使用依赖项中获得的资源,只有当所有任务完成时才会清理这些资源,而在出现异常时也可以正确处理它们。在此可以更清楚地看到这一点diagram https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-with-yield/#dependencies-with-yield-and-httpexception.
下面是执行最小任务跟踪的示例。假定应用程序正在运行一个实例。
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from contextlib import asynccontextmanager
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
from calc import cpu_bound_func
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {}
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
async def start_cpu_bound_task(uid: UUID, param: int) -> None:
jobs[uid].result = await run_in_process(cpu_bound_func, param)
jobs[uid].status = "complete"
@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
return new_task
@app.get("/status/{uid}")
async def status_handler(uid: UUID):
return jobs[uid]
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.executor = ProcessPoolExecutor()
yield
app.state.executor.shutdown()
更强大的解决方案
上面的所有示例都非常简单,但是如果您需要一些更强大的系统来进行繁重的分布式计算,那么您可以考虑消息代理RabbitMQ
, Kafka
, NATS
等等。以及像 Celery 这样使用它们的库。