Naposledy aktivní 10 months ago

這段程式碼結合了 Python 的 asyncio 和 threading,透過 loop.run_in_executor 在背景執行同步的 blocking_sleep(),避免阻塞 asyncio 事件迴圈。這允許 async_sleep() 在等待時仍能執行其他非同步任務,適用於需要在 asyncio 應用程式中處理阻塞性 I/O 操作(如 time.sleep())的情境。

async_delayed_execution_with_threading.py Raw
1import asyncio
2import threading
3import time
4
5async def delayed_execution():
6 # 在這裡執行延遲後的任務
7 print("Delayed execution")
8
9def blocking_sleep(seconds):
10 time.sleep(seconds)
11
12async def async_sleep(seconds):
13 loop = asyncio.get_running_loop()
14 await loop.run_in_executor(None, blocking_sleep, seconds)
15 await delayed_execution()
16
17async def main():
18 # 呼叫非同步的 sleep 函數
19 await async_sleep(1)
20
21 # 繼續執行其他任務
22 print("Continuing execution")
23
24# 建立事件迴圈並執行主程式
25asyncio.run(main())
26