Skip to content

Python asyncio:非同步 I/O 與協程 (Coroutine)

在處理大量 I/O 操作時,你可能會遇到一個常見的效能瓶頸:阻塞 (Blocking)。當你的程式碼等待網絡回應、讀寫檔案或查詢資料庫時,整個應用程式都會停頓下來,無法處理其他任務。 傳統上,解決這個問題的方法是使用多線程 (Multithreading)。然而,創建和管理系統線程的成本相當高。Python 提供了一個更現代、更輕量的解決方案:asyncio

asyncio 是一個運用協程 (Coroutine) 在單一線程內實現並發 (Concurrency) 的函式庫。你可以將它視為一種「輕量級」的多線程,讓你用更少的資源達到相似的效果,特別是在處理 I/O 密集型 (I/O-bound) 任務時。

asyncio 就是自動切換執行非同步代碼的機制,等待時會掛起並切換到程式的其他部分,防止CPU閒置,有效利用等待時間。

這篇文章將帶你入門 asyncio,解釋其核心概念,並透過程式碼範例展示它如何解決阻塞問題。

⛓️ 多線程 vs. asyncio:兩種並發模型

要理解 asyncio 的優勢,首先要把它與傳統的多線程作比較。

特性Multithreading (基於 thread)asyncio (基於 coroutine)
管理者作業系統Python 事件循環 (Event Loop)
資源消耗較高:每個線程都是一個獨立的執行單元,有自己的記憶體堆疊,創建和切換成本高。極低:所有協程在同一個線程內運行,共享資源,切換速度極快。
最適用場景I/O 密集型任務、需要利用多核心的 CPU 密集型任務 (但受 Python GIL 限制)。I/O 密集型任務 (如網絡爬蟲、Web Server、資料庫連接)。

因為 asyncio 的協程是由 Python 自己管理,而不是作業系統,所以它的切換成本極低,讓單一線程能同時處理成千上萬個等待中的 I/O 操作。

🏁 asyncawait:非同步編程的核心

asyncio 的魔法來自兩個關鍵字:asyncawait

async def:定義協程

當你在一個函數定義前加上 async,這個函數就變成了一個協程函數 (coroutine function)

調用一個協程函數並不會立即執行它的程式碼,而是會返回一個協程物件 (coroutine object)

python
async def my_coroutine():
    print("Hello from coroutine!")

# 調用它會返回一個協程物件,而不是打印訊息
coro_obj = my_coroutine()
print(coro_obj)
# Output: <coroutine object my_coroutine at 0x...>

await:執行並等待協程

要真正執行一個協程並等待其結果,你需要在它前面加上 await 關鍵字。await 的意思是:「暫停當前協程的執行,直到 await 後面的任務完成,在等待期間,讓事件循環去執行其他任務。」 就像告訴 python 現在要執行一個非同步的函數,這可能要等一段時間,Python 看到 await 時自然會掛起等待它完成。注意:你只能 await 一個 非同步的函數,即另一個 async 函數 (coroutine)。

python
# 它會等待my_coroutine執行完成,等待時會掛起並切換到其他協程 (如有)
# 以防CPU閒置,有效利用等待時間
coro_obj = await my_coroutine()

重要規則await 只能在 async def 函數內部使用。

這種 async/await 語法讓非同步程式碼看起來像同步程式碼一樣直觀,易於閱讀和維護。它不用使用 callbacks 函數來處理非同步結果,避免了所謂的「回呼地獄 (Callback Hell)」。

🚀 程式碼實戰:從阻塞到非阻塞

讓我們透過一個簡單的例子,看看 asyncio 如何解決阻塞問題。假設我們有多個任務,每個任務都需要 1 秒鐘的「等待時間」來模擬 I/O 操作。

同步 (Synchronous) 寫法

在傳統的同步程式碼中,任務會一個接一個地執行。

python
import time
import datetime

def sync_task(name):
    print(f"[{datetime.datetime.now().time()}] Task {name}: Starting...")
    time.sleep(1)  # 模擬阻塞的 I/O 操作
    print(f"[{datetime.datetime.now().time()}] Task {name}: Finished.")

