Naposledy aktivní 1 week ago

此腳本展示如何在 FastAPI 中同時實作 Authentication(Token 驗證)與 Authorization(Scope 權限)並使用 loguru 進行結構化日誌。 使用 class‑based 依賴注入,支援 users:read 與 system:status 兩種權限範圍。 提供兩個端點:/users/me(一般使用者)與 /admin/system(管理員),可直接以 uvicorn 啟動測試。

timmy revidoval tento gist 1 week ago. Přejít na revizi

1 file changed, 163 insertions

fastapi_scoped_auth_loguru.py(vytvořil soubor)

@@ -0,0 +1,163 @@
1 + # /// script
2 + # requires-python = ">=3.12"
3 + # dependencies = [
4 + # "fastapi",
5 + # "uvicorn",
6 + # "pydantic",
7 + # "loguru",
8 + # ]
9 + # ///
10 +
11 + import uvicorn
12 + from typing import Annotated, Dict, List, Optional
13 + from fastapi import FastAPI, Depends, HTTPException, status, Security
14 + from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, SecurityScopes
15 + from pydantic import BaseModel
16 + from loguru import logger
17 +
18 + # -----------------------------------------------------------------------------
19 + # 1. 基礎設定與 Mock 資料
20 + # -----------------------------------------------------------------------------
21 +
22 + # 假資料:使用者資料庫 (新增 scopes 欄位)
23 + MOCK_USER_DB: Dict[str, Dict] = {
24 + "admin-secret-token": {
25 + "username": "admin_user",
26 + "role": "admin",
27 + # Admin 擁有所有權限
28 + "scopes": ["users:read", "users:write", "system:status"]
29 + },
30 + "guest-access-token": {
31 + "username": "guest_user",
32 + "role": "guest",
33 + # Guest 只有讀取權限
34 + "scopes": ["users:read"]
35 + }
36 + }
37 +
38 + # -----------------------------------------------------------------------------
39 + # 2. 資料模型 (Pydantic Models)
40 + # -----------------------------------------------------------------------------
41 +
42 + class UserProfile(BaseModel):
43 + """使用者資料模型,包含權限列表"""
44 + username: str
45 + role: str
46 + scopes: List[str] = []
47 +
48 + class SystemStatus(BaseModel):
49 + status: str
50 + version: str
51 +
52 + # -----------------------------------------------------------------------------
53 + # 3. 核心邏輯類別 (Service Class)
54 + # -----------------------------------------------------------------------------
55 +
56 + class AuthService:
57 + """
58 + 處理認證 (Authentication) 與授權 (Authorization) 的服務。
59 + """
60 +
61 + def __init__(self):
62 + self.security_scheme = HTTPBearer(auto_error=True)
63 +
64 + def _get_user_by_token(self, token: str) -> Optional[Dict]:
65 + """從 Mock DB 撈取使用者"""
66 + return MOCK_USER_DB.get(token)
67 +
68 + def _verify_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> None:
69 + """
70 + 檢查使用者是否具備 API 所需的權限範圍。
71 +
72 + Args:
73 + user_scopes: 使用者擁有的權限
74 + required_scopes: API 端點要求的權限
75 +
76 + Raises:
77 + HTTPException: 權限不足時拋出 403
78 + """
79 + # 使用 set 操作來檢查是否包含所有必要權限
80 + # 如果 required_scopes 中的任何一個不在 user_scopes 裡,則拒絕
81 + for scope in required_scopes:
82 + if scope not in user_scopes:
83 + logger.warning(f"權限不足: 缺少 scope '{scope}'")
84 + raise HTTPException(
85 + status_code=status.HTTP_401_UNAUTHORIZED,
86 + detail="權限不足 (Not enough permissions)",
87 + headers={"WWW-Authenticate": f'Bearer scope="{scope}"'},
88 + )
89 +
90 + def __call__(
91 + self,
92 + creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())],
93 + security_scopes: SecurityScopes
94 + ) -> UserProfile:
95 + """
96 + FastAPI 依賴注入入口。
97 + 同時接收 Token 與該路由設定的 scopes。
98 + """
99 + token = creds.credentials
100 + required_scopes = security_scopes.scopes
101 +
102 + # 1. 驗證 Token 是否存在
103 + user_data = self._get_user_by_token(token)
104 + if not user_data:
105 + logger.error(f"登入失敗: 無效的 Token - {token[:8]}...")
106 + raise HTTPException(
107 + status_code=status.HTTP_401_UNAUTHORIZED,
108 + detail="無效的憑證",
109 + headers={"WWW-Authenticate": "Bearer"},
110 + )
111 +
112 + # 2. 驗證權限範圍 (如果有要求 Scope)
113 + if required_scopes:
114 + logger.debug(f"驗證權限: 需要 {required_scopes}, 擁有 {user_data['scopes']}")
115 + self._verify_scopes(user_data["scopes"], required_scopes)
116 +
117 + logger.success(f"認證通過: {user_data['username']}")
118 + return UserProfile(**user_data)
119 +
120 + # -----------------------------------------------------------------------------
121 + # 4. FastAPI 應用程式與路由
122 + # -----------------------------------------------------------------------------
123 +
124 + # 初始化服務
125 + auth_service = AuthService()
126 +
127 + app = FastAPI(title="FastAPI Scopes & Loguru Example")
128 +
129 + @app.get(
130 + "/users/me",
131 + response_model=UserProfile,
132 + # 使用 Security() 來注入依賴並指定需要的 scopes
133 + dependencies=[Security(auth_service, scopes=["users:read"])]
134 + )
135 + def read_own_profile(current_user: Annotated[UserProfile, Security(auth_service, scopes=["users:read"])]):
136 + """
137 + 一般使用者路由。
138 + 需要 scope: `users:read`
139 + """
140 + return current_user
141 +
142 + @app.get(
143 + "/admin/system",
144 + response_model=SystemStatus,
145 + # 這是一個受保護的管理員路由
146 + dependencies=[Security(auth_service, scopes=["system:status"])]
147 + )
148 + def read_system_status():
149 + """
150 + 管理員專用路由。
151 + 需要 scope: `system:status`
152 + (Guest Token 嘗試存取此處會失敗)
153 + """
154 + logger.info("管理員正在存取系統狀態...")
155 + return SystemStatus(status="Online", version="2.0.0")
156 +
157 + # -----------------------------------------------------------------------------
158 + # 5. 程式進入點
159 + # -----------------------------------------------------------------------------
160 +
161 + if __name__ == "__main__":
162 + logger.info("啟動服務中 | http://127.0.0.1:8000")
163 + uvicorn.run(app, host="127.0.0.1", port=8000)
Novější Starší