timmy / 使用 asyncio 與 threading 進行非同步延遲執行
0 вподобань
0 форк(-ів)
1 файл(-ів)
Остання активність 10 months ago
這段程式碼結合了 Python 的 asyncio 和 threading,透過 loop.run_in_executor 在背景執行同步的 blocking_sleep(),避免阻塞 asyncio 事件迴圈。這允許 async_sleep() 在等待時仍能執行其他非同步任務,適用於需要在 asyncio 應用程式中處理阻塞性 I/O 操作(如 time.sleep())的情境。
| 1 | import asyncio |
| 2 | import threading |
| 3 | import time |
| 4 | |
| 5 | async def delayed_execution(): |
| 6 | # 在這裡執行延遲後的任務 |
| 7 | print("Delayed execution") |
| 8 | |
| 9 | def blocking_sleep(seconds): |
| 10 | time.sleep(seconds) |
timmy / Telegram Bot 通知系統
0 вподобань
0 форк(-ів)
2 файл(-ів)
Остання активність 10 months ago
這段程式碼提供了一個 TelegramNotifier 類別,透過 Telegram Bot API 發送訊息到指定的聊天 ID (chat_id),並透過 loguru 記錄發送狀態。程式會從環境變數載入 Bot Token 和 Chat ID,並在啟動時發送通知,適用於自動化系統監控或事件提醒。
| 1 | TELEGRAM_BOT_TOKEN=你的_bot_token |
| 2 | TELEGRAM_CHAT_ID=你的_chat_id |
timmy / 使用 asyncio 執行非同步 Shell 命令
0 вподобань
0 форк(-ів)
1 файл(-ів)
Остання активність 10 months ago
這段 Python 程式碼使用 asyncio 來 非同步執行系統指令,透過 asyncio.create_subprocess_exec() 建立子行程,並並行執行多個指令(如 ping 和 ls)。它適用於 非同步處理系統指令、提升效能、避免阻塞主執行緒,適合用於 自動化腳本、伺服器管理或批次處理 任務。
| 1 | import asyncio |
| 2 | |
| 3 | async def run_command(*args): |
| 4 | # 建立子行程 |
| 5 | process = await asyncio.create_subprocess_exec( |
| 6 | *args, |
| 7 | stdout=asyncio.subprocess.PIPE, |
| 8 | stderr=asyncio.subprocess.PIPE) |
| 9 | |
| 10 | # 等待子行程完成 |
Новіше
Пізніше