websocket_streamlit_app.py
· 1.7 KiB · Python
原始檔案
import asyncio
import websockets
import streamlit as st
import threading
import socket
# Function to check if a port is free
def is_port_free(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) != 0
# WebSocket Server Logic
async def echo(websocket, path):
async for message in websocket:
print(f"收到訊息: {message}")
await websocket.send(f"伺服器回應: {message}")
# Function to start the WebSocket server
def start_server():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
server = websockets.serve(echo, "localhost", 8765)
new_loop.run_until_complete(server)
new_loop.run_forever()
# Initialize session state for server
if 'server_started' not in st.session_state:
st.session_state.server_started = False
# Start the server only once
if not st.session_state.server_started:
if is_port_free(8765):
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()
st.session_state.server_started = True
st.success("WebSocket 伺服器已啟動。")
else:
st.error("Port 8765 已被佔用。請確保沒有其他應用程式使用該埠,或更改埠號。")
st.title("WebSocket Example")
# WebSocket Client Logic
async def websocket_client():
try:
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("你好,WebSocket!")
response = await websocket.recv()
st.write(f"伺服器回應: {response}")
except Exception as e:
st.error(f"WebSocket 連線失敗: {e}")
# Run WebSocket Client on Button Click
if st.button("發送訊息"):
asyncio.run(websocket_client())
| 1 | import asyncio |
| 2 | import websockets |
| 3 | import streamlit as st |
| 4 | import threading |
| 5 | import socket |
| 6 | |
| 7 | # Function to check if a port is free |
| 8 | def is_port_free(port): |
| 9 | with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: |
| 10 | return s.connect_ex(('localhost', port)) != 0 |
| 11 | |
| 12 | # WebSocket Server Logic |
| 13 | async def echo(websocket, path): |
| 14 | async for message in websocket: |
| 15 | print(f"收到訊息: {message}") |
| 16 | await websocket.send(f"伺服器回應: {message}") |
| 17 | |
| 18 | # Function to start the WebSocket server |
| 19 | def start_server(): |
| 20 | new_loop = asyncio.new_event_loop() |
| 21 | asyncio.set_event_loop(new_loop) |
| 22 | server = websockets.serve(echo, "localhost", 8765) |
| 23 | new_loop.run_until_complete(server) |
| 24 | new_loop.run_forever() |
| 25 | |
| 26 | # Initialize session state for server |
| 27 | if 'server_started' not in st.session_state: |
| 28 | st.session_state.server_started = False |
| 29 | |
| 30 | # Start the server only once |
| 31 | if not st.session_state.server_started: |
| 32 | if is_port_free(8765): |
| 33 | server_thread = threading.Thread(target=start_server, daemon=True) |
| 34 | server_thread.start() |
| 35 | st.session_state.server_started = True |
| 36 | st.success("WebSocket 伺服器已啟動。") |
| 37 | else: |
| 38 | st.error("Port 8765 已被佔用。請確保沒有其他應用程式使用該埠,或更改埠號。") |
| 39 | |
| 40 | st.title("WebSocket Example") |
| 41 | |
| 42 | # WebSocket Client Logic |
| 43 | async def websocket_client(): |
| 44 | try: |
| 45 | async with websockets.connect("ws://localhost:8765") as websocket: |
| 46 | await websocket.send("你好,WebSocket!") |
| 47 | response = await websocket.recv() |
| 48 | st.write(f"伺服器回應: {response}") |
| 49 | except Exception as e: |
| 50 | st.error(f"WebSocket 連線失敗: {e}") |
| 51 | |
| 52 | # Run WebSocket Client on Button Click |
| 53 | if st.button("發送訊息"): |
| 54 | asyncio.run(websocket_client()) |
| 55 | |
| 56 |