FastAPI 结合 JWT
创始人
2024-09-26 00:22:49
0

文章目录

  • FastAPI 结合 JWT
  • 步骤
  • 安装
  • 步骤
    • 导入必要的模块
    • 设置配置和初始化应用
    • 创建数据模型
    • 实现辅助函数
      • 生成 JWT Token
      • 获取用户数据
      • 验证密码
      • 获取当前用户
    • 用户登录获取 Token
    • 受保护的路由示例
  • 所有代码
  • 测试
    • 获取 Token
    • 访问受保护的路由
      • token正确
      • token错误
  • 总结
  • 注意

FastAPI 结合 JWT

JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。

步骤

在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt  

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import jwt  

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥 ALGORITHM = "HS256"             # 加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间  app = FastAPI()  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  

创建数据模型

class Token(BaseModel):     access_token: str     token_type: str  class TokenData(BaseModel):     username: str | None = None  class User(BaseModel):     username: str     email: str | None = None     full_name: str | None = None     disabled: bool | None = None  class UserInDB(User):     hashed_password: str  

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):     to_encode = data.copy()     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)     return encoded_jwt  

获取用户数据

fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",         "disabled": False,     } }  def get_user(db, username: str):     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  

验证密码

def verify_password(plain_password, hashed_password):     return plain_password == hashed_password  

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])         username: str = payload.get("sub")         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)     if user is None:         raise credentials_exception     return user  async def get_current_active_user(current_user: User = Depends(get_current_user)):     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  

用户登录获取 Token

@app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  

受保护的路由示例

@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     return current_user  

所有代码

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import uvicorn import jwt import os  # JWT 相关配置 SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中) ALGORITHM = "HS256"  # 使用的加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位  app = FastAPI()  # 创建 FastAPI 应用实例  # OAuth2PasswordBearer 实例,用于依赖项 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息 fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码         "disabled": False,  # 用户是否被禁用     } }  # Pydantic 模型,用于定义请求和响应的数据结构 class Token(BaseModel):     access_token: str  # Token 字符串     token_type: str  # Token 类型(一般为 "bearer")  class TokenData(BaseModel):     username: str | None = None  # 从 Token 中提取的用户名  class User(BaseModel):     username: str  # 用户名     email: str | None = None  # 邮箱地址,可选     full_name: str | None = None  # 用户全名,可选     disabled: bool | None = None  # 用户是否被禁用,可选  class UserInDB(User):     hashed_password: str  # 存储在数据库中的哈希密码  # 生成 JWT Token 的函数 def create_access_token(data: dict, expires_delta: timedelta | None = None):     """     生成 JWT Token。      参数:     - data (dict): 要编码到 JWT 中的数据。     - expires_delta (timedelta, 可选): Token 的过期时间。      返回:     - str: 编码后的 JWT 字符串。     """     to_encode = data.copy()  # 创建副本,以避免修改原始数据     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT     return encoded_jwt  # 从假数据库获取用户信息 def get_user(db, username: str):     """     从数据库中获取用户信息。      参数:     - db (dict): 用户数据库(在本例中为假数据)。     - username (str): 用户名。      返回:     - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。     """     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  # 验证用户密码 def verify_password(plain_password, hashed_password):     """     验证用户密码。      参数:     - plain_password (str): 用户输入的明文密码。     - hashed_password (str): 存储在数据库中的哈希密码。      返回:     - bool: 密码匹配返回 True,否则返回 False。     """     return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较  # 验证 Token 并获取当前用户 async def get_current_user(token: str = Depends(oauth2_scheme)):     """     从 JWT Token 中提取用户信息,并验证 Token 的合法性。      参数:     - token (str): JWT Token。      返回:     - User: 返回当前用户信息。      抛出:     - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。     """     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT         username: str = payload.get("sub")  # 获取 JWT 中的用户名         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息     if user is None:         raise credentials_exception     return user  # 验证用户是否被禁用 async def get_current_active_user(current_user: User = Depends(get_current_user)):     """     验证当前用户是否被禁用。      参数:     - current_user (User): 当前用户信息。      返回:     - User: 如果用户未被禁用,返回用户信息。      抛出:     - HTTPException: 当用户被禁用时抛出 400 错误。     """     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  # 用户登录获取 Token @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     """     用户登录接口,用于获取 JWT Token。      参数:     - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。      返回:     - dict: 包含 access_token 和 token_type 的响应数据。      抛出:     - HTTPException: 当用户名或密码错误时抛出 401 错误。     """     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  # 受保护的路由示例 @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     """     获取当前用户信息的受保护路由。      参数:     - current_user (User): 当前登录的用户信息(通过 JWT 验证)。      返回:     - User: 返回当前用户的信息。     """     return current_user  # 运行应用 if __name__ == "__main__":     uvicorn.run(         f"{os.path.basename(__file__).split('.')[0]}:app",         host="127.0.0.1",         port=8000,         reload=True,  # 启用自动重载     )  

