而不是使用requests
,你可以使用httpx https://www.python-httpx.org,它提供了一个async API https://www.python-httpx.org/async/还有(httpxFastAPI 的文档中也建议 https://fastapi.tiangolo.com/advanced/async-tests/#httpx表演时async
测试,以及最近的 FastAPI/Starlette替换了 HTTP 客户端TestClient from requests to httpx https://github.com/encode/starlette/commit/6765502c1e5a418c16bb379c54b8ea706b0444cf).
下面的示例基于中给出的示例httpx文档 https://www.python-httpx.org/async/#streaming-responses,演示如何使用该库发出异步 HTTP(s) 请求,然后将响应流式传输回客户端。这httpx.AsyncClient() https://www.python-httpx.org/async/#making-async-requests你可以用什么来代替requests.Session()
,当向同一主机发出多个请求时,这非常有用,因为底层 TCP 连接将被重用,而不是为每个请求重新创建一个连接,从而显着提高性能。此外,它还允许您重复使用headers
和其他设置(例如proxies
and timeout
),以及坚持cookies
,跨请求。你产生一个Client
并在每次需要时重复使用它。您可以使用await client.aclose()
to 显式关闭客户端 https://www.python-httpx.org/async/#opening-and-closing-clients一旦你完成了它(你可以在一个shutdown event https://fastapi.tiangolo.com/advanced/events/#shutdown-event处理程序)。示例和更多详细信息也可以在以下位置找到这个答案 https://stackoverflow.com/a/74239367/17865804.
Example
from fastapi import FastAPI
import httpx
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
client = httpx.AsyncClient()
app = FastAPI()
@app.on_event('shutdown')
async def shutdown_event():
await client.aclose()
@app.get('/')
async def home():
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
示例(已更新)
Since startup and shutdown现已弃用 https://fastapi.tiangolo.com/advanced/events/#alternative-events-deprecated(并且将来可能会被完全删除),您可以改为使用lifespan handler https://fastapi.tiangolo.com/advanced/events/#lifespan初始化httpx
客户端,以及在关闭时关闭客户端实例,类似于中演示的内容这个答案 https://stackoverflow.com/a/76322910/17865804。 Starlette 特别提供了一个使用lifespan
处理程序和httpx
客户在其文档页面中。如中所述Starlette 的文档 https://www.starlette.io/lifespan/#lifespan-state:
The lifespan
有这样的概念state
,这是一本字典
可以用来共享对象之间的寿命,和
要求。
The state
收到的请求是状态的浅表副本
在寿命处理程序上收到。
因此,可以使用端点内部访问添加到生命周期处理程序中状态的对象request.state
。下面的示例使用流响应来与外部服务器通信,并将响应发送回客户端。看here https://www.python-httpx.org/async/#streaming-responses欲了解更多详细信息async
响应流方法httpx
(i.e., aiter_bytes()
, aiter_text()
, aiter_lines()
, etc.).
如果您想使用media_type
为了StreamingResponse
,您可以使用原始响应中的响应,如下所示:media_type=r.headers['content-type']
。然而,正如描述的这个答案 https://stackoverflow.com/a/75760884/17865804,您需要确保media_type
未设置为text/plain
;否则,内容将不会按预期在浏览器中传输,除非您禁用MIME 嗅探(查看链接的答案以获取更多详细信息和解决方案)。
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient() as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
如果出于任何原因您需要逐块读取内容在服务器端响应客户端之前,您可以执行以下操作:
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
async def gen():
async for chunk in r.aiter_raw():
yield chunk
await r.aclose()
return StreamingResponse(gen())
如果您不想使用流式响应,而是have httpx
为您阅读回复首先(这会将响应数据存储到服务器的 RAM;因此,您应该确保有足够的空间来容纳数据),您可以使用以下内容。请注意,使用r.json()
仅适用于响应数据为 JSON 格式的情况;否则,你可以返回一个PlainTextResponse https://fastapi.tiangolo.com/advanced/custom-response/#plaintextresponse或自定义Response https://fastapi.tiangolo.com/advanced/custom-response/#response直接,如下所示。
from fastapi import Response
from fastapi.responses import PlainTextResponse
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req)
content_type = r.headers.get('content-type')
if content_type == 'application/json':
return r.json()
elif content_type == 'text/plain':
return PlainTextResponse(content=r.text)
else:
return Response(content=r.content)
使用async
API of httpx
意味着你必须定义你的端点async def
;否则,你将不得不使用标准同步API https://www.python-httpx.org/advanced/ (for def
vs async def
see 这个答案 https://stackoverflow.com/a/71517830/17865804),并且如中所述这个 github 讨论 https://github.com/encode/httpx/discussions/1633#discussioncomment-717658:
Yes. HTTPX
旨在线程安全,是的,一个
跨所有线程的客户端实例在以下方面会做得更好
连接池,而不是使用每个线程实例。
您还可以使用以下命令控制连接池大小limits
的关键字参数Client
(see 池限制配置 https://www.python-httpx.org/advanced/#pool-limit-configuration)。例如:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)