Dernière activité 10 months ago

這段程式碼使用 Streamlit 建立一個 WebSocket 測試介面,當使用者按下按鈕時,會透過 WebSocket 用戶端向伺服器發送訊息,並顯示伺服器的回應。同時,它會在背景啟動 WebSocket 伺服器(如果尚未啟動),允許用戶端與其通訊,適用於即時通訊測試、開發 WebSocket 服務或簡單的雙向訊息傳遞應用。

timmy a révisé ce gist 10 months ago. Aller à la révision

Aucun changement

timmy a révisé ce gist 10 months ago. Aller à la révision

Aucun changement

timmy a révisé ce gist 10 months ago. Aller à la révision

Aucun changement

timmy a révisé ce gist 10 months ago. Aller à la révision

1 file changed, 55 insertions

websocket_streamlit_app.py(fichier créé)

@@ -0,0 +1,55 @@
1 + import asyncio
2 + import websockets
3 + import streamlit as st
4 + import threading
5 + import socket
6 +
7 + # Function to check if a port is free
8 + def is_port_free(port):
9 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
10 + return s.connect_ex(('localhost', port)) != 0
11 +
12 + # WebSocket Server Logic
13 + async def echo(websocket, path):
14 + async for message in websocket:
15 + print(f"收到訊息: {message}")
16 + await websocket.send(f"伺服器回應: {message}")
17 +
18 + # Function to start the WebSocket server
19 + def start_server():
20 + new_loop = asyncio.new_event_loop()
21 + asyncio.set_event_loop(new_loop)
22 + server = websockets.serve(echo, "localhost", 8765)
23 + new_loop.run_until_complete(server)
24 + new_loop.run_forever()
25 +
26 + # Initialize session state for server
27 + if 'server_started' not in st.session_state:
28 + st.session_state.server_started = False
29 +
30 + # Start the server only once
31 + if not st.session_state.server_started:
32 + if is_port_free(8765):
33 + server_thread = threading.Thread(target=start_server, daemon=True)
34 + server_thread.start()
35 + st.session_state.server_started = True
36 + st.success("WebSocket 伺服器已啟動。")
37 + else:
38 + st.error("Port 8765 已被佔用。請確保沒有其他應用程式使用該埠,或更改埠號。")
39 +
40 + st.title("WebSocket Example")
41 +
42 + # WebSocket Client Logic
43 + async def websocket_client():
44 + try:
45 + async with websockets.connect("ws://localhost:8765") as websocket:
46 + await websocket.send("你好,WebSocket!")
47 + response = await websocket.recv()
48 + st.write(f"伺服器回應: {response}")
49 + except Exception as e:
50 + st.error(f"WebSocket 連線失敗: {e}")
51 +
52 + # Run WebSocket Client on Button Click
53 + if st.button("發送訊息"):
54 + asyncio.run(websocket_client())
55 +
Plus récent Plus ancien