fastapi_scoped_auth_loguru.py
· 5.4 KiB · Python
Неформатований
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "fastapi",
# "uvicorn",
# "pydantic",
# "loguru",
# ]
# ///
import uvicorn
from typing import Annotated, Dict, List, Optional
from fastapi import FastAPI, Depends, HTTPException, status, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, SecurityScopes
from pydantic import BaseModel
from loguru import logger
# -----------------------------------------------------------------------------
# 1. 基礎設定與 Mock 資料
# -----------------------------------------------------------------------------
# 假資料:使用者資料庫 (新增 scopes 欄位)
MOCK_USER_DB: Dict[str, Dict] = {
"admin-secret-token": {
"username": "admin_user",
"role": "admin",
# Admin 擁有所有權限
"scopes": ["users:read", "users:write", "system:status"]
},
"guest-access-token": {
"username": "guest_user",
"role": "guest",
# Guest 只有讀取權限
"scopes": ["users:read"]
}
}
# -----------------------------------------------------------------------------
# 2. 資料模型 (Pydantic Models)
# -----------------------------------------------------------------------------
class UserProfile(BaseModel):
"""使用者資料模型,包含權限列表"""
username: str
role: str
scopes: List[str] = []
class SystemStatus(BaseModel):
status: str
version: str
# -----------------------------------------------------------------------------
# 3. 核心邏輯類別 (Service Class)
# -----------------------------------------------------------------------------
class AuthService:
"""
處理認證 (Authentication) 與授權 (Authorization) 的服務。
"""
def __init__(self):
self.security_scheme = HTTPBearer(auto_error=True)
def _get_user_by_token(self, token: str) -> Optional[Dict]:
"""從 Mock DB 撈取使用者"""
return MOCK_USER_DB.get(token)
def _verify_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> None:
"""
檢查使用者是否具備 API 所需的權限範圍。
Args:
user_scopes: 使用者擁有的權限
required_scopes: API 端點要求的權限
Raises:
HTTPException: 權限不足時拋出 403
"""
# 使用 set 操作來檢查是否包含所有必要權限
# 如果 required_scopes 中的任何一個不在 user_scopes 裡,則拒絕
for scope in required_scopes:
if scope not in user_scopes:
logger.warning(f"權限不足: 缺少 scope '{scope}'")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="權限不足 (Not enough permissions)",
headers={"WWW-Authenticate": f'Bearer scope="{scope}"'},
)
def __call__(
self,
creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())],
security_scopes: SecurityScopes
) -> UserProfile:
"""
FastAPI 依賴注入入口。
同時接收 Token 與該路由設定的 scopes。
"""
token = creds.credentials
required_scopes = security_scopes.scopes
# 1. 驗證 Token 是否存在
user_data = self._get_user_by_token(token)
if not user_data:
logger.error(f"登入失敗: 無效的 Token - {token[:8]}...")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無效的憑證",
headers={"WWW-Authenticate": "Bearer"},
)
# 2. 驗證權限範圍 (如果有要求 Scope)
if required_scopes:
logger.debug(f"驗證權限: 需要 {required_scopes}, 擁有 {user_data['scopes']}")
self._verify_scopes(user_data["scopes"], required_scopes)
logger.success(f"認證通過: {user_data['username']}")
return UserProfile(**user_data)
# -----------------------------------------------------------------------------
# 4. FastAPI 應用程式與路由
# -----------------------------------------------------------------------------
# 初始化服務
auth_service = AuthService()
app = FastAPI(title="FastAPI Scopes & Loguru Example")
@app.get(
"/users/me",
response_model=UserProfile,
# 使用 Security() 來注入依賴並指定需要的 scopes
dependencies=[Security(auth_service, scopes=["users:read"])]
)
def read_own_profile(current_user: Annotated[UserProfile, Security(auth_service, scopes=["users:read"])]):
"""
一般使用者路由。
需要 scope: `users:read`
"""
return current_user
@app.get(
"/admin/system",
response_model=SystemStatus,
# 這是一個受保護的管理員路由
dependencies=[Security(auth_service, scopes=["system:status"])]
)
def read_system_status():
"""
管理員專用路由。
需要 scope: `system:status`
(Guest Token 嘗試存取此處會失敗)
"""
logger.info("管理員正在存取系統狀態...")
return SystemStatus(status="Online", version="2.0.0")
# -----------------------------------------------------------------------------
# 5. 程式進入點
# -----------------------------------------------------------------------------
if __name__ == "__main__":
logger.info("啟動服務中 | http://127.0.0.1:8000")
uvicorn.run(app, host="127.0.0.1", port=8000)
| 1 | # /// script |
| 2 | # requires-python = ">=3.12" |
| 3 | # dependencies = [ |
| 4 | # "fastapi", |
| 5 | # "uvicorn", |
| 6 | # "pydantic", |
| 7 | # "loguru", |
| 8 | # ] |
| 9 | # /// |
| 10 | |
| 11 | import uvicorn |
| 12 | from typing import Annotated, Dict, List, Optional |
| 13 | from fastapi import FastAPI, Depends, HTTPException, status, Security |
| 14 | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, SecurityScopes |
| 15 | from pydantic import BaseModel |
| 16 | from loguru import logger |
| 17 | |
| 18 | # ----------------------------------------------------------------------------- |
| 19 | # 1. 基礎設定與 Mock 資料 |
| 20 | # ----------------------------------------------------------------------------- |
| 21 | |
| 22 | # 假資料:使用者資料庫 (新增 scopes 欄位) |
| 23 | MOCK_USER_DB: Dict[str, Dict] = { |
| 24 | "admin-secret-token": { |
| 25 | "username": "admin_user", |
| 26 | "role": "admin", |
| 27 | # Admin 擁有所有權限 |
| 28 | "scopes": ["users:read", "users:write", "system:status"] |
| 29 | }, |
| 30 | "guest-access-token": { |
| 31 | "username": "guest_user", |
| 32 | "role": "guest", |
| 33 | # Guest 只有讀取權限 |
| 34 | "scopes": ["users:read"] |
| 35 | } |
| 36 | } |
| 37 | |
| 38 | # ----------------------------------------------------------------------------- |
| 39 | # 2. 資料模型 (Pydantic Models) |
| 40 | # ----------------------------------------------------------------------------- |
| 41 | |
| 42 | class UserProfile(BaseModel): |
| 43 | """使用者資料模型,包含權限列表""" |
| 44 | username: str |
| 45 | role: str |
| 46 | scopes: List[str] = [] |
| 47 | |
| 48 | class SystemStatus(BaseModel): |
| 49 | status: str |
| 50 | version: str |
| 51 | |
| 52 | # ----------------------------------------------------------------------------- |
| 53 | # 3. 核心邏輯類別 (Service Class) |
| 54 | # ----------------------------------------------------------------------------- |
| 55 | |
| 56 | class AuthService: |
| 57 | """ |
| 58 | 處理認證 (Authentication) 與授權 (Authorization) 的服務。 |
| 59 | """ |
| 60 | |
| 61 | def __init__(self): |
| 62 | self.security_scheme = HTTPBearer(auto_error=True) |
| 63 | |
| 64 | def _get_user_by_token(self, token: str) -> Optional[Dict]: |
| 65 | """從 Mock DB 撈取使用者""" |
| 66 | return MOCK_USER_DB.get(token) |
| 67 | |
| 68 | def _verify_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> None: |
| 69 | """ |
| 70 | 檢查使用者是否具備 API 所需的權限範圍。 |
| 71 | |
| 72 | Args: |
| 73 | user_scopes: 使用者擁有的權限 |
| 74 | required_scopes: API 端點要求的權限 |
| 75 | |
| 76 | Raises: |
| 77 | HTTPException: 權限不足時拋出 403 |
| 78 | """ |
| 79 | # 使用 set 操作來檢查是否包含所有必要權限 |
| 80 | # 如果 required_scopes 中的任何一個不在 user_scopes 裡,則拒絕 |
| 81 | for scope in required_scopes: |
| 82 | if scope not in user_scopes: |
| 83 | logger.warning(f"權限不足: 缺少 scope '{scope}'") |
| 84 | raise HTTPException( |
| 85 | status_code=status.HTTP_401_UNAUTHORIZED, |
| 86 | detail="權限不足 (Not enough permissions)", |
| 87 | headers={"WWW-Authenticate": f'Bearer scope="{scope}"'}, |
| 88 | ) |
| 89 | |
| 90 | def __call__( |
| 91 | self, |
| 92 | creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())], |
| 93 | security_scopes: SecurityScopes |
| 94 | ) -> UserProfile: |
| 95 | """ |
| 96 | FastAPI 依賴注入入口。 |
| 97 | 同時接收 Token 與該路由設定的 scopes。 |
| 98 | """ |
| 99 | token = creds.credentials |
| 100 | required_scopes = security_scopes.scopes |
| 101 | |
| 102 | # 1. 驗證 Token 是否存在 |
| 103 | user_data = self._get_user_by_token(token) |
| 104 | if not user_data: |
| 105 | logger.error(f"登入失敗: 無效的 Token - {token[:8]}...") |
| 106 | raise HTTPException( |
| 107 | status_code=status.HTTP_401_UNAUTHORIZED, |
| 108 | detail="無效的憑證", |
| 109 | headers={"WWW-Authenticate": "Bearer"}, |
| 110 | ) |
| 111 | |
| 112 | # 2. 驗證權限範圍 (如果有要求 Scope) |
| 113 | if required_scopes: |
| 114 | logger.debug(f"驗證權限: 需要 {required_scopes}, 擁有 {user_data['scopes']}") |
| 115 | self._verify_scopes(user_data["scopes"], required_scopes) |
| 116 | |
| 117 | logger.success(f"認證通過: {user_data['username']}") |
| 118 | return UserProfile(**user_data) |
| 119 | |
| 120 | # ----------------------------------------------------------------------------- |
| 121 | # 4. FastAPI 應用程式與路由 |
| 122 | # ----------------------------------------------------------------------------- |
| 123 | |
| 124 | # 初始化服務 |
| 125 | auth_service = AuthService() |
| 126 | |
| 127 | app = FastAPI(title="FastAPI Scopes & Loguru Example") |
| 128 | |
| 129 | @app.get( |
| 130 | "/users/me", |
| 131 | response_model=UserProfile, |
| 132 | # 使用 Security() 來注入依賴並指定需要的 scopes |
| 133 | dependencies=[Security(auth_service, scopes=["users:read"])] |
| 134 | ) |
| 135 | def read_own_profile(current_user: Annotated[UserProfile, Security(auth_service, scopes=["users:read"])]): |
| 136 | """ |
| 137 | 一般使用者路由。 |
| 138 | 需要 scope: `users:read` |
| 139 | """ |
| 140 | return current_user |
| 141 | |
| 142 | @app.get( |
| 143 | "/admin/system", |
| 144 | response_model=SystemStatus, |
| 145 | # 這是一個受保護的管理員路由 |
| 146 | dependencies=[Security(auth_service, scopes=["system:status"])] |
| 147 | ) |
| 148 | def read_system_status(): |
| 149 | """ |
| 150 | 管理員專用路由。 |
| 151 | 需要 scope: `system:status` |
| 152 | (Guest Token 嘗試存取此處會失敗) |
| 153 | """ |
| 154 | logger.info("管理員正在存取系統狀態...") |
| 155 | return SystemStatus(status="Online", version="2.0.0") |
| 156 | |
| 157 | # ----------------------------------------------------------------------------- |
| 158 | # 5. 程式進入點 |
| 159 | # ----------------------------------------------------------------------------- |
| 160 | |
| 161 | if __name__ == "__main__": |
| 162 | logger.info("啟動服務中 | http://127.0.0.1:8000") |
| 163 | uvicorn.run(app, host="127.0.0.1", port=8000) |