fastapi_auth_service.py
· 5.4 KiB · Python
Bruto
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "fastapi",
# "uvicorn",
# "pydantic",
# ]
# ///
import logging
import uvicorn
from typing import Annotated, Dict, Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
# -----------------------------------------------------------------------------
# 1. 基礎設定與 Mock 資料 (Mock Data)
# -----------------------------------------------------------------------------
# 設定 Logger,分級顯示資訊
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - [%(levelname)s] - %(message)s"
)
logger = logging.getLogger("AuthSystem")
# 假資料:使用者資料庫 (Table-driven lookup)
# 使用 Dict 作為查找表,取代資料庫查詢
MOCK_USER_DB: Dict[str, Dict[str, str]] = {
"admin-secret-token": {
"username": "admin_user",
"email": "admin@example.com",
"role": "admin"
},
"guest-access-token": {
"username": "guest_user",
"email": "guest@example.com",
"role": "guest"
}
}
# -----------------------------------------------------------------------------
# 2. 資料模型 (Pydantic Models)
# -----------------------------------------------------------------------------
class UserProfile(BaseModel):
"""回傳給前端的使用者資料模型"""
username: str
email: str
role: str
class SystemStatus(BaseModel):
"""系統狀態模型"""
status: str
active_users: int
# -----------------------------------------------------------------------------
# 3. 核心邏輯類別 (Service Class)
# -----------------------------------------------------------------------------
class AuthService:
"""
處理認證與授權的服務類別。
將邏輯封裝在此,避免 API 路由函式過於肥大。
"""
def __init__(self):
# FastAPI 的 HTTPBearer 會自動檢查 Authorization header 是否存在
# auto_error=True 會在 header 缺失時自動回傳 403/401
self.security_scheme = HTTPBearer(auto_error=True)
def _get_user_from_db(self, token: str) -> Optional[Dict[str, str]]:
"""
模擬從資料庫撈取使用者。
Args:
token: 用戶提供的 Bearer Token
Returns:
Dict: 使用者資料或是 None
"""
return MOCK_USER_DB.get(token)
def _validate_credentials(self, token: str) -> UserProfile:
"""
驗證 Token 的有效性並轉換為 Pydantic 模型。
Raises:
HTTPException: 當 Token 無效時拋出 401
"""
user_data = self._get_user_from_db(token)
# 使用 Guard Clauses (衛語句) 避免深層巢狀分支
if not user_data:
logger.warning(f"收到無效的 Token 嘗試存取: {token}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無效的憑證 (Invalid Token)",
headers={"WWW-Authenticate": "Bearer"},
)
logger.info(f"使用者驗證成功: {user_data['username']}")
return UserProfile(**user_data)
def __call__(self, creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())]) -> UserProfile:
"""
使類別實例可被呼叫 (Callable),作為 FastAPI 的依賴項 (Dependency)。
Args:
creds: FastAPI 自動解析的 Bearer 憑證物件
Returns:
UserProfile: 驗證通過的使用者物件
"""
token = creds.credentials
logger.debug(f"開始驗證 Token: {token[:5]}***") # 遮罩敏感資訊
return self._validate_credentials(token)
# -----------------------------------------------------------------------------
# 4. FastAPI 應用程式與路由
# -----------------------------------------------------------------------------
# 初始化服務實例
auth_service = AuthService()
app = FastAPI(
title="FastAPI HTTPBearer Example",
description="示範使用 Class-based 依賴注入進行 Token 驗證",
version="1.0.0"
)
@app.get("/me", response_model=UserProfile)
def read_current_user(current_user: Annotated[UserProfile, Depends(auth_service)]):
"""
取得當前登入使用者的資訊。
- 需要 Bearer Token 授權
- 根據 Token 回傳對應的 Mock Data
"""
logger.info(f"API /me 被呼叫,使用者: {current_user.username}")
return current_user
@app.get("/status", response_model=SystemStatus)
def read_system_status(current_user: Annotated[UserProfile, Depends(auth_service)]):
"""
取得系統狀態 (僅示範用)。
"""
# 這裡可以用 switch/match 處理不同角色的權限 (表驅動概念)
# Python 3.10+ match 語法
match current_user.role:
case "admin":
msg = "系統運作正常 (Admin View)"
case _:
msg = "系統運作正常"
return SystemStatus(status=msg, active_users=len(MOCK_USER_DB))
# -----------------------------------------------------------------------------
# 5. 程式進入點
# -----------------------------------------------------------------------------
if __name__ == "__main__":
# 使用 uvicorn 啟動服務
# host="0.0.0.0" 允許外部連線
logger.info("正在啟動 FastAPI 服務...")
uvicorn.run(app, host="127.0.0.1", port=8000)
| 1 | # /// script |
| 2 | # requires-python = ">=3.12" |
| 3 | # dependencies = [ |
| 4 | # "fastapi", |
| 5 | # "uvicorn", |
| 6 | # "pydantic", |
| 7 | # ] |
| 8 | # /// |
| 9 | |
| 10 | import logging |
| 11 | import uvicorn |
| 12 | from typing import Annotated, Dict, Optional |
| 13 | from fastapi import FastAPI, Depends, HTTPException, status |
| 14 | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| 15 | from pydantic import BaseModel |
| 16 | |
| 17 | # ----------------------------------------------------------------------------- |
| 18 | # 1. 基礎設定與 Mock 資料 (Mock Data) |
| 19 | # ----------------------------------------------------------------------------- |
| 20 | |
| 21 | # 設定 Logger,分級顯示資訊 |
| 22 | logging.basicConfig( |
| 23 | level=logging.INFO, |
| 24 | format="%(asctime)s - [%(levelname)s] - %(message)s" |
| 25 | ) |
| 26 | logger = logging.getLogger("AuthSystem") |
| 27 | |
| 28 | # 假資料:使用者資料庫 (Table-driven lookup) |
| 29 | # 使用 Dict 作為查找表,取代資料庫查詢 |
| 30 | MOCK_USER_DB: Dict[str, Dict[str, str]] = { |
| 31 | "admin-secret-token": { |
| 32 | "username": "admin_user", |
| 33 | "email": "admin@example.com", |
| 34 | "role": "admin" |
| 35 | }, |
| 36 | "guest-access-token": { |
| 37 | "username": "guest_user", |
| 38 | "email": "guest@example.com", |
| 39 | "role": "guest" |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | # ----------------------------------------------------------------------------- |
| 44 | # 2. 資料模型 (Pydantic Models) |
| 45 | # ----------------------------------------------------------------------------- |
| 46 | |
| 47 | class UserProfile(BaseModel): |
| 48 | """回傳給前端的使用者資料模型""" |
| 49 | username: str |
| 50 | email: str |
| 51 | role: str |
| 52 | |
| 53 | class SystemStatus(BaseModel): |
| 54 | """系統狀態模型""" |
| 55 | status: str |
| 56 | active_users: int |
| 57 | |
| 58 | # ----------------------------------------------------------------------------- |
| 59 | # 3. 核心邏輯類別 (Service Class) |
| 60 | # ----------------------------------------------------------------------------- |
| 61 | |
| 62 | class AuthService: |
| 63 | """ |
| 64 | 處理認證與授權的服務類別。 |
| 65 | 將邏輯封裝在此,避免 API 路由函式過於肥大。 |
| 66 | """ |
| 67 | |
| 68 | def __init__(self): |
| 69 | # FastAPI 的 HTTPBearer 會自動檢查 Authorization header 是否存在 |
| 70 | # auto_error=True 會在 header 缺失時自動回傳 403/401 |
| 71 | self.security_scheme = HTTPBearer(auto_error=True) |
| 72 | |
| 73 | def _get_user_from_db(self, token: str) -> Optional[Dict[str, str]]: |
| 74 | """ |
| 75 | 模擬從資料庫撈取使用者。 |
| 76 | |
| 77 | Args: |
| 78 | token: 用戶提供的 Bearer Token |
| 79 | |
| 80 | Returns: |
| 81 | Dict: 使用者資料或是 None |
| 82 | """ |
| 83 | return MOCK_USER_DB.get(token) |
| 84 | |
| 85 | def _validate_credentials(self, token: str) -> UserProfile: |
| 86 | """ |
| 87 | 驗證 Token 的有效性並轉換為 Pydantic 模型。 |
| 88 | |
| 89 | Raises: |
| 90 | HTTPException: 當 Token 無效時拋出 401 |
| 91 | """ |
| 92 | user_data = self._get_user_from_db(token) |
| 93 | |
| 94 | # 使用 Guard Clauses (衛語句) 避免深層巢狀分支 |
| 95 | if not user_data: |
| 96 | logger.warning(f"收到無效的 Token 嘗試存取: {token}") |
| 97 | raise HTTPException( |
| 98 | status_code=status.HTTP_401_UNAUTHORIZED, |
| 99 | detail="無效的憑證 (Invalid Token)", |
| 100 | headers={"WWW-Authenticate": "Bearer"}, |
| 101 | ) |
| 102 | |
| 103 | logger.info(f"使用者驗證成功: {user_data['username']}") |
| 104 | return UserProfile(**user_data) |
| 105 | |
| 106 | def __call__(self, creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())]) -> UserProfile: |
| 107 | """ |
| 108 | 使類別實例可被呼叫 (Callable),作為 FastAPI 的依賴項 (Dependency)。 |
| 109 | |
| 110 | Args: |
| 111 | creds: FastAPI 自動解析的 Bearer 憑證物件 |
| 112 | |
| 113 | Returns: |
| 114 | UserProfile: 驗證通過的使用者物件 |
| 115 | """ |
| 116 | token = creds.credentials |
| 117 | logger.debug(f"開始驗證 Token: {token[:5]}***") # 遮罩敏感資訊 |
| 118 | return self._validate_credentials(token) |
| 119 | |
| 120 | # ----------------------------------------------------------------------------- |
| 121 | # 4. FastAPI 應用程式與路由 |
| 122 | # ----------------------------------------------------------------------------- |
| 123 | |
| 124 | # 初始化服務實例 |
| 125 | auth_service = AuthService() |
| 126 | |
| 127 | app = FastAPI( |
| 128 | title="FastAPI HTTPBearer Example", |
| 129 | description="示範使用 Class-based 依賴注入進行 Token 驗證", |
| 130 | version="1.0.0" |
| 131 | ) |
| 132 | |
| 133 | @app.get("/me", response_model=UserProfile) |
| 134 | def read_current_user(current_user: Annotated[UserProfile, Depends(auth_service)]): |
| 135 | """ |
| 136 | 取得當前登入使用者的資訊。 |
| 137 | |
| 138 | - 需要 Bearer Token 授權 |
| 139 | - 根據 Token 回傳對應的 Mock Data |
| 140 | """ |
| 141 | logger.info(f"API /me 被呼叫,使用者: {current_user.username}") |
| 142 | return current_user |
| 143 | |
| 144 | @app.get("/status", response_model=SystemStatus) |
| 145 | def read_system_status(current_user: Annotated[UserProfile, Depends(auth_service)]): |
| 146 | """ |
| 147 | 取得系統狀態 (僅示範用)。 |
| 148 | """ |
| 149 | # 這裡可以用 switch/match 處理不同角色的權限 (表驅動概念) |
| 150 | # Python 3.10+ match 語法 |
| 151 | match current_user.role: |
| 152 | case "admin": |
| 153 | msg = "系統運作正常 (Admin View)" |
| 154 | case _: |
| 155 | msg = "系統運作正常" |
| 156 | |
| 157 | return SystemStatus(status=msg, active_users=len(MOCK_USER_DB)) |
| 158 | |
| 159 | # ----------------------------------------------------------------------------- |
| 160 | # 5. 程式進入點 |
| 161 | # ----------------------------------------------------------------------------- |
| 162 | |
| 163 | if __name__ == "__main__": |
| 164 | # 使用 uvicorn 啟動服務 |
| 165 | # host="0.0.0.0" 允許外部連線 |
| 166 | logger.info("正在啟動 FastAPI 服務...") |
| 167 | uvicorn.run(app, host="127.0.0.1", port=8000) |