As per FastAPI 的文档 https://fastapi.tiangolo.com/async/#path-operation-functions:
当你用普通声明一个路径操作函数时def
反而
的async def
,它在外部线程池中运行就是那么await
ed,而不是直接调用(因为它会阻止
服务器)。
另外,正如所描述的here https://fastapi.tiangolo.com/async/#concurrency-and-async-await:
如果您使用的是与以下对象通信的第三方库
某些东西(数据库、API、文件系统等)并且没有
支持使用await
,(目前大多数情况都是这样
数据库库),然后将路径操作函数声明为
通常,只需def
.
如果您的应用程序(以某种方式)不需要与
其他任何事情并等待它响应,使用async def
.
如果您只是不知道,请使用普通def
.
Note: 可以混用def
and async def
在您的路径操作功能中,您需要尽可能多的功能,并使用最佳的方式定义每个功能
为您提供的选择。 FastAPI 会用它们做正确的事情。
无论如何,在上述任何一种情况下,FastAPI仍然会工作
异步地并且速度极快。
但按照上述步骤,它可以做一些
性能优化。
Thus, def
端点(在异步编程的上下文中,仅使用定义的函数def
叫做同步函数),在 FastAPI 中,在与外部线程池分开的线程中运行,然后await
ed,因此 FastAPI 仍然可以工作异步地。换句话说,服务器将处理对此类端点的请求同时。然而,async def
端点运行在event loop https://docs.python.org/3/library/asyncio-eventloop.html- 在主(单)线程上 - 也就是说,服务器还将处理对此类端点的请求同时/异步地, 只要有 an await https://stackoverflow.com/questions/38865050/is-await-in-python3-cooperative-multitasking调用此类内部的非阻塞 I/O 绑定操作async def
端点/路线,例如waiting对于(1)客户端要通过网络发送的数据,(2)要读取磁盘中文件的内容,(3)要完成的数据库操作等,(看一下here https://fastapi.tiangolo.com/async/#asynchronous-code)。但是,如果端点定义为async def
才不是await
对于内部的某些内容,为了让出事件循环中其他任务运行的时间(例如,对相同或其他端点的请求、后台任务等),对此类端点的每个请求都必须完全完成(即退出端点),然后将控制权返回到事件循环并允许其他任务运行。也就是说,在这种情况下,服务器会处理请求依次地. Note同样的概念不仅适用于 FastAPI 端点,还适用于StreamingResponse的生成函数 https://stackoverflow.com/a/75760884/17865804 (see StreamingResponse https://github.com/encode/starlette/blob/31164e346b9bd1ce17d968e1301c3bb2c23bb418/starlette/responses.py#L235类实现),以及Background Tasks https://fastapi.tiangolo.com/tutorial/background-tasks/ (see BackgroundTask https://github.com/encode/starlette/blob/33f46a13625bcca4b7520e33be299a23b2e2b26c/starlette/background.py#L15类实现);因此,读完这个答案后,您应该能够决定是否应该定义 FastAPI 端点,StreamingResponse
的生成器,或后台任务函数def
or async def
.
关键词await
(仅适用于async def
函数)将函数控制权传递回event loop
。换句话说,它暂停了周围的执行协程 https://docs.python.org/3/library/asyncio-task.html#coroutines(即,协程对象是调用的结果async def
函数),并告诉event loop
让其他东西运行,直到那await
ed 任务完成。Note这只是因为你可以定义一个自定义函数async def
进而await
它在你的里面async def
端点,并不意味着您的代码将异步工作,例如,如果该自定义函数包含对time.sleep()
、CPU 密集型任务、非异步 I/O 库或任何其他与异步 Python 代码不兼容的阻塞调用。例如,在 FastAPI 中,当使用async
的方法UploadFile https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile, 例如await file.read()
and await file.write()
,FastAPI/Starlette,在幕后,实际上运行这样的文件对象的方法 https://docs.python.org/3/tutorial/inputoutput.html#methods-of-file-objects在外部线程池中(使用async
run_in_threadpool() https://github.com/encode/starlette/blob/b8ea367b4304a98653ec8ce9c794ad0ba6dcaf4b/starlette/concurrency.py#L35函数)和await
是它;否则,此类方法/操作将阻止event loop
。您可以通过查看来了解更多信息的实施UploadFile class https://github.com/encode/starlette/blob/048643adc21e75b668567fc6bcdd3650b89044ea/starlette/datastructures.py#L426.
Note that async
并不意味着parallel, but 同时。异步代码async and await很多时候总结为使用协程 https://fastapi.tiangolo.com/async/#coroutines. 协程是协作的(或协作多任务 https://en.wikipedia.org/wiki/Cooperative_multitasking),这意味着“在任何给定时间,带有协程的程序都在运行only它的协程之一,并且这个正在运行的协程仅当它明确请求暂停时才暂停其执行”(参见here https://stackoverflow.com/questions/553704/what-is-a-coroutine and here https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread有关协程的更多信息)。如中所述本文 https://jwodder.github.io/kbits/posts/pyasync-fundam/:
具体来说,每当执行当前正在运行的协程时
达到await
表达式,协程可能会被挂起,并且
另一个先前挂起的协程可能会恢复执行,如果它发生了什么
被暂停,此后返回了一个值。暂停还可以
发生当async for
块请求下一个值
异步迭代器或当async with
输入块或
退出,因为这些操作使用await
在引擎盖下。
但是,如果在某个阻塞 I/O 密集型或 CPU 密集型操作中直接执行/调用async def
函数/端点,它会阻塞主线程(因此,event loop
)。因此,阻塞操作例如time.sleep()
in an async def
端点会阻止整个服务器(如您问题中提供的代码示例所示)。因此,如果您的端点不会做出任何async
调用,你可以声明它def
相反,它将在外部线程池中运行,然后await
ed,如前所述(更多解决方案将在以下部分中给出)。例子:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
否则,如果您必须在端点内执行的函数是async
你必须执行的功能await
,你应该定义你的端点async def
。为了演示这一点,下面的示例使用asyncio.sleep() https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep函数(从asyncio https://docs.python.org/3/library/asyncio.html库),它提供非阻塞睡眠操作。这await asyncio.sleep()
方法将暂停周围协程的执行(直到睡眠操作完成),从而允许事件循环中的其他任务运行。给出了类似的例子here https://docs.python.org/3/library/asyncio-task.html#coroutine and here https://stackoverflow.com/a/56730924以及。
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
Both上面的端点将以您问题中提到的相同顺序将指定的消息打印到屏幕上(如果两个请求大约在同一时间到达),即:
Hello
Hello
bye
bye
重要的提示
当您第二次(第三次等)呼叫您的端点时,请记住从与浏览器主会话隔离的选项卡;否则,后续请求(即第一个请求之后的请求)将被浏览器阻止(在客户端),因为浏览器在发送下一个请求之前将等待服务器对上一个请求的响应。您可以通过使用来确认print(request.client)
在端点内,您会看到hostname
and port
所有传入请求的编号相同(如果请求是从同一浏览器窗口/会话中打开的选项卡发起的),因此,这些请求将按顺序处理,因为浏览器首先按顺序发送它们。到solve这个,你可以:
-
重新加载同一选项卡(与正在运行的选项卡相同),或者
-
在隐身窗口中打开新选项卡,或者
-
使用不同的浏览器/客户端发送请求,或者
-
Use the httpx
图书馆到发出异步 HTTP 请求 https://www.python-httpx.org/async/#making-async-requests, 随着等待的 https://docs.python.org/3/library/asyncio-task.html#awaitables asyncio.gather() https://docs.python.org/3/library/asyncio-task.html#asyncio.gather,它允许同时执行多个异步操作,然后返回结果列表same将可等待(任务)传递给该函数的顺序(看看这个答案 https://stackoverflow.com/a/74239367/17865804更多细节)。
Example:
import httpx
import asyncio
URLS = ['http://127.0.0.1:8000/ping'] * 2
async def send(url, client):
return await client.get(url, timeout=10)
async def main():
async with httpx.AsyncClient() as client:
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
print(*[r.json() for r in responses], sep='\n')
asyncio.run(main())
如果您必须调用可能需要不同时间来处理请求的不同端点,并且您希望在从服务器返回响应后立即在客户端打印响应,而不是等待asyncio.gather()
收集所有任务的结果并按照任务传递到的相同顺序打印出来send()
函数——你可以替换send()
上面例子的功能如下所示:
async def send(url, client):
res = await client.get(url, timeout=10)
print(res.json())
return res
Async
/await
以及阻止 I/O 密集型或 CPU 密集型操作
如果您需要使用async def
(因为你可能需要await
对于端点内的协程),但也有一些同步阻塞 I/O 密集型或 CPU 密集型操作(长时间运行的计算任务),这将阻塞event loop
(本质上是整个服务器)并且不会让其他请求通过,例如:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
then:
-
您应该检查是否可以将端点的定义更改为正常def
代替async def
。例如,如果端点中唯一需要等待的方法是读取文件内容的方法(正如您在下面的注释部分中提到的),您可以将端点参数的类型声明为bytes
(i.e., file: bytes = File()
),因此,FastAPI 会为您读取该文件,您将收到如下内容bytes
。因此,不需要使用await file.read()
。请注意,上述方法适用于小文件,因为整个文件内容应存储到内存中(请参阅文档关于File参数 https://fastapi.tiangolo.com/tutorial/request-files/#define-file-parameters);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB RAM,则无法加载 50GB 文件),您的应用程序最终可能会崩溃。或者,您可以致电.read()
的方法SpooledTemporaryFile https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile直接(可以通过.file
的属性UploadFile
对象),这样你就不必再await
the .read()
方法 - 现在您可以使用正常声明您的端点def
,每个请求都会运行在单独的线程(下面给出示例)。有关如何上传的更多详细信息File
,以及Starlette/FastAPI如何使用SpooledTemporaryFile
幕后花絮,请看这个答案 https://stackoverflow.com/a/70657621/17865804 and 这个答案 https://stackoverflow.com/a/70667530/17865804.
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"
-
使用 FastAPI(Starlette)run_in_threadpool() https://github.com/encode/starlette/blob/b8ea367b4304a98653ec8ce9c794ad0ba6dcaf4b/starlette/concurrency.py#L35函数从concurrency
模块——正如@tiangolo建议的那样here https://github.com/tiangolo/fastapi/issues/1066#issuecomment-612940187—“将在单独的线程中运行该函数,以确保主线程(运行协程的地方)不会被阻塞”(参见here https://bocadilloproject.github.io/guide/async.html#common-patterns)。正如@tiangolo 所描述的here https://gitter.im/tiangolo/fastapi?at=5ce550f675d9a575a625feb7, "run_in_threadpool
是一个可等待函数,第一个参数是普通函数,接下来的参数直接传递给该函数。它支持序列参数和关键字参数”。
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
-
或者,使用asyncio
's loop.run_in_executor() https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor——获得运行后event loop
using asyncio.get_running_loop() https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop—运行任务,在这种情况下,您可以await
让它完成并返回结果,然后再继续下一行代码。通过None
as the executor参数,将使用默认执行器;那是ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor:
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
或者,如果您愿意传递关键字参数 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio-pass-keywords相反,你可以使用lambda
表达式(例如,lambda: cpu_bound_task(some_arg=contents)
),或者,优选地,functools.partial() https://docs.python.org/3/library/functools.html#functools.partial,文档中特别推荐了loop.run_in_executor() https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor:
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
您还可以在自定义中运行您的任务ThreadPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor。例如:
import asyncio
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
在 Python 3.9+ 中,您还可以使用asyncio.to_thread() https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread在单独的线程中异步运行同步函数,本质上是使用await loop.run_in_executor(None, func_call)
在引擎盖下,正如可以看到的实施asyncio.to_thread() https://github.com/python/cpython/blob/c5660ae96f2ab5732c68c301ce9a63009f432d93/Lib/asyncio/threads.py#L12. The to_thread()
function 接受要执行的阻塞函数的名称以及该函数的任何参数(*args 和/或 **kwargs),然后返回一个协程,该协程可以await
编辑。例子:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
-
ThreadPoolExecutor
将成功阻止event loop
免遭封锁,但不会给你性能改进你会期望跑步并行代码;尤其是当一个人需要执行CPU-bound
操作,例如所描述的操作here https://fastapi.tiangolo.com/async/#is-concurrency-better-than-parallelism(例如,音频或图像处理、机器学习等)。因此最好是在单独的进程中运行 CPU 密集型任务—using ProcessPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor,如下所示 - 您可以再次将其集成asyncio
, 为了await
它完成工作并返回结果。如上所述here https://stackoverflow.com/q/15900366,在 Windows 上,保护代码的主循环以避免递归生成子进程等非常重要。基本上,您的代码必须在if __name__ == '__main__': https://stackoverflow.com/questions/419163/what-does-if-name-main-do.
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
-
Use more workers https://fastapi.tiangolo.com/deployment/server-workers/。例如,uvicorn main:app --workers 4
(如果您正在使用Gunicorn 作为 Uvicorn 工人的流程经理 https://fastapi.tiangolo.com/deployment/server-workers/#gunicorn-with-uvicorn-workers,请看一下这个答案 https://stackoverflow.com/a/71613757/17865804). Note:每个工人“有自己的东西、变量和记忆” https://fastapi.tiangolo.com/deployment/concepts/#memory-per-process。这意味着global
变量/对象等不会在进程/工作人员之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如下所述here https://stackoverflow.com/a/71537393/17865804 and here https://stackoverflow.com/a/65699375/17865804。另外,请注意“如果您的代码消耗了大量内存,每个过程将消耗等量的内存”.
-
如果您需要执行繁重的背景计算并且您不一定需要它由同一进程运行(例如,您不需要共享内存、变量等),您可能会受益于使用其他更大的工具,例如Celery https://docs.celeryq.dev/,如中所述FastAPI 的文档 https://fastapi.tiangolo.com/tutorial/background-tasks/#caveat.