开源模型应用落地-FastAPI-助力模型交互-进阶篇-生命周期事件(一)
创始人
2024-11-13 00:11:16
0

一、前言

   FastAPI 的高级用法可以为开发人员带来许多好处。它能帮助实现更复杂的路由逻辑和参数处理,使应用程序能够处理各种不同的请求场景,提高应用程序的灵活性和可扩展性。

    在数据验证和转换方面,高级用法提供了更精细和准确的控制,确保输入数据的质量和安全性。它还能更高效地处理异步操作,提升应用程序的性能和响应速度,特别是在处理大量并发请求时优势明显。

    此外,高级用法还有助于更好地整合数据库操作、实现数据的持久化和查询优化,以及实现更严格的认证和授权机制,保护应用程序的敏感数据和功能。总之,掌握 FastAPI 的高级用法可以帮助开发人员构建出功能更强大、性能更卓越、安全可靠的 Web 应用程序。

    本篇学习FastAPI的生命周期事件,示例均在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(二)基础上进行扩展,建议有需要的老铁们,先去学习。


二、术语

2.1. Lifespan Events(生命周期事件)

    通过生命周期事件,可以更好地管理应用的整个生命周期中的资源和操作,确保资源的正确初始化和释放,提高应用的性能、可靠性和可维护性。

    Lifespan Events主要有以下作用:

  1. 资源初始化与释放:可以在应用启动时执行一些初始化操作,例如创建数据库连接池、加载共享的机器学习模型等需要在整个应用中使用且可在请求间共享的资源。在应用关闭时,执行清理和释放资源的操作,例如关闭数据库连接、释放内存或其他相关资源。
  2. 避免不必要的操作:如果某些资源的初始化成本较高(如加载大型模型),使用 Lifespan Events 可以避免在每次请求时都进行初始化,仅在应用启动后且接收请求之前执行一次。同时,也可以防止在一些不需要处理实际请求的情况下(如运行简单的自动化测试)进行不必要的资源加载,从而提高性能和效率。
  3. 分离启动和关闭逻辑:将与应用启动和关闭相关的逻辑集中在一个地方进行管理,使代码更加清晰和可维护。
     

三、前置条件

3.1. 创建虚拟环境&安装依赖

conda create -n fastapi_test python=3.10 conda activate fastapi_test pip install fastapi websockets uvicorn transformers==4.32.0 accelerate tiktoken einops transformers_stream_generator==0.0.4 scipy

3.2. 下载Qwen-1_8B-Chat模型

huggingface:

https://huggingface.co/Qwen/Qwen-1_8B-Chaticon-default.png?t=N7T8https://huggingface.co/Qwen/Qwen-1_8B-Chat

​魔搭:

魔搭社区汇聚各领域最先进的机器学习模型,提供模型探索体验、推理、训练、部署和应用的一站式服务。icon-default.png?t=N7T8https://modelscope.cn/models/qwen/Qwen-1_8B-Chat


四、技术实现

4.1. startup & shutdown event

# -*- coding: utf-8 -*- import traceback  from transformers import AutoTokenizer, AutoModelForCausalLM from transformers import GenerationConfig  import torch import uvicorn  from typing import Annotated from fastapi import (     Depends,     FastAPI,     WebSocket,     WebSocketException,     WebSocketDisconnect,     status, )   model_path = "E:/model/qwen-1_8b-chat"  class ConnectionManager:     def __init__(self):         self.active_connections: list[WebSocket] = []      async def connect(self, websocket: WebSocket):         await websocket.accept()         self.active_connections.append(websocket)      def disconnect(self, websocket: WebSocket):         self.active_connections.remove(websocket)      async def send_personal_message(self, message: str, websocket: WebSocket):         await websocket.send_text(message)      async def broadcast(self, message: str):         for connection in self.active_connections:             await connection.send_text(message)  manager = ConnectionManager()  app = FastAPI()  async def authenticate(     websocket: WebSocket,     userid: str,     secret: str, ):     if userid is None or secret is None:         raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)      print(f'userid: {userid},secret: {secret}')     if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:         return 'pass'     else:         return 'fail'  async def chat(query):     position = 0     try:         for response in model.chat_stream(tokenizer, query, history = None):             result = response[position:]             position = len(response)             yield result      except Exception:         traceback.print_exc()  @app.websocket("/ws") async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):     await manager.connect(websocket)     try:         while True:             text = await websocket.receive_text()              if 'fail' == permission:                 await manager.send_personal_message(                     f"authentication failed", websocket                 )             else:                 if text is not None and len(text) > 0:                     async for msg in chat(text):                         await manager.send_personal_message(msg, websocket)      except WebSocketDisconnect:         manager.disconnect(websocket)         print(f"Client #{userid} left the chat")         await manager.broadcast(f"Client #{userid} left the chat")   def loadTokenizer():     tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)     return tokenizer  def loadModel(config):     model = AutoModelForCausalLM.from_pretrained(model_path, device_map="cpu", trust_remote_code=True).eval()     model.generation_config = config     return model  @app.on_event("startup") async def startup_event():     global model,tokenizer     config = GenerationConfig.from_pretrained(model_path, trust_remote_code=True, top_p=0.9, temperature=0.45,repetition_penalty=1.1, do_sample=True, max_new_tokens=8192)     tokenizer = loadTokenizer()     model = loadModel(config)  @app.on_event("shutdown") def shutdown_event():     torch.cuda.empty_cache()  if __name__ == '__main__':     uvicorn.run(app, host='0.0.0.0',port=7777) 

调用结果:

用户输入:你好

模型输出:你好!有什么我能帮助你的吗?

说明:

  1. 在startup事件函数中加载模型资源
  2. 在shutdown时间函数中释放资源
  3. startup & shutdown event已过期,后面可能会被移除,建议使用lifespan event代替