def main_sync():
    start_time = time.time()
    sync_task("A")
    sync_task("B")
    sync_task("C")
    end_time = time.time()
    print(f"\nTotal time (sync): {end_time - start_time:.2f} seconds")

# main_sync()

執行結果:

[10:30:01.123456] Task A: Starting...
[10:30:02.123456] Task A: Finished.
[10:30:02.123456] Task B: Starting...
[10:30:03.123456] Task B: Finished.
[10:30:03.123456] Task C: Starting...
[10:30:04.123456] Task C: Finished.

Total time (sync): 3.01 seconds

如你所見,總耗時是所有任務時間的總和 (1 + 1 + 1 = 3 秒)。

非同步 (Asynchronous) 寫法

現在,我們用 asyncio 來改寫它。

python
import asyncio
import time
import datetime

async def async_task(name):
    print(f"[{datetime.datetime.now().time()}] Task {name}: Starting...")
    # 使用 asyncio.sleep() 來模擬非阻塞的 I/O 操作
    await asyncio.sleep(1)
    print(f"[{datetime.datetime.now().time()}] Task {name}: Finished.")

async def main_async():
    start_time = time.time()
    # 使用 asyncio.gather 來並發執行多個協程
    await asyncio.gather(
        async_task("A"),
        async_task("B"),
        async_task("C")
    )
    end_time = time.time()
    print(f"\nTotal time (async): {end_time - start_time:.2f} seconds")

# asyncio.run() 是啟動非同步程式的入口
# asyncio.run(main_async())

執行結果:

[10:31:01.567890] Task A: Starting...
[10:31:01.567890] Task B: Starting...
[10:31:01.567890] Task C: Starting...
[10:31:02.567890] Task A: Finished.
[10:31:02.567890] Task B: Finished.
[10:31:02.567890] Task C: Finished.

Total time (async): 1.01 seconds

看到差別了嗎?三個任務幾乎是「同時」開始。當 async_task("A") 遇到 await asyncio.sleep(1) 時,它會交出控制權,事件循環立即開始執行 async_task("B"),然後是 C。一秒鐘後,所有任務都完成了它們的等待,並相繼結束。總耗時約等於最長的那個任務的耗時,也就是 1 秒。

asyncio.gather() 是一個非常有用的函數,它能接收多個協程,並發地運行它們,並等待所有協程完成。

📜 asyncsync 函數的調用規則

在使用 asyncio 時,必須遵守以下規則:

  1. async 函數中:你可以 await 其他 async 函數,也可以直接調用普通的 sync 函數。
  2. sync 函數中:你不能 await 一個 async 函數。sync 的世界和 async 的世界是隔離的。

那麼,如果想在 sync 程式碼中啟動一個 async 流程怎麼辦?答案是使用 asyncio.run()。它是連接兩個世界的橋樑,負責啟動事件循環並運行你指定的頂層協程。

python
def regular_sync_function():
    print("  - I am a regular sync function.")

async def my_main_coroutine():
    print("Coroutine started.")
    
    # 規則 1: 在 async 函數中調用 sync 函數 (OK)
    regular_sync_function()
    
    # 規則 1: 在 async 函數中 await 另一個 async 函數 (OK)
    await asyncio.sleep(0.5)
    
    print("Coroutine finished.")

# 規則 2: 在 sync 世界中,使用 asyncio.run() 啟動 async 世界
print("Starting from sync world...")
# asyncio.run(my_main_coroutine())
print("Back to sync world.")

⚠️ 真假非同步

一個常見的誤解是:只要將函數標記為 async def,它就會自動變成非阻塞的。這是錯誤的

async def 只告訴 Python 這是一個協程函數。一個協程只有在內部使用 await 並等待一個真正的非同步操作時,才會將控制權交還給事件循環,實現非阻塞。 如果在 async 函數中只調用了阻塞的同步函數,它仍然會阻塞整個事件循環。

錯誤的例子(假的非同步):

python
import time
import asyncio

async def fake_async_task(name):
    print(f"Task {name}: Starting...")
    # 錯誤!time.sleep() 是阻塞的,它會凍結整個程式
    time.sleep(1) 
    print(f"Task {name}: Finished.")

async def main_fake():
    await asyncio.gather(fake_async_task("A"), fake_async_task("B"))

