Python asyncio:非同步 I/O 與協程 (Coroutine)
在處理大量 I/O 操作時,你可能會遇到一個常見的效能瓶頸:阻塞 (Blocking)。當你的程式碼等待網絡回應、讀寫檔案或查詢資料庫時,整個應用程式都會停頓下來,無法處理其他任務。 傳統上,解決這個問題的方法是使用多線程 (Multithreading)。然而,創建和管理系統線程的成本相當高。Python 提供了一個更現代、更輕量的解決方案:asyncio
。
asyncio
是一個運用協程 (Coroutine) 在單一線程內實現並發 (Concurrency) 的函式庫。你可以將它視為一種「輕量級」的多線程,讓你用更少的資源達到相似的效果,特別是在處理 I/O 密集型 (I/O-bound) 任務時。
asyncio
就是自動切換執行非同步代碼的機制,等待時會掛起並切換到程式的其他部分,防止CPU閒置,有效利用等待時間。
這篇文章將帶你入門 asyncio
,解釋其核心概念,並透過程式碼範例展示它如何解決阻塞問題。
⛓️ 多線程 vs. asyncio:兩種並發模型
要理解 asyncio
的優勢,首先要把它與傳統的多線程作比較。
特性 | Multithreading (基於 thread) | asyncio (基於 coroutine) |
---|---|---|
管理者 | 作業系統 | Python 事件循環 (Event Loop) |
資源消耗 | 較高:每個線程都是一個獨立的執行單元,有自己的記憶體堆疊,創建和切換成本高。 | 極低:所有協程在同一個線程內運行,共享資源,切換速度極快。 |
最適用場景 | I/O 密集型任務、需要利用多核心的 CPU 密集型任務 (但受 Python GIL 限制)。 | I/O 密集型任務 (如網絡爬蟲、Web Server、資料庫連接)。 |
因為 asyncio
的協程是由 Python 自己管理,而不是作業系統,所以它的切換成本極低,讓單一線程能同時處理成千上萬個等待中的 I/O 操作。
🏁 async
與 await
:非同步編程的核心
asyncio
的魔法來自兩個關鍵字:async
和 await
。
async def
:定義協程
當你在一個函數定義前加上 async
,這個函數就變成了一個協程函數 (coroutine function)。
調用一個協程函數並不會立即執行它的程式碼,而是會返回一個協程物件 (coroutine object)。
async def my_coroutine():
print("Hello from coroutine!")
# 調用它會返回一個協程物件,而不是打印訊息
coro_obj = my_coroutine()
print(coro_obj)
# Output: <coroutine object my_coroutine at 0x...>
await
:執行並等待協程
要真正執行一個協程並等待其結果,你需要在它前面加上 await
關鍵字。await
的意思是:「暫停當前協程的執行,直到 await
後面的任務完成,在等待期間,讓事件循環去執行其他任務。」 就像告訴 python 現在要執行一個非同步的函數,這可能要等一段時間,Python 看到 await 時自然會掛起等待它完成。注意:你只能 await 一個 非同步的函數,即另一個 async 函數 (coroutine)。
# 它會等待my_coroutine執行完成,等待時會掛起並切換到其他協程 (如有)
# 以防CPU閒置,有效利用等待時間
coro_obj = await my_coroutine()
重要規則:await
只能在 async def
函數內部使用。
這種 async/await
語法讓非同步程式碼看起來像同步程式碼一樣直觀,易於閱讀和維護。它不用使用 callbacks 函數來處理非同步結果,避免了所謂的「回呼地獄 (Callback Hell)」。
🚀 程式碼實戰:從阻塞到非阻塞
讓我們透過一個簡單的例子,看看 asyncio
如何解決阻塞問題。假設我們有多個任務,每個任務都需要 1 秒鐘的「等待時間」來模擬 I/O 操作。
同步 (Synchronous) 寫法
在傳統的同步程式碼中,任務會一個接一個地執行。
import time
import datetime
def sync_task(name):
print(f"[{datetime.datetime.now().time()}] Task {name}: Starting...")
time.sleep(1) # 模擬阻塞的 I/O 操作
print(f"[{datetime.datetime.now().time()}] Task {name}: Finished.")
def main_sync():
start_time = time.time()
sync_task("A")
sync_task("B")
sync_task("C")
end_time = time.time()
print(f"\nTotal time (sync): {end_time - start_time:.2f} seconds")
# main_sync()
執行結果:
[10:30:01.123456] Task A: Starting...
[10:30:02.123456] Task A: Finished.
[10:30:02.123456] Task B: Starting...
[10:30:03.123456] Task B: Finished.
[10:30:03.123456] Task C: Starting...
[10:30:04.123456] Task C: Finished.
Total time (sync): 3.01 seconds
如你所見,總耗時是所有任務時間的總和 (1 + 1 + 1 = 3 秒)。
非同步 (Asynchronous) 寫法
現在,我們用 asyncio
來改寫它。
import asyncio
import time
import datetime
async def async_task(name):
print(f"[{datetime.datetime.now().time()}] Task {name}: Starting...")
# 使用 asyncio.sleep() 來模擬非阻塞的 I/O 操作
await asyncio.sleep(1)
print(f"[{datetime.datetime.now().time()}] Task {name}: Finished.")
async def main_async():
start_time = time.time()
# 使用 asyncio.gather 來並發執行多個協程
await asyncio.gather(
async_task("A"),
async_task("B"),
async_task("C")
)
end_time = time.time()
print(f"\nTotal time (async): {end_time - start_time:.2f} seconds")
# asyncio.run() 是啟動非同步程式的入口
# asyncio.run(main_async())
執行結果:
[10:31:01.567890] Task A: Starting...
[10:31:01.567890] Task B: Starting...
[10:31:01.567890] Task C: Starting...
[10:31:02.567890] Task A: Finished.
[10:31:02.567890] Task B: Finished.
[10:31:02.567890] Task C: Finished.
Total time (async): 1.01 seconds
看到差別了嗎?三個任務幾乎是「同時」開始。當 async_task("A")
遇到 await asyncio.sleep(1)
時,它會交出控制權,事件循環立即開始執行 async_task("B")
,然後是 C
。一秒鐘後,所有任務都完成了它們的等待,並相繼結束。總耗時約等於最長的那個任務的耗時,也就是 1 秒。
asyncio.gather()
是一個非常有用的函數,它能接收多個協程,並發地運行它們,並等待所有協程完成。
📜 async
與 sync
函數的調用規則
在使用 asyncio
時,必須遵守以下規則:
- 在
async
函數中:你可以await
其他async
函數,也可以直接調用普通的sync
函數。 - 在
sync
函數中:你不能await
一個async
函數。sync
的世界和async
的世界是隔離的。
那麼,如果想在 sync
程式碼中啟動一個 async
流程怎麼辦?答案是使用 asyncio.run()
。它是連接兩個世界的橋樑,負責啟動事件循環並運行你指定的頂層協程。
def regular_sync_function():
print(" - I am a regular sync function.")
async def my_main_coroutine():
print("Coroutine started.")
# 規則 1: 在 async 函數中調用 sync 函數 (OK)
regular_sync_function()
# 規則 1: 在 async 函數中 await 另一個 async 函數 (OK)
await asyncio.sleep(0.5)
print("Coroutine finished.")
# 規則 2: 在 sync 世界中,使用 asyncio.run() 啟動 async 世界
print("Starting from sync world...")
# asyncio.run(my_main_coroutine())
print("Back to sync world.")
⚠️ 真假非同步
一個常見的誤解是:只要將函數標記為 async def
,它就會自動變成非阻塞的。這是錯誤的。
async def
只告訴 Python 這是一個協程函數。一個協程只有在內部使用 await
並等待一個真正的非同步操作時,才會將控制權交還給事件循環,實現非阻塞。 如果在 async
函數中只調用了阻塞的同步函數,它仍然會阻塞整個事件循環。
錯誤的例子(假的非同步):
import time
import asyncio
async def fake_async_task(name):
print(f"Task {name}: Starting...")
# 錯誤!time.sleep() 是阻塞的,它會凍結整個程式
time.sleep(1)
print(f"Task {name}: Finished.")
async def main_fake():
await asyncio.gather(fake_async_task("A"), fake_async_task("B"))
# 總耗時將會是 2 秒,因為 time.sleep() 阻塞了事件循環
# asyncio.run(main_fake())
在這個例子中,即使 fake_async_task
是 async
函數,time.sleep(1)
也會讓整個單線程程式暫停 1 秒,導致任務無法並發執行。 常見的例子是,在 async 函數中調用一般同步函數進行下載,這時可考慮使用外部的套件如 aiohttp
。
正確的例子(真正的非同步): 要實現真正的非同步,必須使用 asyncio
提供的非同步版本,例如 await asyncio.sleep()
,或者使用支援 asyncio
的函式庫進行 I/O 操作。
async def real_async_task(name):
print(f"Task {name}: Starting...")
# 正確!await asyncio.sleep() 會交出控制權
await asyncio.sleep(1)
print(f"Task {name}: Finished.")
async def main_real():
await asyncio.gather(real_async_task("A"), real_async_task("B"))
# 總耗時將會是 1 秒,任務成功並發
# asyncio.run(main_real())
🔐 處理並發問題:asyncio.Lock
雖然 asyncio
在單一線程中運行,避免了許多傳統多線程的複雜性,但並發問題(如競態條件)依然存在。
這個問題源於 await
關鍵字。當一個協程在 await
處暫停時,事件循環會運行其他協程。如果在一個協程讀取共享數據和寫回數據之間發生了切換,就可能導致數據不一致。
競態條件的例子: 想像一下多個協程同時增加一個共享計數器。
import asyncio
shared_counter = 0
async def increment():
global shared_counter
# 1. 讀取當前值
current_value = shared_counter
# 在讀取後、寫入前,模擬一個 I/O 等待,讓其他協程有機會運行
await asyncio.sleep(0.01)
# 2. 寫回新值
shared_counter = current_value + 1
async def main_race_condition():
tasks = [increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"Final counter value: {shared_counter}")
# 理想結果是 100,但實際結果通常遠小於 100
# asyncio.run(main_race_condition())
你會發現,最終結果並不是 100。這是因為多個協程幾乎同時讀取了相同的 current_value
(例如 0),然後它們都將計數器設置為 1,導致多次增加操作只被計算了一次。
解決方案:asyncio.Lock
為了保護共享資源,我們可以使用 asyncio.Lock
。
import asyncio
shared_counter = 0
lock = asyncio.Lock()
async def safe_increment():
global shared_counter
async with lock:
# ---- 進入臨界區 (Critical Section) ----
current_value = shared_counter
await asyncio.sleep(0.01)
shared_counter = current_value + 1
# ---- 離開臨界區 ----
async def main_safe():
tasks = [safe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"Final counter value (safe): {shared_counter}")
# 這次結果將會是正確的 100
# asyncio.run(main_safe())
async with lock:
確保了在任何時候只有一個協程可以執行 with
區塊內的程式碼。當一個協程進入該區塊時,其他試圖獲取鎖的協程將會 await
,直到鎖被釋放。
✅ 總結
asyncio
為 Python 的 I/O 密集型應用帶來了革命性的效能提升。透過協程和 async/await
語法,你可以用單一線程寫出高並發、易於維護的非阻塞程式碼。
記住以下重點:
asyncio
是輕量級的:它基於協程,由 Python 管理,比系統線程開銷小得多。- 解決阻塞問題:當遇到 I/O 等待時,
asyncio
會自動切換到其他可執行的協程,充分利用 CPU 時間。 - 語法簡潔:
async/await
讓非同步程式碼的邏輯像同步程式碼一樣清晰。 - 警惕假非同步:
async
函數內的阻塞調用 (time.sleep
) 依然會阻塞事件循環,必須await
真正的非同步操作。 - 處理並發:即使是單線程,也要使用
asyncio.Lock
等工具來防止對共享資源的競態條件。 - 適用場景:非常適合網絡爬蟲、API 服務器、實時通訊等需要處理大量並發連接的應用。
掌握 asyncio
是成為一名現代 Python 開發者的重要一步。希望這篇文章能為你的非同步編程之旅提供一個好的開始!