Utoljára aktív 10 months ago

這段程式碼使用 Streamlit 建立一個 WebSocket 測試介面,當使用者按下按鈕時,會透過 WebSocket 用戶端向伺服器發送訊息,並顯示伺服器的回應。同時,它會在背景啟動 WebSocket 伺服器(如果尚未啟動),允許用戶端與其通訊,適用於即時通訊測試、開發 WebSocket 服務或簡單的雙向訊息傳遞應用。

Revízió 8053b5bd99684a40c4dfb5bcb255ecab1bd65130

websocket_streamlit_app.py Eredeti
1import asyncio
2import websockets
3import streamlit as st
4import threading
5import socket
6
7# Function to check if a port is free
8def is_port_free(port):
9 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
10 return s.connect_ex(('localhost', port)) != 0
11
12# WebSocket Server Logic
13async def echo(websocket, path):
14 async for message in websocket:
15 print(f"收到訊息: {message}")
16 await websocket.send(f"伺服器回應: {message}")
17
18# Function to start the WebSocket server
19def start_server():
20 new_loop = asyncio.new_event_loop()
21 asyncio.set_event_loop(new_loop)
22 server = websockets.serve(echo, "localhost", 8765)
23 new_loop.run_until_complete(server)
24 new_loop.run_forever()
25
26# Initialize session state for server
27if 'server_started' not in st.session_state:
28 st.session_state.server_started = False
29
30# Start the server only once
31if not st.session_state.server_started:
32 if is_port_free(8765):
33 server_thread = threading.Thread(target=start_server, daemon=True)
34 server_thread.start()
35 st.session_state.server_started = True
36 st.success("WebSocket 伺服器已啟動。")
37 else:
38 st.error("Port 8765 已被佔用。請確保沒有其他應用程式使用該埠,或更改埠號。")
39
40st.title("WebSocket Example")
41
42# WebSocket Client Logic
43async def websocket_client():
44 try:
45 async with websockets.connect("ws://localhost:8765") as websocket:
46 await websocket.send("你好,WebSocket!")
47 response = await websocket.recv()
48 st.write(f"伺服器回應: {response}")
49 except Exception as e:
50 st.error(f"WebSocket 連線失敗: {e}")
51
52# Run WebSocket Client on Button Click
53if st.button("發送訊息"):
54 asyncio.run(websocket_client())
55
56