在 Uvicorn/FastAPI 内发出下游 Https 请求的正确方法是什么?

2024-04-25

我有一个 API 端点(FastAPI / Uvicorn)。除此之外,它还向另一个 API 请求信息。当我使用多个并发请求加载 API 时,我开始收到以下错误:

h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE

在正常环境下,我会利用request.session,但我知道它不是完全线程安全的。

因此,在 FastAPI 等框架内使用请求的正确方法是什么,其中多个线程将使用requests同时图书馆?


而不是使用requests,你可以使用httpx https://www.python-httpx.org,它提供了一个async API https://www.python-httpx.org/async/还有(httpxFastAPI 的文档中也建议 https://fastapi.tiangolo.com/advanced/async-tests/#httpx表演时async测试,以及最近的 FastAPI/Starlette替换了 HTTP 客户端TestClient from requests to httpx https://github.com/encode/starlette/commit/6765502c1e5a418c16bb379c54b8ea706b0444cf).

下面的示例基于中给出的示例httpx文档 https://www.python-httpx.org/async/#streaming-responses,演示如何使用该库发出异步 HTTP(s) 请求,然后将响应流式传输回客户端。这httpx.AsyncClient() https://www.python-httpx.org/async/#making-async-requests你可以用什么来代替requests.Session(),当向同一主机发出多个请求时,这非常有用,因为底层 TCP 连接将被重用,而不是为每个请求重新创建一个连接,从而显着提高性能。此外,它还允许您重复使用headers和其他设置(例如proxies and timeout),以及坚持cookies,跨请求。你产生一个Client并在每次需要时重复使用它。您可以使用await client.aclose() to 显式关闭客户端 https://www.python-httpx.org/async/#opening-and-closing-clients一旦你完成了它(你可以在一个shutdown event https://fastapi.tiangolo.com/advanced/events/#shutdown-event处理程序)。示例和更多详细信息也可以在以下位置找到这个答案 https://stackoverflow.com/a/74239367/17865804.

Example

from fastapi import FastAPI
import httpx
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse

client = httpx.AsyncClient()
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()

@app.get('/')
async def home():
    req = client.build_request('GET', 'https://www.example.com/')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))

示例(已更新)

Since startup and shutdown现已弃用 https://fastapi.tiangolo.com/advanced/events/#alternative-events-deprecated(并且将来可能会被完全删除),您可以改为使用lifespan handler https://fastapi.tiangolo.com/advanced/events/#lifespan初始化httpx客户端,以及在关闭时关闭客户端实例,类似于中演示的内容这个答案 https://stackoverflow.com/a/76322910/17865804。 Starlette 特别提供了一个使用lifespan处理程序和httpx客户在其文档页面中。如中所述Starlette 的文档 https://www.starlette.io/lifespan/#lifespan-state:

The lifespan有这样的概念state,这是一本字典 可以用来共享对象之间的寿命,和 要求。

The state收到的请求是状态的浅表副本 在寿命处理程序上收到。

因此,可以使用端点内部访问添加到生命周期处理程序中状态的对象request.state。下面的示例使用流响应来与外部服务器通信,并将响应发送回客户端。看here https://www.python-httpx.org/async/#streaming-responses欲了解更多详细信息async响应流方法httpx (i.e., aiter_bytes(), aiter_text(), aiter_lines(), etc.).

如果您想使用media_type为了StreamingResponse,您可以使用原始响应中的响应,如下所示:media_type=r.headers['content-type']。然而,正如描述的这个答案 https://stackoverflow.com/a/75760884/17865804,您需要确保media_type未设置为text/plain;否则,内容将不会按预期在浏览器中传输,除非您禁用MIME 嗅探(查看链接的答案以获取更多详细信息和解决方案)。

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient() as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose)) 

如果出于任何原因您需要逐块读取内容在服务器端响应客户端之前,您可以执行以下操作:

@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    
    async def gen():
        async for chunk in r.aiter_raw():
            yield chunk
        await r.aclose()
        
    return StreamingResponse(gen())

如果您不想使用流式响应,而是have httpx为您阅读回复首先(这会将响应数据存储到服务器的 RAM;因此,您应该确保有足够的空间来容纳数据),您可以使用以下内容。请注意,使用r.json()仅适用于响应数据为 JSON 格式的情况;否则,你可以返回一个PlainTextResponse https://fastapi.tiangolo.com/advanced/custom-response/#plaintextresponse或自定义Response https://fastapi.tiangolo.com/advanced/custom-response/#response直接,如下所示。

from fastapi import Response
from fastapi.responses import PlainTextResponse

@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req)
    content_type = r.headers.get('content-type')
    
    if content_type == 'application/json':
        return r.json()
    elif content_type == 'text/plain':
        return PlainTextResponse(content=r.text)
    else:
        return Response(content=r.content) 

使用async API of httpx意味着你必须定义你的端点async def;否则,你将不得不使用标准同步API https://www.python-httpx.org/advanced/ (for def vs async def see 这个答案 https://stackoverflow.com/a/71517830/17865804),并且如中所述这个 github 讨论 https://github.com/encode/httpx/discussions/1633#discussioncomment-717658:

Yes. HTTPX 旨在线程安全,是的,一个 跨所有线程的客户端实例在以下方面会做得更好 连接池,而不是使用每个线程实例。

您还可以使用以下命令控制连接池大小limits的关键字参数Client (see 池限制配置 https://www.python-httpx.org/advanced/#pool-limit-configuration)。例如:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Uvicorn/FastAPI 内发出下游 Https 请求的正确方法是什么? 的相关文章

随机推荐