timmy / 使用 asyncio 與 threading 進行非同步延遲執行
0 вподобань
0 форк(-ів)
1 файл(-ів)
Остання активність 10 months ago
這段程式碼結合了 Python 的 asyncio 和 threading,透過 loop.run_in_executor 在背景執行同步的 blocking_sleep(),避免阻塞 asyncio 事件迴圈。這允許 async_sleep() 在等待時仍能執行其他非同步任務,適用於需要在 asyncio 應用程式中處理阻塞性 I/O 操作(如 time.sleep())的情境。
| 1 | import asyncio |
| 2 | import threading |
| 3 | import time |
| 4 | |
| 5 | async def delayed_execution(): |
| 6 | # 在這裡執行延遲後的任務 |
| 7 | print("Delayed execution") |
| 8 | |
| 9 | def blocking_sleep(seconds): |
| 10 | time.sleep(seconds) |
timmy / 使用 Threading 進行非同步延遲執行
0 вподобань
0 форк(-ів)
1 файл(-ів)
Остання активність 10 months ago
這段程式碼使用 Python 的 threading 模組來實現非同步延遲執行。當 async_sleep(seconds) 被呼叫時,它會建立一個新的執行緒來執行 sleep() 函式,而不會阻塞主執行緒。這允許主程式繼續執行其他任務,同時在指定時間後執行 delayed_execution()。這對於需要非同步延遲執行的應用場景,如計時器或背景任務,特別有用。
| 1 | import threading |
| 2 | import time |
| 3 | |
| 4 | def delayed_execution(): |
| 5 | # 在這裡執行延遲後的任務 |
| 6 | print("Delayed execution") |
| 7 | |
| 8 | def async_sleep(seconds): |
| 9 | def sleep(): |
| 10 | time.sleep(seconds) |
timmy / 使用 asyncio 執行非同步 Shell 命令
0 вподобань
0 форк(-ів)
1 файл(-ів)
Остання активність 10 months ago
這段 Python 程式碼使用 asyncio 來 非同步執行系統指令,透過 asyncio.create_subprocess_exec() 建立子行程,並並行執行多個指令(如 ping 和 ls)。它適用於 非同步處理系統指令、提升效能、避免阻塞主執行緒,適合用於 自動化腳本、伺服器管理或批次處理 任務。
| 1 | import asyncio |
| 2 | |
| 3 | async def run_command(*args): |
| 4 | # 建立子行程 |
| 5 | process = await asyncio.create_subprocess_exec( |
| 6 | *args, |
| 7 | stdout=asyncio.subprocess.PIPE, |
| 8 | stderr=asyncio.subprocess.PIPE) |
| 9 | |
| 10 | # 等待子行程完成 |
Новіше
Пізніше