All gists matching topic non-blocking

Ostatnio aktywne 10 months ago
這段程式碼結合了 Python 的 asyncio 和 threading,透過 loop.run_in_executor 在背景執行同步的 blocking_sleep(),避免阻塞 asyncio 事件迴圈。這允許 async_sleep() 在等待時仍能執行其他非同步任務,適用於需要在 asyncio 應用程式中處理阻塞性 I/O 操作(如 time.sleep())的情境。
1 import asyncio
2 import threading
3 import time
4
5 async def delayed_execution():
6 # 在這裡執行延遲後的任務
7 print("Delayed execution")
8
9 def blocking_sleep(seconds):
10 time.sleep(seconds)

timmy / 使用 Threading 進行非同步延遲執行

0 polubień
0 forków
1 plików
Ostatnio aktywne 10 months ago
這段程式碼使用 Python 的 threading 模組來實現非同步延遲執行。當 async_sleep(seconds) 被呼叫時,它會建立一個新的執行緒來執行 sleep() 函式,而不會阻塞主執行緒。這允許主程式繼續執行其他任務,同時在指定時間後執行 delayed_execution()。這對於需要非同步延遲執行的應用場景,如計時器或背景任務,特別有用。
1 import threading
2 import time
3
4 def delayed_execution():
5 # 在這裡執行延遲後的任務
6 print("Delayed execution")
7
8 def async_sleep(seconds):
9 def sleep():
10 time.sleep(seconds)

timmy / 使用 asyncio 執行非同步 Shell 命令

0 polubień
0 forków
1 plików
Ostatnio aktywne 10 months ago
這段 Python 程式碼使用 asyncio 來 非同步執行系統指令,透過 asyncio.create_subprocess_exec() 建立子行程,並並行執行多個指令(如 ping 和 ls)。它適用於 非同步處理系統指令、提升效能、避免阻塞主執行緒,適合用於 自動化腳本、伺服器管理或批次處理 任務。
1 import asyncio
2
3 async def run_command(*args):
4 # 建立子行程
5 process = await asyncio.create_subprocess_exec(
6 *args,
7 stdout=asyncio.subprocess.PIPE,
8 stderr=asyncio.subprocess.PIPE)
9
10 # 等待子行程完成
Nowsze Starsze