如何使用ML模型和FastAPI处理多个用户的请求?

2024-03-29

我正在研究通过FastAPI分发人工智能模块的过程。

我创建了一个 FastAPI 应用程序,它使用预先学习的机器学习模型来回答问题。

在这种情况下,一个用户使用是没有问题的,但是当多个用户同时使用时,响应可能会太慢。

那么,当多个用户输入问题时,有没有办法复制模型并一次性加载呢?

class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

 def ask_query(self,query, topN):
        startt = time.time()

        ask_result = []
        score = []
        result_value = []  
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0] #torch.Size([121])121개의 말뭉치에 대한 코사인 유사도 값이다.
        cos_scores = cos_scores.cpu()

        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]

        for idx in top_results[0:topN]:        
            ask_result.append(corpusid[idx].item())
            #.item()으로 접근하는 이유는 tensor(5)에서 해당 숫자에 접근하기 위한 방식이다.
            score.append(round(cos_scores[idx].item(),3))

        #서버에 json array 형태로 내보내기 위한 작업
        for i,e in zip(ask_result,score):
            result_value.append({"pred_id":i,"pred_weight":e})
        endd = time.time()
        print('시간체크',endd-startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)



class Item_inference(BaseModel):
    text : str
    topN : Optional[int] = 1

@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
  
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)

    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default='9003', type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    uvicorn.run(app, host="0.0.0.0", port=args.port,workers=4)

修正版本

@app.post("/aaa") def your_endpoint(request: Request, item:Item_inference): start = time.time() model = request.app.state.model item.dict() #커널 실행시 필요 _ai = sentencebert_ai() results = _ai.ask_query(item.text, item.topN,model) end = time.time() print(end-start) return results ``` 

首先,您不应该在每次请求到达时都加载模型,而应该在启动时加载一次(您可以使用启动事件 https://fastapi.tiangolo.com/advanced/events/#startup-event为此)和将其存储在应用程序实例上 https://www.starlette.io/applications/#accessing-the-app-instance——使用通用的app.state属性(参见实现State https://github.com/encode/starlette/blob/212fa46b23be0701a5963cdeff14f05ed352e22a/starlette/datastructures.py#L674也)——您可以稍后检索,如所述here https://stackoverflow.com/a/71537393/17865804 and here https://stackoverflow.com/a/71298949/17865804。例如:

from fastapi import Request

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')

其次,如果你没有任何async您必须在端点内执行的函数await,你可以定义你的端点def代替async def。这样,FastAPI 将并发处理请求,因为每个请求将在单独的线程中运行;然而,async def端点在主线程上运行,即服务器顺序处理请求,只要没有await调用此类路由内的某些 CPU/IO 绑定(阻塞)操作。如果是这样,则关键字await会将函数控制传递回事件循环,从而允许事件循环中的其他任务/请求运行。请看一下答案here https://stackoverflow.com/a/71188190/17865804 and here https://stackoverflow.com/a/71517830/17865804以及其中包含的所有参考文献,以理解async/await,以及使用之间的区别def and async def。示例为def端点:

@app.post('/')
def your_endpoint(request: Request):
    model = request.app.state.model
    # run your synchronous ask_query() function here

或者,如上所述here https://stackoverflow.com/a/71517830/17865804,您最好可以在单独的进程中运行 CPU 密集型任务,使用ProcessPoolExecutor https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor,并与asyncio, 为了await它完成工作并返回结果 - 在这种情况下,您需要使用以下命令定义端点async def,作为await关键字仅在async功能。请注意,重要的是保护代码的主循环以避免子进程的递归生成 https://stackoverflow.com/a/45302590, ETC。;也就是说,您的代码必须位于if __name__ == '__main__' https://stackoverflow.com/questions/419163/what-does-if-name-main-do。例子:

from fastapi import FastAPI, Request
import concurrent.futures
import asyncio
import uvicorn

class MyAIClass():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, model, query, topN):
        # ...
 
ai = MyAIClass()
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')

@app.post('/')
async def your_endpoint(request: Request):
    model = request.app.state.model

    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)


if __name__ == '__main__':
    uvicorn.run(app)

Note如果你打算拥有若干工人 https://fastapi.tiangolo.com/deployment/server-workers/同时活跃,每个工人都有自己的记忆 https://fastapi.tiangolo.com/deployment/concepts/#memory-per-process换句话说,工作人员不共享相同的内存,因此每个工作人员都会将自己的 ML 模型实例加载到内存 (RAM) 中。例如,如果您的应用程序使用四个工作线程,则模型将被加载四次到 RAM 中。因此,如果模型以及代码中的其他变量消耗大量内存,每个进程/工人将消耗等量的内存。如果您想避免这种情况,您可以看看如何在多个工作人员之间共享对象 https://stackoverflow.com/questions/65686318/sharing-python-objects-across-multiple-workers,以及 - 如果您正在使用Gunicorn 作为 Uvicorn 工人的流程经理 https://fastapi.tiangolo.com/deployment/server-workers/#gunicorn-with-uvicorn-workers——你可以使用 Gunicorn 的--preload https://docs.gunicorn.org/en/stable/settings.html#preload-app旗帜。根据文档:

命令行: --preload

Default: False

在分叉工作进程之前加载应用程序代码。

通过预加载应用程序,您还可以节省一些 RAM 资源 加快服务器启动时间。不过,如果您推迟申请 加载到每个工作进程,您可以重新加载您的应用程序代码 通过重新启动工人即可轻松完成。

Example:

gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app

Note你不能结合 Gunicorn 的--preload https://docs.gunicorn.org/en/stable/settings.html#preload-app with --reload https://docs.gunicorn.org/en/stable/settings.html#reload标志,因为当代码预加载到主进程中时,如果您的应用程序代码发生更改,新的工作进程(将自动创建)仍将在内存中保留旧代码,因为fork() https://en.wikipedia.org/wiki/Fork_(system_call) works.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用ML模型和FastAPI处理多个用户的请求? 的相关文章

随机推荐