timmy gist felülvizsgálása 1 week ago. Revízióhoz ugrás
Nincsenek változtatások
timmy gist felülvizsgálása 1 week ago. Revízióhoz ugrás
1 file changed, 167 insertions
fastapi_auth_service.py(fájl létrehozva)
| @@ -0,0 +1,167 @@ | |||
| 1 | + | # /// script | |
| 2 | + | # requires-python = ">=3.12" | |
| 3 | + | # dependencies = [ | |
| 4 | + | # "fastapi", | |
| 5 | + | # "uvicorn", | |
| 6 | + | # "pydantic", | |
| 7 | + | # ] | |
| 8 | + | # /// | |
| 9 | + | ||
| 10 | + | import logging | |
| 11 | + | import uvicorn | |
| 12 | + | from typing import Annotated, Dict, Optional | |
| 13 | + | from fastapi import FastAPI, Depends, HTTPException, status | |
| 14 | + | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| 15 | + | from pydantic import BaseModel | |
| 16 | + | ||
| 17 | + | # ----------------------------------------------------------------------------- | |
| 18 | + | # 1. 基礎設定與 Mock 資料 (Mock Data) | |
| 19 | + | # ----------------------------------------------------------------------------- | |
| 20 | + | ||
| 21 | + | # 設定 Logger,分級顯示資訊 | |
| 22 | + | logging.basicConfig( | |
| 23 | + | level=logging.INFO, | |
| 24 | + | format="%(asctime)s - [%(levelname)s] - %(message)s" | |
| 25 | + | ) | |
| 26 | + | logger = logging.getLogger("AuthSystem") | |
| 27 | + | ||
| 28 | + | # 假資料:使用者資料庫 (Table-driven lookup) | |
| 29 | + | # 使用 Dict 作為查找表,取代資料庫查詢 | |
| 30 | + | MOCK_USER_DB: Dict[str, Dict[str, str]] = { | |
| 31 | + | "admin-secret-token": { | |
| 32 | + | "username": "admin_user", | |
| 33 | + | "email": "admin@example.com", | |
| 34 | + | "role": "admin" | |
| 35 | + | }, | |
| 36 | + | "guest-access-token": { | |
| 37 | + | "username": "guest_user", | |
| 38 | + | "email": "guest@example.com", | |
| 39 | + | "role": "guest" | |
| 40 | + | } | |
| 41 | + | } | |
| 42 | + | ||
| 43 | + | # ----------------------------------------------------------------------------- | |
| 44 | + | # 2. 資料模型 (Pydantic Models) | |
| 45 | + | # ----------------------------------------------------------------------------- | |
| 46 | + | ||
| 47 | + | class UserProfile(BaseModel): | |
| 48 | + | """回傳給前端的使用者資料模型""" | |
| 49 | + | username: str | |
| 50 | + | email: str | |
| 51 | + | role: str | |
| 52 | + | ||
| 53 | + | class SystemStatus(BaseModel): | |
| 54 | + | """系統狀態模型""" | |
| 55 | + | status: str | |
| 56 | + | active_users: int | |
| 57 | + | ||
| 58 | + | # ----------------------------------------------------------------------------- | |
| 59 | + | # 3. 核心邏輯類別 (Service Class) | |
| 60 | + | # ----------------------------------------------------------------------------- | |
| 61 | + | ||
| 62 | + | class AuthService: | |
| 63 | + | """ | |
| 64 | + | 處理認證與授權的服務類別。 | |
| 65 | + | 將邏輯封裝在此,避免 API 路由函式過於肥大。 | |
| 66 | + | """ | |
| 67 | + | ||
| 68 | + | def __init__(self): | |
| 69 | + | # FastAPI 的 HTTPBearer 會自動檢查 Authorization header 是否存在 | |
| 70 | + | # auto_error=True 會在 header 缺失時自動回傳 403/401 | |
| 71 | + | self.security_scheme = HTTPBearer(auto_error=True) | |
| 72 | + | ||
| 73 | + | def _get_user_from_db(self, token: str) -> Optional[Dict[str, str]]: | |
| 74 | + | """ | |
| 75 | + | 模擬從資料庫撈取使用者。 | |
| 76 | + | ||
| 77 | + | Args: | |
| 78 | + | token: 用戶提供的 Bearer Token | |
| 79 | + | ||
| 80 | + | Returns: | |
| 81 | + | Dict: 使用者資料或是 None | |
| 82 | + | """ | |
| 83 | + | return MOCK_USER_DB.get(token) | |
| 84 | + | ||
| 85 | + | def _validate_credentials(self, token: str) -> UserProfile: | |
| 86 | + | """ | |
| 87 | + | 驗證 Token 的有效性並轉換為 Pydantic 模型。 | |
| 88 | + | ||
| 89 | + | Raises: | |
| 90 | + | HTTPException: 當 Token 無效時拋出 401 | |
| 91 | + | """ | |
| 92 | + | user_data = self._get_user_from_db(token) | |
| 93 | + | ||
| 94 | + | # 使用 Guard Clauses (衛語句) 避免深層巢狀分支 | |
| 95 | + | if not user_data: | |
| 96 | + | logger.warning(f"收到無效的 Token 嘗試存取: {token}") | |
| 97 | + | raise HTTPException( | |
| 98 | + | status_code=status.HTTP_401_UNAUTHORIZED, | |
| 99 | + | detail="無效的憑證 (Invalid Token)", | |
| 100 | + | headers={"WWW-Authenticate": "Bearer"}, | |
| 101 | + | ) | |
| 102 | + | ||
| 103 | + | logger.info(f"使用者驗證成功: {user_data['username']}") | |
| 104 | + | return UserProfile(**user_data) | |
| 105 | + | ||
| 106 | + | def __call__(self, creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())]) -> UserProfile: | |
| 107 | + | """ | |
| 108 | + | 使類別實例可被呼叫 (Callable),作為 FastAPI 的依賴項 (Dependency)。 | |
| 109 | + | ||
| 110 | + | Args: | |
| 111 | + | creds: FastAPI 自動解析的 Bearer 憑證物件 | |
| 112 | + | ||
| 113 | + | Returns: | |
| 114 | + | UserProfile: 驗證通過的使用者物件 | |
| 115 | + | """ | |
| 116 | + | token = creds.credentials | |
| 117 | + | logger.debug(f"開始驗證 Token: {token[:5]}***") # 遮罩敏感資訊 | |
| 118 | + | return self._validate_credentials(token) | |
| 119 | + | ||
| 120 | + | # ----------------------------------------------------------------------------- | |
| 121 | + | # 4. FastAPI 應用程式與路由 | |
| 122 | + | # ----------------------------------------------------------------------------- | |
| 123 | + | ||
| 124 | + | # 初始化服務實例 | |
| 125 | + | auth_service = AuthService() | |
| 126 | + | ||
| 127 | + | app = FastAPI( | |
| 128 | + | title="FastAPI HTTPBearer Example", | |
| 129 | + | description="示範使用 Class-based 依賴注入進行 Token 驗證", | |
| 130 | + | version="1.0.0" | |
| 131 | + | ) | |
| 132 | + | ||
| 133 | + | @app.get("/me", response_model=UserProfile) | |
| 134 | + | def read_current_user(current_user: Annotated[UserProfile, Depends(auth_service)]): | |
| 135 | + | """ | |
| 136 | + | 取得當前登入使用者的資訊。 | |
| 137 | + | ||
| 138 | + | - 需要 Bearer Token 授權 | |
| 139 | + | - 根據 Token 回傳對應的 Mock Data | |
| 140 | + | """ | |
| 141 | + | logger.info(f"API /me 被呼叫,使用者: {current_user.username}") | |
| 142 | + | return current_user | |
| 143 | + | ||
| 144 | + | @app.get("/status", response_model=SystemStatus) | |
| 145 | + | def read_system_status(current_user: Annotated[UserProfile, Depends(auth_service)]): | |
| 146 | + | """ | |
| 147 | + | 取得系統狀態 (僅示範用)。 | |
| 148 | + | """ | |
| 149 | + | # 這裡可以用 switch/match 處理不同角色的權限 (表驅動概念) | |
| 150 | + | # Python 3.10+ match 語法 | |
| 151 | + | match current_user.role: | |
| 152 | + | case "admin": | |
| 153 | + | msg = "系統運作正常 (Admin View)" | |
| 154 | + | case _: | |
| 155 | + | msg = "系統運作正常" | |
| 156 | + | ||
| 157 | + | return SystemStatus(status=msg, active_users=len(MOCK_USER_DB)) | |
| 158 | + | ||
| 159 | + | # ----------------------------------------------------------------------------- | |
| 160 | + | # 5. 程式進入點 | |
| 161 | + | # ----------------------------------------------------------------------------- | |
| 162 | + | ||
| 163 | + | if __name__ == "__main__": | |
| 164 | + | # 使用 uvicorn 啟動服務 | |
| 165 | + | # host="0.0.0.0" 允許外部連線 | |
| 166 | + | logger.info("正在啟動 FastAPI 服務...") | |
| 167 | + | uvicorn.run(app, host="127.0.0.1", port=8000) | |
Újabb
Régebbi