timmy / 使用 Threading 進行非同步延遲執行

0 j'aimes
0 forks
1 fichiers
Dernière activité 10 months ago
這段程式碼使用 Python 的 threading 模組來實現非同步延遲執行。當 async_sleep(seconds) 被呼叫時,它會建立一個新的執行緒來執行 sleep() 函式,而不會阻塞主執行緒。這允許主程式繼續執行其他任務,同時在指定時間後執行 delayed_execution()。這對於需要非同步延遲執行的應用場景,如計時器或背景任務,特別有用。
1 import threading
2 import time
3
4 def delayed_execution():
5 # 在這裡執行延遲後的任務
6 print("Delayed execution")
7
8 def async_sleep(seconds):
9 def sleep():
10 time.sleep(seconds)

timmy / 基於 WebSocket 的遠端指令執行服務

0 j'aimes
0 forks
1 fichiers
Dernière activité 10 months ago
這段程式碼建立了一個 WebSocket 伺服器,允許遠端用戶端傳送指令,並在伺服器端執行該指令後回傳執行結果。適用於遠端系統管理、指令控制或測試環境中的即時互動,但因為直接執行來自用戶端的指令,需特別注意安全性風險,如權限控管與輸入驗證,以防止指令注入攻擊。
1 import asyncio
2 import websockets
3 import subprocess
4
5 async def handle_command(websocket, path):
6 try:
7 async for message in websocket:
8 print(f"Received command: {message}")
9
10 # 執行系統指令
Dernière activité 10 months ago
這段程式碼使用 Streamlit 建立一個 WebSocket 測試介面,當使用者按下按鈕時,會透過 WebSocket 用戶端向伺服器發送訊息,並顯示伺服器的回應。同時,它會在背景啟動 WebSocket 伺服器(如果尚未啟動),允許用戶端與其通訊,適用於即時通訊測試、開發 WebSocket 服務或簡單的雙向訊息傳遞應用。
1 import asyncio
2 import websockets
3 import streamlit as st
4 import threading
5 import socket
6
7 # Function to check if a port is free
8 def is_port_free(port):
9 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
10 return s.connect_ex(('localhost', port)) != 0
Plus récent Plus ancien