测试

获取 Token

在这里插入图片描述

访问受保护的路由

token正确

在这里插入图片描述

token错误

在这里插入图片描述

总结

JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。

在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。

实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。

使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。

注意

示例中密码并没有经过hash加密实际应用中要加密的

相关内容

热门资讯

安卓系统app更新软件,And... 亲爱的手机控们,你们有没有发现,最近你的手机里那些熟悉的APP们,好像都悄悄地换上了新装呢?没错,安...
手机怎么安双卡安卓系统,轻松实... 你有没有想过,拥有一部可以同时使用两张SIM卡的手机是多么的方便呢?想象一张卡用来工作,另一张卡用来...
安卓系统卸载软件api,功能与... 手机里的软件越来越多,是不是感觉内存都要不够用了?别急,今天就来给你揭秘安卓系统卸载软件的神秘面纱,...
miui操作系统和安卓系统,深... 亲爱的手机控们,今天咱们来聊聊一个让无数米粉心动的系统——MIUI操作系统,还有那个它背后的老大哥—...
原生安卓系统使用教学,原生安卓... 哇,你手里拿的那部手机,是不是也觉得它有点儿特别呢?它可能没有那些花里胡哨的界面,但它却有着自己独特...
安卓系统玩咸鱼之王,三国名将助... 你有没有发现,最近安卓系统上的游戏圈里,有一款叫做《咸鱼之王》的游戏火得一塌糊涂?没错,就是那个让你...
鸿蒙1.0系统是安卓系统吗,揭... 你有没有听说最近华为的鸿蒙1.0系统?是不是有点好奇,这鸿蒙1.0系统是不是安卓系统的“亲戚”呢?别...
优盘安卓系统用桃,U盘安装An... 你有没有想过,你的电脑也能变身成安卓手机?没错,就是那种可以安装各种APP、玩游戏的安卓手机!这可不...
怎样使用安卓8系统,安卓8系统... 你有没有想过,你的安卓手机其实是个小智能助手,只要你会使用,它能帮你做很多事情呢!今天,就让我来带你...
鼎威安卓系统版本,性能升级与用... 你有没有发现,现在车机系统越来越智能了?这不,鼎威的安卓系统版本就让我眼前一亮。想象坐在车里,手指轻...
安卓系统安装抢红包,轻松成为抢... 亲爱的手机控们,是不是每次微信群里抢红包都感觉手慢无?别急,今天我要给你揭秘如何在安卓系统上轻松安装...
写ios系统和安卓系统的人,揭... 你有没有想过,那些默默无闻的程序员们,他们是如何创造出我们每天离不开的iOS系统和安卓系统呢?想象他...
安卓系统设计尺寸规范,适配与优... 亲爱的设计师们,你是否在为安卓系统的设计尺寸规范而头疼?别担心,今天我要带你一起探索这个神秘的领域,...
旧主机改安卓系统,安卓系统改造... 亲爱的读者们,你是否有过这样的经历:家里的旧主机闲置在角落,看着它那略显过时的外观,心里不禁感叹:“...
安卓系统里有趣的,尽在掌握 探索安卓乐园:那些让你笑出声的趣味游戏 开篇:手机里的欢乐小天地想象你手握一部安卓手机,屏幕上跳动...
法兰规格查询系统安卓,安卓版功... 你有没有想过,在繁忙的工程现场,如何快速找到合适的法兰规格呢?别急,今天就来给你揭秘一个神器——法兰...
目前安卓系统最高配置,极致性能... 你有没有发现,现在的手机越来越厉害了,就像是科幻电影里的高科技产品一样。今天,咱们就来聊聊这个话题:...
安卓修改系统返回键,个性化设置... 你有没有发现,手机里的那个小小的返回键,有时候就像是个顽皮的小家伙,让你摸不着头脑?别急,今天就来教...
安卓订餐系统教程视频,从设计到... 你是不是也和我一样,每天忙碌的生活中,最期待的就是那一顿美味的午餐或晚餐呢?现在,有了安卓订餐系统,...
安卓系统限制外部软件,探索外部... 亲爱的手机控们,你是否曾遇到过这样的烦恼:明明打开了“未知来源”,却还是无法安装那些心仪的外部软件?...