# 總耗時將會是 2 秒,因為 time.sleep() 阻塞了事件循環
# asyncio.run(main_fake())

在這個例子中,即使 fake_async_taskasync 函數,time.sleep(1) 也會讓整個單線程程式暫停 1 秒,導致任務無法並發執行。 常見的例子是,在 async 函數中調用一般同步函數進行下載,這時可考慮使用外部的套件如 aiohttp

正確的例子(真正的非同步): 要實現真正的非同步,必須使用 asyncio 提供的非同步版本,例如 await asyncio.sleep(),或者使用支援 asyncio 的函式庫進行 I/O 操作。

python
async def real_async_task(name):
    print(f"Task {name}: Starting...")
    # 正確!await asyncio.sleep() 會交出控制權
    await asyncio.sleep(1)
    print(f"Task {name}: Finished.")

async def main_real():
    await asyncio.gather(real_async_task("A"), real_async_task("B"))

# 總耗時將會是 1 秒,任務成功並發
# asyncio.run(main_real())

🔐 處理並發問題:asyncio.Lock

雖然 asyncio 在單一線程中運行,避免了許多傳統多線程的複雜性,但並發問題(如競態條件)依然存在

這個問題源於 await 關鍵字。當一個協程在 await 處暫停時,事件循環會運行其他協程。如果在一個協程讀取共享數據和寫回數據之間發生了切換,就可能導致數據不一致。

競態條件的例子: 想像一下多個協程同時增加一個共享計數器。

python
import asyncio

shared_counter = 0

async def increment():
    global shared_counter
    # 1. 讀取當前值
    current_value = shared_counter
    # 在讀取後、寫入前,模擬一個 I/O 等待,讓其他協程有機會運行
    await asyncio.sleep(0.01)
    # 2. 寫回新值
    shared_counter = current_value + 1

async def main_race_condition():
    tasks = [increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final counter value: {shared_counter}")

# 理想結果是 100,但實際結果通常遠小於 100
# asyncio.run(main_race_condition())

你會發現,最終結果並不是 100。這是因為多個協程幾乎同時讀取了相同的 current_value(例如 0),然後它們都將計數器設置為 1,導致多次增加操作只被計算了一次。

解決方案:asyncio.Lock 為了保護共享資源,我們可以使用 asyncio.Lock

python
import asyncio

shared_counter = 0
lock = asyncio.Lock()

async def safe_increment():
    global shared_counter
    async with lock:
        # ---- 進入臨界區 (Critical Section) ----
        current_value = shared_counter
        await asyncio.sleep(0.01)
        shared_counter = current_value + 1
        # ---- 離開臨界區 ----

async def main_safe():
    tasks = [safe_increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final counter value (safe): {shared_counter}")

# 這次結果將會是正確的 100
# asyncio.run(main_safe())

async with lock: 確保了在任何時候只有一個協程可以執行 with 區塊內的程式碼。當一個協程進入該區塊時,其他試圖獲取鎖的協程將會 await,直到鎖被釋放。

✅ 總結

asyncio 為 Python 的 I/O 密集型應用帶來了革命性的效能提升。透過協程和 async/await 語法,你可以用單一線程寫出高並發、易於維護的非阻塞程式碼。

記住以下重點:

  • asyncio 是輕量級的:它基於協程,由 Python 管理,比系統線程開銷小得多。
  • 解決阻塞問題:當遇到 I/O 等待時,asyncio 會自動切換到其他可執行的協程,充分利用 CPU 時間。
  • 語法簡潔async/await 讓非同步程式碼的邏輯像同步程式碼一樣清晰。
  • 警惕假非同步async 函數內的阻塞調用 (time.sleep) 依然會阻塞事件循環,必須 await 真正的非同步操作。
  • 處理並發:即使是單線程,也要使用 asyncio.Lock 等工具來防止對共享資源的競態條件。
  • 適用場景:非常適合網絡爬蟲、API 服務器、實時通訊等需要處理大量並發連接的應用。

掌握 asyncio 是成為一名現代 Python 開發者的重要一步。希望這篇文章能為你的非同步編程之旅提供一個好的開始!


📚 參考資料 (References)

KF Software House