timmy bu gisti düzenledi 1 week ago. Düzenlemeye git
1 file changed, 163 insertions
fastapi_scoped_auth_loguru.py(dosya oluşturuldu)
| @@ -0,0 +1,163 @@ | |||
| 1 | + | # /// script | |
| 2 | + | # requires-python = ">=3.12" | |
| 3 | + | # dependencies = [ | |
| 4 | + | # "fastapi", | |
| 5 | + | # "uvicorn", | |
| 6 | + | # "pydantic", | |
| 7 | + | # "loguru", | |
| 8 | + | # ] | |
| 9 | + | # /// | |
| 10 | + | ||
| 11 | + | import uvicorn | |
| 12 | + | from typing import Annotated, Dict, List, Optional | |
| 13 | + | from fastapi import FastAPI, Depends, HTTPException, status, Security | |
| 14 | + | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, SecurityScopes | |
| 15 | + | from pydantic import BaseModel | |
| 16 | + | from loguru import logger | |
| 17 | + | ||
| 18 | + | # ----------------------------------------------------------------------------- | |
| 19 | + | # 1. 基礎設定與 Mock 資料 | |
| 20 | + | # ----------------------------------------------------------------------------- | |
| 21 | + | ||
| 22 | + | # 假資料:使用者資料庫 (新增 scopes 欄位) | |
| 23 | + | MOCK_USER_DB: Dict[str, Dict] = { | |
| 24 | + | "admin-secret-token": { | |
| 25 | + | "username": "admin_user", | |
| 26 | + | "role": "admin", | |
| 27 | + | # Admin 擁有所有權限 | |
| 28 | + | "scopes": ["users:read", "users:write", "system:status"] | |
| 29 | + | }, | |
| 30 | + | "guest-access-token": { | |
| 31 | + | "username": "guest_user", | |
| 32 | + | "role": "guest", | |
| 33 | + | # Guest 只有讀取權限 | |
| 34 | + | "scopes": ["users:read"] | |
| 35 | + | } | |
| 36 | + | } | |
| 37 | + | ||
| 38 | + | # ----------------------------------------------------------------------------- | |
| 39 | + | # 2. 資料模型 (Pydantic Models) | |
| 40 | + | # ----------------------------------------------------------------------------- | |
| 41 | + | ||
| 42 | + | class UserProfile(BaseModel): | |
| 43 | + | """使用者資料模型,包含權限列表""" | |
| 44 | + | username: str | |
| 45 | + | role: str | |
| 46 | + | scopes: List[str] = [] | |
| 47 | + | ||
| 48 | + | class SystemStatus(BaseModel): | |
| 49 | + | status: str | |
| 50 | + | version: str | |
| 51 | + | ||
| 52 | + | # ----------------------------------------------------------------------------- | |
| 53 | + | # 3. 核心邏輯類別 (Service Class) | |
| 54 | + | # ----------------------------------------------------------------------------- | |
| 55 | + | ||
| 56 | + | class AuthService: | |
| 57 | + | """ | |
| 58 | + | 處理認證 (Authentication) 與授權 (Authorization) 的服務。 | |
| 59 | + | """ | |
| 60 | + | ||
| 61 | + | def __init__(self): | |
| 62 | + | self.security_scheme = HTTPBearer(auto_error=True) | |
| 63 | + | ||
| 64 | + | def _get_user_by_token(self, token: str) -> Optional[Dict]: | |
| 65 | + | """從 Mock DB 撈取使用者""" | |
| 66 | + | return MOCK_USER_DB.get(token) | |
| 67 | + | ||
| 68 | + | def _verify_scopes(self, user_scopes: List[str], required_scopes: List[str]) -> None: | |
| 69 | + | """ | |
| 70 | + | 檢查使用者是否具備 API 所需的權限範圍。 | |
| 71 | + | ||
| 72 | + | Args: | |
| 73 | + | user_scopes: 使用者擁有的權限 | |
| 74 | + | required_scopes: API 端點要求的權限 | |
| 75 | + | ||
| 76 | + | Raises: | |
| 77 | + | HTTPException: 權限不足時拋出 403 | |
| 78 | + | """ | |
| 79 | + | # 使用 set 操作來檢查是否包含所有必要權限 | |
| 80 | + | # 如果 required_scopes 中的任何一個不在 user_scopes 裡,則拒絕 | |
| 81 | + | for scope in required_scopes: | |
| 82 | + | if scope not in user_scopes: | |
| 83 | + | logger.warning(f"權限不足: 缺少 scope '{scope}'") | |
| 84 | + | raise HTTPException( | |
| 85 | + | status_code=status.HTTP_401_UNAUTHORIZED, | |
| 86 | + | detail="權限不足 (Not enough permissions)", | |
| 87 | + | headers={"WWW-Authenticate": f'Bearer scope="{scope}"'}, | |
| 88 | + | ) | |
| 89 | + | ||
| 90 | + | def __call__( | |
| 91 | + | self, | |
| 92 | + | creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())], | |
| 93 | + | security_scopes: SecurityScopes | |
| 94 | + | ) -> UserProfile: | |
| 95 | + | """ | |
| 96 | + | FastAPI 依賴注入入口。 | |
| 97 | + | 同時接收 Token 與該路由設定的 scopes。 | |
| 98 | + | """ | |
| 99 | + | token = creds.credentials | |
| 100 | + | required_scopes = security_scopes.scopes | |
| 101 | + | ||
| 102 | + | # 1. 驗證 Token 是否存在 | |
| 103 | + | user_data = self._get_user_by_token(token) | |
| 104 | + | if not user_data: | |
| 105 | + | logger.error(f"登入失敗: 無效的 Token - {token[:8]}...") | |
| 106 | + | raise HTTPException( | |
| 107 | + | status_code=status.HTTP_401_UNAUTHORIZED, | |
| 108 | + | detail="無效的憑證", | |
| 109 | + | headers={"WWW-Authenticate": "Bearer"}, | |
| 110 | + | ) | |
| 111 | + | ||
| 112 | + | # 2. 驗證權限範圍 (如果有要求 Scope) | |
| 113 | + | if required_scopes: | |
| 114 | + | logger.debug(f"驗證權限: 需要 {required_scopes}, 擁有 {user_data['scopes']}") | |
| 115 | + | self._verify_scopes(user_data["scopes"], required_scopes) | |
| 116 | + | ||
| 117 | + | logger.success(f"認證通過: {user_data['username']}") | |
| 118 | + | return UserProfile(**user_data) | |
| 119 | + | ||
| 120 | + | # ----------------------------------------------------------------------------- | |
| 121 | + | # 4. FastAPI 應用程式與路由 | |
| 122 | + | # ----------------------------------------------------------------------------- | |
| 123 | + | ||
| 124 | + | # 初始化服務 | |
| 125 | + | auth_service = AuthService() | |
| 126 | + | ||
| 127 | + | app = FastAPI(title="FastAPI Scopes & Loguru Example") | |
| 128 | + | ||
| 129 | + | @app.get( | |
| 130 | + | "/users/me", | |
| 131 | + | response_model=UserProfile, | |
| 132 | + | # 使用 Security() 來注入依賴並指定需要的 scopes | |
| 133 | + | dependencies=[Security(auth_service, scopes=["users:read"])] | |
| 134 | + | ) | |
| 135 | + | def read_own_profile(current_user: Annotated[UserProfile, Security(auth_service, scopes=["users:read"])]): | |
| 136 | + | """ | |
| 137 | + | 一般使用者路由。 | |
| 138 | + | 需要 scope: `users:read` | |
| 139 | + | """ | |
| 140 | + | return current_user | |
| 141 | + | ||
| 142 | + | @app.get( | |
| 143 | + | "/admin/system", | |
| 144 | + | response_model=SystemStatus, | |
| 145 | + | # 這是一個受保護的管理員路由 | |
| 146 | + | dependencies=[Security(auth_service, scopes=["system:status"])] | |
| 147 | + | ) | |
| 148 | + | def read_system_status(): | |
| 149 | + | """ | |
| 150 | + | 管理員專用路由。 | |
| 151 | + | 需要 scope: `system:status` | |
| 152 | + | (Guest Token 嘗試存取此處會失敗) | |
| 153 | + | """ | |
| 154 | + | logger.info("管理員正在存取系統狀態...") | |
| 155 | + | return SystemStatus(status="Online", version="2.0.0") | |
| 156 | + | ||
| 157 | + | # ----------------------------------------------------------------------------- | |
| 158 | + | # 5. 程式進入點 | |
| 159 | + | # ----------------------------------------------------------------------------- | |
| 160 | + | ||
| 161 | + | if __name__ == "__main__": | |
| 162 | + | logger.info("啟動服務中 | http://127.0.0.1:8000") | |
| 163 | + | uvicorn.run(app, host="127.0.0.1", port=8000) | |
Daha yeni
Daha eski