data_handler_strategy_yaml_authenticator.py
· 5.3 KiB · Python
Eredeti
import json
from abc import ABC, abstractmethod
import streamlit as st
import streamlit_authenticator as stauth
import yaml
# 策略介面
class DataHandlerStrategy(ABC):
"""資料處理策略介面"""
@abstractmethod
def read_data(self, path):
"""讀取資料"""
pass
@abstractmethod
def write_data(self, path, data):
"""寫入資料"""
pass
# YAML 策略
class YamlHandler(DataHandlerStrategy):
"""YAML 資料處理策略"""
def read_data(self, path):
"""讀取 YAML 資料"""
with open(path, "r") as f:
return yaml.safe_load(f)
def write_data(self, path, data):
"""寫入 YAML 資料"""
with open(path, "w") as f:
yaml.safe_dump(data, f)
# JSON 策略
class JsonHandler(DataHandlerStrategy):
"""JSON 資料處理策略"""
def read_data(self, path):
"""讀取 JSON 資料"""
with open(path, "r") as f:
return json.load(f)
def write_data(self, path, data):
"""寫入 JSON 資料"""
with open(path, "w") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
class AuthMixin:
"""驗證混合類別"""
def __init__(self, config_path="./config.yaml", handler: DataHandlerStrategy = YamlHandler()):
self.config_path = config_path
self.handler = handler
self.config = self.load_config(config_path)
self.authenticator = self.create_authenticator()
def load_config(self, config_path):
"""載入配置檔案"""
try:
return self.handler.read_data(config_path)
except FileNotFoundError:
st.error(f"The configuration file {config_path} was not found.")
st.stop()
def create_authenticator(self):
"""建立驗證器"""
config = self.config
return stauth.Authenticate(
config["credentials"],
config["cookie"]["name"],
config["cookie"]["key"],
config["cookie"]["expiry_days"],
config["preauthorized"],
)
def login(self):
"""登入"""
fields = {
"Form name": "Login",
"Username": "Username",
"Password": "Password",
"Login": "Login",
}
name, authentication_status, username = self.authenticator.login(location="main", fields=fields)
print(f"Name: {name}, Auth Status: {authentication_status}, Username: {username}")
return name, authentication_status, username
def logout(self):
"""登出"""
self.authenticator.logout("Logout", "main", key="unique_key")
def reset_password(self, username):
"""重設密碼"""
fields = {
"Form name": "Reset password",
"Username": "Username",
"Password": "New password",
"Repeat password": "Repeat password",
"Submit": "Submit",
}
try:
if self.authenticator.reset_password(username, fields=fields):
st.success("Password modified successfully")
self.update_config() # 在成功修改密碼後更新配置文件
except Exception as e:
st.error(e)
def update_config(self):
"""更新配置檔案"""
try:
self.handler.write_data(self.config_path, self.config)
except FileNotFoundError:
st.error(f"The configuration file {self.config_path} was not found for writing.")
def is_authenticated(self):
"""檢查是否已驗證"""
_, authentication_status, _ = self.login()
return authentication_status is True
class MyApp(AuthMixin):
"""應用程式主類別"""
def __init__(self, config_path=None, handler=None):
if config_path and handler:
AuthMixin.__init__(self, config_path, handler)
self.run()
def run(self):
"""執行應用程式"""
if hasattr(self, "authenticator"):
self.handle_authenticated()
else:
self.display_content()
def handle_authenticated(self):
"""處理已驗證使用者"""
name, authentication_status, username = self.login()
if authentication_status: # 如果驗證狀態為真
self.logout() # 顯示 "Logout" 按鈕,連接到 "main",並使用特定的 key
st.write(f"Welcome *{name}*") # 顯示歡迎訊息,使用 st.session_state 中的 name 屬性
self.display_content()
self.reset_password(username) # 嘗試重設密碼
elif authentication_status is False: # 如果驗證狀態為假
st.error("Username/password is incorrect") # 顯示錯誤訊息,提示使用者帳號或密碼不正確
elif authentication_status is None: # 如果驗證狀態為空
st.warning("Please enter your username and password") # 顯示警告訊息,提示使用者輸入帳號和密碼
def display_content(self):
"""顯示內容"""
st.title("Some content") # 顯示標題為 "Some content"
if __name__ == "__main__":
# 使用 AuthMixin 進行驗證
# MyApp(config_path="./config.yaml", handler=YamlHandler())
# 使用 JSON 處理器進行驗證
# MyApp(config_path="./config.json", handler=JsonHandler())
# 不使用 AuthMixin 進行驗證
MyApp()
| 1 | import json |
| 2 | from abc import ABC, abstractmethod |
| 3 | |
| 4 | import streamlit as st |
| 5 | import streamlit_authenticator as stauth |
| 6 | import yaml |
| 7 | |
| 8 | |
| 9 | # 策略介面 |
| 10 | class DataHandlerStrategy(ABC): |
| 11 | """資料處理策略介面""" |
| 12 | |
| 13 | @abstractmethod |
| 14 | def read_data(self, path): |
| 15 | """讀取資料""" |
| 16 | pass |
| 17 | |
| 18 | @abstractmethod |
| 19 | def write_data(self, path, data): |
| 20 | """寫入資料""" |
| 21 | pass |
| 22 | |
| 23 | |
| 24 | # YAML 策略 |
| 25 | class YamlHandler(DataHandlerStrategy): |
| 26 | """YAML 資料處理策略""" |
| 27 | |
| 28 | def read_data(self, path): |
| 29 | """讀取 YAML 資料""" |
| 30 | with open(path, "r") as f: |
| 31 | return yaml.safe_load(f) |
| 32 | |
| 33 | def write_data(self, path, data): |
| 34 | """寫入 YAML 資料""" |
| 35 | with open(path, "w") as f: |
| 36 | yaml.safe_dump(data, f) |
| 37 | |
| 38 | |
| 39 | # JSON 策略 |
| 40 | class JsonHandler(DataHandlerStrategy): |
| 41 | """JSON 資料處理策略""" |
| 42 | |
| 43 | def read_data(self, path): |
| 44 | """讀取 JSON 資料""" |
| 45 | with open(path, "r") as f: |
| 46 | return json.load(f) |
| 47 | |
| 48 | def write_data(self, path, data): |
| 49 | """寫入 JSON 資料""" |
| 50 | with open(path, "w") as f: |
| 51 | json.dump(data, f, ensure_ascii=False, indent=4) |
| 52 | |
| 53 | |
| 54 | class AuthMixin: |
| 55 | """驗證混合類別""" |
| 56 | |
| 57 | def __init__(self, config_path="./config.yaml", handler: DataHandlerStrategy = YamlHandler()): |
| 58 | self.config_path = config_path |
| 59 | self.handler = handler |
| 60 | self.config = self.load_config(config_path) |
| 61 | self.authenticator = self.create_authenticator() |
| 62 | |
| 63 | def load_config(self, config_path): |
| 64 | """載入配置檔案""" |
| 65 | try: |
| 66 | return self.handler.read_data(config_path) |
| 67 | except FileNotFoundError: |
| 68 | st.error(f"The configuration file {config_path} was not found.") |
| 69 | st.stop() |
| 70 | |
| 71 | def create_authenticator(self): |
| 72 | """建立驗證器""" |
| 73 | config = self.config |
| 74 | return stauth.Authenticate( |
| 75 | config["credentials"], |
| 76 | config["cookie"]["name"], |
| 77 | config["cookie"]["key"], |
| 78 | config["cookie"]["expiry_days"], |
| 79 | config["preauthorized"], |
| 80 | ) |
| 81 | |
| 82 | def login(self): |
| 83 | """登入""" |
| 84 | fields = { |
| 85 | "Form name": "Login", |
| 86 | "Username": "Username", |
| 87 | "Password": "Password", |
| 88 | "Login": "Login", |
| 89 | } |
| 90 | name, authentication_status, username = self.authenticator.login(location="main", fields=fields) |
| 91 | print(f"Name: {name}, Auth Status: {authentication_status}, Username: {username}") |
| 92 | return name, authentication_status, username |
| 93 | |
| 94 | def logout(self): |
| 95 | """登出""" |
| 96 | self.authenticator.logout("Logout", "main", key="unique_key") |
| 97 | |
| 98 | def reset_password(self, username): |
| 99 | """重設密碼""" |
| 100 | fields = { |
| 101 | "Form name": "Reset password", |
| 102 | "Username": "Username", |
| 103 | "Password": "New password", |
| 104 | "Repeat password": "Repeat password", |
| 105 | "Submit": "Submit", |
| 106 | } |
| 107 | try: |
| 108 | if self.authenticator.reset_password(username, fields=fields): |
| 109 | st.success("Password modified successfully") |
| 110 | self.update_config() # 在成功修改密碼後更新配置文件 |
| 111 | except Exception as e: |
| 112 | st.error(e) |
| 113 | |
| 114 | def update_config(self): |
| 115 | """更新配置檔案""" |
| 116 | try: |
| 117 | self.handler.write_data(self.config_path, self.config) |
| 118 | except FileNotFoundError: |
| 119 | st.error(f"The configuration file {self.config_path} was not found for writing.") |
| 120 | |
| 121 | def is_authenticated(self): |
| 122 | """檢查是否已驗證""" |
| 123 | _, authentication_status, _ = self.login() |
| 124 | return authentication_status is True |
| 125 | |
| 126 | |
| 127 | class MyApp(AuthMixin): |
| 128 | """應用程式主類別""" |
| 129 | |
| 130 | def __init__(self, config_path=None, handler=None): |
| 131 | if config_path and handler: |
| 132 | AuthMixin.__init__(self, config_path, handler) |
| 133 | self.run() |
| 134 | |
| 135 | def run(self): |
| 136 | """執行應用程式""" |
| 137 | |
| 138 | if hasattr(self, "authenticator"): |
| 139 | self.handle_authenticated() |
| 140 | else: |
| 141 | self.display_content() |
| 142 | |
| 143 | def handle_authenticated(self): |
| 144 | """處理已驗證使用者""" |
| 145 | |
| 146 | name, authentication_status, username = self.login() |
| 147 | |
| 148 | if authentication_status: # 如果驗證狀態為真 |
| 149 | self.logout() # 顯示 "Logout" 按鈕,連接到 "main",並使用特定的 key |
| 150 | st.write(f"Welcome *{name}*") # 顯示歡迎訊息,使用 st.session_state 中的 name 屬性 |
| 151 | self.display_content() |
| 152 | self.reset_password(username) # 嘗試重設密碼 |
| 153 | elif authentication_status is False: # 如果驗證狀態為假 |
| 154 | st.error("Username/password is incorrect") # 顯示錯誤訊息,提示使用者帳號或密碼不正確 |
| 155 | elif authentication_status is None: # 如果驗證狀態為空 |
| 156 | st.warning("Please enter your username and password") # 顯示警告訊息,提示使用者輸入帳號和密碼 |
| 157 | |
| 158 | def display_content(self): |
| 159 | """顯示內容""" |
| 160 | st.title("Some content") # 顯示標題為 "Some content" |
| 161 | |
| 162 | |
| 163 | if __name__ == "__main__": |
| 164 | # 使用 AuthMixin 進行驗證 |
| 165 | # MyApp(config_path="./config.yaml", handler=YamlHandler()) |
| 166 | |
| 167 | # 使用 JSON 處理器進行驗證 |
| 168 | # MyApp(config_path="./config.json", handler=JsonHandler()) |
| 169 | |
| 170 | # 不使用 AuthMixin 進行驗證 |
| 171 | MyApp() |
| 172 |