4.2. lifespan event

import traceback from contextlib import asynccontextmanager  from transformers import AutoTokenizer, AutoModelForCausalLM from transformers import GenerationConfig  import torch import uvicorn  from typing import Annotated from fastapi import (     Depends,     FastAPI,     WebSocket,     WebSocketException,     WebSocketDisconnect,     status, )   model_path = "E:/model/qwen-1_8b-chat"  class ConnectionManager:     def __init__(self):         self.active_connections: list[WebSocket] = []      async def connect(self, websocket: WebSocket):         await websocket.accept()         self.active_connections.append(websocket)      def disconnect(self, websocket: WebSocket):         self.active_connections.remove(websocket)      async def send_personal_message(self, message: str, websocket: WebSocket):         await websocket.send_text(message)      async def broadcast(self, message: str):         for connection in self.active_connections:             await connection.send_text(message)  manager = ConnectionManager()   def loadTokenizer():     tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)     return tokenizer   def loadModel(config):     model = AutoModelForCausalLM.from_pretrained(model_path, device_map="cpu", trust_remote_code=True).eval()     model.generation_config = config     return model   @asynccontextmanager async def lifespan(app: FastAPI):     # 加载模型     global model, tokenizer     config = GenerationConfig.from_pretrained(model_path, trust_remote_code=True, top_p=0.9, temperature=0.45,                                               repetition_penalty=1.1, do_sample=True, max_new_tokens=8192)     tokenizer = loadTokenizer()     model = loadModel(config)     yield     # 释放资源     torch.cuda.empty_cache()    app = FastAPI(lifespan=lifespan)  async def authenticate(     websocket: WebSocket,     userid: str,     secret: str, ):     if userid is None or secret is None:         raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)      print(f'userid: {userid},secret: {secret}')     if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:         return 'pass'     else:         return 'fail'  async def chat(query):     position = 0     try:         for response in model.chat_stream(tokenizer, query, history = None):             result = response[position:]             position = len(response)             yield result      except Exception:         traceback.print_exc()  @app.websocket("/ws") async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):     await manager.connect(websocket)     try:         while True:             text = await websocket.receive_text()              if 'fail' == permission:                 await manager.send_personal_message(                     f"authentication failed", websocket                 )             else:                 if text is not None and len(text) > 0:                     async for msg in chat(text):                         await manager.send_personal_message(msg, websocket)      except WebSocketDisconnect:         manager.disconnect(websocket)         print(f"Client #{userid} left the chat")         await manager.broadcast(f"Client #{userid} left the chat")   if __name__ == '__main__':     uvicorn.run(app, host='0.0.0.0',port=7777) 

调用结果:

没有输出警告信息

用户输入:你好,广州有什么好玩的地方推荐?

模型输出:广州有很多值得一去的景点,比如白云山、长隆野生动物园、陈家祠、珠江夜游等。此外,你还可以去逛逛上下九步行街,品尝当地的美食,或者参观广州塔等高楼大厦。


五、附带说明

5.1. 测试界面

               Chat                   

WebSocket Chat



相关内容

热门资讯

ia攻略/牛牛房卡代理九酷大厅... 今 日消息,九酷大厅/随意玩房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更...
我来教你/金花房卡专卖店新超圣... 新超圣房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3、根据...
正规平台有哪些,牛牛房卡怎么获... 微信游戏中心:青鸟大厅房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或相关小程...
一分钟实测分享”热玩吧房卡怎么... 一分钟实测分享”热玩吧房卡怎么弄“牛牛房卡哪里有卖游戏中心打开微信,添加客服【113857776】,...
一分钟了解“如何购买金花房卡普... 悠悠大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡...
IA解析/斗牛房卡充值天道联盟... IA解析/斗牛房卡充值天道联盟/随意玩/房卡怎么买Sa9Ix苹果iPhone 17手机即将进入量产阶...
我来教你/牛牛充值房卡新竹大厅... 今 日消息,新竹大厅房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更新,简单...
玩家攻略”王者大厅房卡“牛牛房... 玩家攻略”王者大厅房卡“牛牛房卡哪里有卖 微信牛牛房卡客服微信号微信游戏中心打开微信,添加客服【11...
推荐一款!牛牛房卡代理天蝎大厅... 微信游戏中心:天蝎大厅房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或相关小程...
重大通报,金花充值房卡星驰娱乐... 星驰娱乐是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:【3329006910】或QQ:332900...
科技实测!金花房卡出售新二号/... 您好!微信新二号大厅链接获取房卡可以通过以下几种方式购买: 1.微信渠道:(新二号)大厅介绍:咨询...
终于找到“微信斗牛房卡如何购买... 微信斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来...
重大通报,金花房卡是正规的新道... 今 日消息,新道游/新皇豪房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更新...
一秒了解”百万牛房卡获取“哪里... 房卡获取是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:113857776许多玩家在游戏中会购买房卡...
我来教你/斗牛房间怎么创建的南... 南瓜大厅/新道游房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 ...
重大通报,牛牛房卡制作链接新众... 微信游戏中心:新众亿/皇豪互娱房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或...
ia实测“微信链接斗牛房卡多少... 斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡来享受...
正版授权!金花房卡专卖店鲨鱼众... 您好!微信鲨鱼众娱大厅链接获取房卡可以通过以下几种方式购买: 1.微信渠道:(鲨鱼众娱)大厅介绍:...
实测分享”辣椒互娱房卡详细充值... 实测分享”辣椒互娱房卡详细充值“牛牛房卡批发平台游戏中心打开微信,添加客服【113857776】,进...
IA解析/牛牛房卡怎么获得天酷... 天酷大厅房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3、根...