Is it dangerous to make concurrent.futures.ThreadPoolExecutor calls within a FastAPI endpoint?

2024-03-10

I have the following test code:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

I need to use the concurrent.futures.ThreadPoolExecutor part of the code above inside a FastAPI endpoint.

My concern is the impact of the number of API calls and the threads they spawn. I worry about creating too many threads and the related consequences: starving the host of resources, and crashing the application and/or the host.

Any thoughts or gotchas about this approach?


You should rather use the HTTPX library (https://www.python-httpx.org/), which provides an async API (https://www.python-httpx.org/async/). As described in this answer (https://stackoverflow.com/a/73736138/17865804), you spawn a Client once and reuse it every time you need it. To make async requests with HTTPX (https://www.python-httpx.org/async/#making-async-requests), you need an AsyncClient (https://github.com/encode/httpx/blob/71ee50b27770b461a5d2aaba9fca1fbc261bede1/httpx/_client.py#L1297).

You can also control the connection pool size using the limits keyword argument of Client, which takes an instance of httpx.Limits (https://github.com/encode/httpx/blob/9e97d7d42963e408ef2949cc86515d6025dbec24/httpx/_config.py#L276). For example:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)

You can adjust the above per your needs. Per the documentation on pool limit configuration (https://www.python-httpx.org/advanced/#pool-limit-configuration):

  • max_keepalive_connections, the number of allowable keep-alive connections, or None to always allow. (Defaults to 20)
  • max_connections, the maximum number of allowable connections, or None for no limits. (Defaults to 100)
  • keepalive_expiry, the time limit on idle keep-alive connections in seconds, or None for no limits. (Defaults to 5)

If you would like to adjust the timeout (https://www.python-httpx.org/advanced/#timeout-configuration) as well, you can use the timeout parameter to set a timeout on an individual request, or on a Client/AsyncClient instance, which results in the given timeout being used as the default for requests made with this client (see the implementation of the Timeout class, https://github.com/encode/httpx/blob/9baf3a6cd2fa9ebeb17dba5a3e5c6e9e0af83a96/httpx/_config.py#L189, as well). You can specify the timeout behavior in fine-grained detail; for instance, setting the read timeout parameter specifies the maximum duration to wait for a chunk of data to be received (i.e., a chunk of the response body). If HTTPX is unable to receive data within this time frame, a ReadTimeout exception is raised. If set to None instead of some positive numerical value, there will be no timeout on read. The default is a 5-second timeout on all operations.

You can explicitly close the AsyncClient (https://www.python-httpx.org/async/#opening-and-closing-clients) using await client.aclose() when you are done with it (this could be done inside a shutdown event handler, for instance; see https://fastapi.tiangolo.com/advanced/events/#shutdown-event).

To run multiple asynchronous operations (since you need to request five different URLs when your API endpoint is called), you can use the awaitable asyncio.gather() (https://docs.python.org/3/library/asyncio-task.html#asyncio.gather). It will execute the async operations and return a list of results in the same order the awaitables (https://docs.python.org/3/library/asyncio-task.html#awaitables), i.e., the tasks, were passed to that function.
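A self-contained illustration of that ordering guarantee, using only the standard library (the work coroutine is a stand-in for the real HTTP requests):

```python
import asyncio

async def work(i):
    # later inputs finish first, yet gather() preserves input order
    await asyncio.sleep(0.01 * (5 - i))
    return i

async def main():
    return await asyncio.gather(*(work(i) for i in range(5)))

results = asyncio.run(main())
print(results)  # [0, 1, 2, 3, 4]
```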

Working example:

from fastapi import FastAPI
import httpx
import asyncio

URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']
        
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
client = httpx.AsyncClient(limits=limits, timeout=timeout)
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()

async def send(url, client):
    return await client.get(url)

@app.get('/')
async def main():
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    # for demo purposes, return only the first 50 chars of each response
    return [r.text[:50] for r in responses]

If you would like to avoid reading the entire response body into RAM, you can use streaming responses (https://www.python-httpx.org/async/#streaming-responses), as described in this answer (https://stackoverflow.com/a/73770074/17865804) and demonstrated below:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import httpx
import asyncio

URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']
        
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
client = httpx.AsyncClient(limits=limits, timeout=timeout)
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()

async def send(url, client):
    req = client.build_request('GET', url)
    return await client.send(req, stream=True)

async def iter_content(responses):
    for r in responses:
        async for chunk in r.aiter_text():
            # for demo purposes, return only the first 50 chars of each response
            yield chunk[:50]
            yield '\n\n'
            break
        await r.aclose()
        
@app.get('/')
async def main():
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')