Última actividad 1 week ago

此腳本展示如何在 FastAPI 中同時實作 Authentication(Token 驗證)與 Authorization(Scope 權限)並使用 loguru 進行結構化日誌。 使用 class‑based 依賴注入,支援 users:read 與 system:status 兩種權限範圍。 提供兩個端點:/users/me(一般使用者)與 /admin/system(管理員),可直接以 uvicorn 啟動測試。

fastapi_scoped_auth_loguru.py Sin formato
1# /// script
2# requires-python = ">=3.12"
3# dependencies = [
4# "fastapi",
5# "uvicorn",
6# "pydantic",
7# "loguru",
8# ]
9# ///
10
11import uvicorn
12from typing import Annotated, Dict, List, Optional
13from fastapi import FastAPI, Depends, HTTPException, status, Security
14from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, SecurityScopes
15from pydantic import BaseModel
16from loguru import logger
17
18# -----------------------------------------------------------------------------
19# 1. 基礎設定與 Mock 資料
20# -----------------------------------------------------------------------------
21
22# 假資料:使用者資料庫 (新增 scopes 欄位)
23MOCK_USER_DB: Dict[str, Dict] = {
24 "admin-secret-token": {
25 "username": "admin_user",
26 "role": "admin",
27 # Admin 擁有所有權限
28 "scopes": ["users:read", "users:write", "system:status"]
29 },
30 "guest-access-token": {
31 "username": "guest_user",
32 "role": "guest",
33 # Guest 只有讀取權限
34 "scopes": ["users:read"]
35 }
36}
37
38# -----------------------------------------------------------------------------
39# 2. 資料模型 (Pydantic Models)
40# -----------------------------------------------------------------------------
41
42class UserProfile(BaseModel):
43 """使用者資料模型,包含權限列表"""
44 username: str
45 role: str
46 scopes: List[str] = []
47
48class SystemStatus(BaseModel):
49 status: str
50 version: str
51
52# -----------------------------------------------------------------------------
53# 3. 核心邏輯類別 (Service Class)
54# -----------------------------------------------------------------------------
55
56class AuthService:
57 """
58 處理認證 (Authentication) 與授權 (Authorization) 的服務。
59 """
60
61 def __init__(self):
62 self.security_scheme = HTTPBearer(auto_error=True)
63
64 def _get_user_by_token(self, token: str) -> Optional[Dict]:
65 """從 Mock DB 撈取使用者"""
66 return MOCK_USER_DB.get(token)
67
68 def _verify_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> None:
69 """
70 檢查使用者是否具備 API 所需的權限範圍。
71
72 Args:
73 user_scopes: 使用者擁有的權限
74 required_scopes: API 端點要求的權限
75
76 Raises:
77 HTTPException: 權限不足時拋出 403
78 """
79 # 使用 set 操作來檢查是否包含所有必要權限
80 # 如果 required_scopes 中的任何一個不在 user_scopes 裡,則拒絕
81 for scope in required_scopes:
82 if scope not in user_scopes:
83 logger.warning(f"權限不足: 缺少 scope '{scope}'")
84 raise HTTPException(
85 status_code=status.HTTP_401_UNAUTHORIZED,
86 detail="權限不足 (Not enough permissions)",
87 headers={"WWW-Authenticate": f'Bearer scope="{scope}"'},
88 )
89
90 def __call__(
91 self,
92 creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())],
93 security_scopes: SecurityScopes
94 ) -> UserProfile:
95 """
96 FastAPI 依賴注入入口。
97 同時接收 Token 與該路由設定的 scopes。
98 """
99 token = creds.credentials
100 required_scopes = security_scopes.scopes
101
102 # 1. 驗證 Token 是否存在
103 user_data = self._get_user_by_token(token)
104 if not user_data:
105 logger.error(f"登入失敗: 無效的 Token - {token[:8]}...")
106 raise HTTPException(
107 status_code=status.HTTP_401_UNAUTHORIZED,
108 detail="無效的憑證",
109 headers={"WWW-Authenticate": "Bearer"},
110 )
111
112 # 2. 驗證權限範圍 (如果有要求 Scope)
113 if required_scopes:
114 logger.debug(f"驗證權限: 需要 {required_scopes}, 擁有 {user_data['scopes']}")
115 self._verify_scopes(user_data["scopes"], required_scopes)
116
117 logger.success(f"認證通過: {user_data['username']}")
118 return UserProfile(**user_data)
119
120# -----------------------------------------------------------------------------
121# 4. FastAPI 應用程式與路由
122# -----------------------------------------------------------------------------
123
124# 初始化服務
125auth_service = AuthService()
126
127app = FastAPI(title="FastAPI Scopes & Loguru Example")
128
129@app.get(
130 "/users/me",
131 response_model=UserProfile,
132 # 使用 Security() 來注入依賴並指定需要的 scopes
133 dependencies=[Security(auth_service, scopes=["users:read"])]
134)
135def read_own_profile(current_user: Annotated[UserProfile, Security(auth_service, scopes=["users:read"])]):
136 """
137 一般使用者路由。
138 需要 scope: `users:read`
139 """
140 return current_user
141
142@app.get(
143 "/admin/system",
144 response_model=SystemStatus,
145 # 這是一個受保護的管理員路由
146 dependencies=[Security(auth_service, scopes=["system:status"])]
147)
148def read_system_status():
149 """
150 管理員專用路由。
151 需要 scope: `system:status`
152 (Guest Token 嘗試存取此處會失敗)
153 """
154 logger.info("管理員正在存取系統狀態...")
155 return SystemStatus(status="Online", version="2.0.0")
156
157# -----------------------------------------------------------------------------
158# 5. 程式進入點
159# -----------------------------------------------------------------------------
160
161if __name__ == "__main__":
162 logger.info("啟動服務中 | http://127.0.0.1:8000")
163 uvicorn.run(app, host="127.0.0.1", port=8000)