Python Multithreading
多線程
Multithreading (多線程) 是 Python 中一種實現並行 (concurrency) 的方式,允許程式在等待某些操作(尤其是 I/O 操作)完成時,能夠執行其他任務,從而提高應用程式的響應性和效率。
🤔 什麼是 Threading ? (執行序列)
一個 Thread (線程) 是個獨立的 執行序列 (execution sequence)。它在一個 Process (進程) 內部循序執行指令。每個 Process 至少會有一個 Thread,通常稱為 主線程 (main thread)。當程式啟動時,主線程便開始執行。
一個 Process 可包含多個 Threads。這些 Threads 共享該 Process 的記憶體空間 (memory space) 和資源 (例如程式碼、全域變數),但每個 Thread 擁有自己獨立的執行堆疊 (call stack)。
- Single-threading (單線程): 應用程式只有一個執行序列 (主線程)。任務必須一個接一個完成。如果一個任務需要長時間等待(例如,等待網絡回應),整個應用程式都會被卡住。
- Multithreading (多線程): 應用程式可以有多個執行序列。主線程可以創建額外的 Threads (工作線程)。當一個 Thread 因為等待 I/O 操作(如讀取文件、網絡請求)而阻塞時,作業系統可以切換到另一個 Thread,讓它繼續執行。這使得應用程式看起來像是在同時做多件事情。
當主線程需要執行一個可能耗時的操作(例如網絡請求)但又不希望阻塞自身時,它可以創建一個新的 Worker Thread 來處理該操作。主線程隨後可以繼續執行其他任務,而 Worker Thread 則獨立執行其被分配的任務。
下圖示意了主線程如何創建一個 Worker Thread,並且兩者可以並行執行各自的任務序列:
從圖中可見,主線程在啟動 Worker Thread 後,並不需要等待 Worker Thread 完成即可繼續執行自己的任務。Worker Thread 則在背景執行分配給它的任務。
🚧 為何需要 Multithreading? (解決阻塞問題)
Multithreading 的主要用來防止 Blocking - 阻塞 問題。當程式執行一個阻塞操作時,例如從網絡下載一個大文件,或者等待用戶輸入,如果採用單線程模型,整個應用程式都會停止響應,直到該操作完成。
範例:同時處理多個任務
想像一下,您需要在晚上睡覺時下載一部電影,同時還需要設定一個鬧鐘在早上叫醒您。
- 下載電影:這是一個典型的 I/O-bound 任務,大部分時間花在等待網絡數據。
- *上床睡覺:這是一個睡覺任務。
如果沒有 Multithreading,您可能需要先等待電影下載完成才能上床睡覺。但這顯然不切實際,透過 Multithreading,您可以:
- 啟動一個 Thread 負責下載電影。
- 啟動另一個 Thread 負責計做其他事情 (如睡覺任務)。
這兩個 Threads 可以「同時」運行。當下載 Thread 等待網絡數據包時,另一個 Thread 仍然可以做任何事情。系統會在這些 Threads 之間切換,給人一種同時運行的感覺。
GIL
Python 的標準直譯器中有一個 Global Interpreter Lock (GIL) 的機制。GIL 確保在任何時刻只有一個 Python Thread 能夠持有 Python 直譯器的控制權並執行 Python bytecode。這意味著,即使使用多執行緒 (Multithreading),Python 也無法實現真正意義上的平行運算 (parallelism)。
🛠️ 如何建立 Thread? (基本範例)
Python 的 threading
module 提供了建立和管理 Threads 的基本工具。
以下範例模擬同時下載兩個文件:
import threading
import time
def download_file(filename, delay):
print(f"開始下載 {filename}...")
time.sleep(delay) # 模擬下載耗時
print(f"{filename} 下載完成。")
def go_to_bed():
print("I go to bed now.")
if __name__ == "__main__":
print("主程式開始。")
# 建立 Thread 物件
# target 指定 Thread 要執行的函數
# args 是傳遞給目標函數的參數,必須是 tuple
download_thread = threading.Thread(target=download_file, args=("movie.mp4", 5))
# 啟動 Thread
download_thread.start()
go_to_bed()
# 等待所有 Thread 完成
# 如果沒有 .join(),主線程可能會在子線程完成前就結束
download_thread.join() # 等待 download_thread 完成
print("所有任務完成,主程式結束。")
在這個例子中:
download_file
函數模擬一個耗時的下載操作。- 我們為下載任務建立了一個
threading.Thread
物件。 thread.start()
方法會啟動 Thread 的執行。thread.join()
方法會阻塞主線程,直到被呼叫的 Thread 執行完畢。這確保了主程式會在下載任務都完成後才結束。
⚙️ Multithreading 帶來的挑戰與同步工具
當多個 Threads 共享數據時,如果沒有適當的協調,就可能出現問題。threading
module 提供了一些同步原語 (synchronization primitives) 來幫助管理這些情況。
Race Conditions (競爭條件)
當兩個或多個 Threads 同時存取共享資源,並且至少有一個 Thread 修改該資源時,最終結果取決於 Threads執行的相對順序,這就可能導致 Race Condition (競爭條件),產生不可預期的錯誤。
Locks (threading.Lock
)
Lock (鎖) 是最基本的同步原語。一個 Lock 有兩種狀態:「locked」和「unlocked」。它包含兩個基本方法:acquire()
和 release()
。
- 當狀態是「unlocked」時,呼叫
acquire()
會將狀態變為「locked」並立即返回。 - 當狀態是「locked」時,呼叫
acquire()
會阻塞,直到另一個 Thread 呼叫release()
將 Lock 狀態變回「unlocked」。
import threading
import time
shared_resource_counter = 0
lock = threading.Lock() # 建立一個 Lock 物件
def worker_with_lock():
global shared_resource_counter
for _ in range(100000):
lock.acquire() # 獲取鎖
try:
# 這是 critical section (臨界區段)
current_value = shared_resource_counter
time.sleep(0.000001) # 模擬一些處理時間,增加競爭機會
shared_resource_counter = current_value + 1
finally:
lock.release() # 釋放鎖,確保即使發生錯誤也會釋放
def worker_without_lock():
global shared_resource_counter
for _ in range(100000):
current_value = shared_resource_counter
time.sleep(0.000001) # 模擬一些處理時間,增加競爭機會
shared_resource_counter = current_value + 1
# 測試未使用 Lock 的情況 (可能出現 Race Condition)
# shared_resource_counter = 0 # 重設計數器
# threads_no_lock = [threading.Thread(target=worker_without_lock) for _ in range(5)]
# for t in threads_no_lock: t.start()
# for t in threads_no_lock: t.join()
# print(f"未使用 Lock,最終計數: {shared_resource_counter}") # 結果很可能小於 500000
# 測試使用 Lock 的情況
shared_resource_counter = 0 # 重設計數器
threads_with_lock = [threading.Thread(target=worker_with_lock) for _ in range(5)]
for t in threads_with_lock: t.start()
for t in threads_with_lock: t.join()
print(f"使用 Lock,最終計數: {shared_resource_counter}") # 結果應為 500000
使用 try...finally
結構確保 lock.release()
總是被執行,即使在 try
區塊中發生異常。 更簡潔的方式是使用 with
陳述句 (context manager protocol):
# lock = threading.Lock()
# def worker_with_lock_context_manager():
# global shared_resource_counter
# for _ in range(100000):
# with lock: # 自動 acquire() 和 release()
# current_value = shared_resource_counter
# time.sleep(0.000001)
# shared_resource_counter = current_value + 1
Deadlocks (死鎖)
Deadlock (死鎖) 發生在兩個或多個 Threads 無限期地等待一個永遠不會被釋放的資源。例如,Thread A 持有資源 X 並等待資源 Y,而 Thread B 持有資源 Y 並等待資源 X。 避免死鎖的策略包括:
- 鎖排序 (Lock ordering):所有 Threads 都以相同的順序獲取多個鎖。
- 鎖超時 (Lock timeout):在嘗試獲取鎖時設定超時時間。
- 避免嵌套鎖 (nested locks):盡量減少一個鎖內部還需要獲取其他鎖的情況。
其他同步工具
threading
module 還提供了其他有用的同步工具:
- RLock (Reentrant Lock): 可重入鎖。允許同一個 Thread 多次
acquire()
同一個 RLock 而不會造成死鎖。該 Thread 必須release()
相同次數才能真正釋放該鎖給其他 Threads。適用於遞歸函數中需要鎖保護的情況。 - Semaphore: 信號標。允許一定數量的 Threads 同時存取一個資源池。它維護一個計數器,
acquire()
時遞減,release()
時遞增。當計數器為零時,acquire()
會阻塞。例如,限制同時下載的連接數。 - Event: 事件。一種簡單的同步機制,一個 Thread 發送信號通知事件發生 (
event.set()
),其他 Threads 可以等待該事件 (event.wait()
)。事件被設定後,所有等待的 Threads 都會被喚醒。event.clear()
可以重設事件狀態。 - Condition: 條件變數。比 Event 更複雜,允許 Threads 等待直到某個條件為真,或者被其他 Thread 通知。通常與 Lock 關聯使用,一個 Thread 獲取 Lock 後,如果條件不滿足,可以呼叫
condition.wait()
釋放 Lock 並等待;其他 Thread 修改條件後,可以呼叫condition.notify()
或condition.notify_all()
來喚醒等待的 Thread(s)。 - Barrier: 屏障。允許固定數量的 Threads 互相等待,直到所有 Threads 都到達屏障點 (呼叫
barrier.wait()
),然後它們才能同時繼續執行。
Timeout (超時) 許多同步原語的阻塞方法 (如 lock.acquire()
, event.wait()
, condition.wait()
, thread.join()
) 都接受一個可選的 timeout
參數(以秒為單位)。如果操作在指定的超時時間內未能完成,它將停止阻塞並通常返回一個指示超時的值 (例如 False
for lock.acquire(timeout=...)
) 或引發異常。使用 timeout
可以防止程式因等待資源而無限期掛起。
lock = threading.Lock()
# ...
if lock.acquire(timeout=1.0): # 嘗試獲取鎖,最多等待1秒
try:
# 操作共享資源
print("成功獲取鎖")
finally:
lock.release()
else:
print("獲取鎖超時!")
🧩 Thread Pools (concurrent.futures.ThreadPoolExecutor
)
手動建立和管理大量 Threads 可能會變得很繁瑣且容易出錯。Python 的 concurrent.futures
module 提供了 ThreadPoolExecutor
,這是一個更高級別的抽象,用於管理一個 Thread Pool (線程池)。
線程池會預先建立一定數量的 worker threads,並將提交的任務分配給這些 threads 執行。這樣可以重用 threads,避免了頻繁建立和銷毀 threads 的開銷,並且可以限制並發執行的 thread 數量。
import concurrent.futures
import threading # 僅用於獲取 thread name
import time
def task_with_argument(name, duration):
current_thread_name = threading.current_thread().name
print(f"任務 {name} 開始執行,執行者:{current_thread_name}")
time.sleep(duration)
result = f"任務 {name} 完成"
print(result)
return result
if __name__ == "__main__":
# 建立一個最多包含 3 個 worker threads 的 ThreadPoolExecutor
# 如果不指定 max_workers,它通常會根據系統核心數來決定 (Python 3.8+ 預設 min(32, os.cpu_count() + 4))
with concurrent.futures.ThreadPoolExecutor(max_workers=3, thread_name_prefix='MyPoolWorker') as executor:
print("提交任務到 ThreadPoolExecutor...")
# submit 方法會將函數和其參數提交到線程池,並返回一個 Future 物件
# Future 物件代表一個異步操作的最終結果
future1 = executor.submit(task_with_argument, "A", 2)
future2 = executor.submit(task_with_argument, "B", 4)
future3 = executor.submit(task_with_argument, "C", 1)
future4 = executor.submit(task_with_argument, "D", 3) # 此任務會等待,直到池中有空閒 thread
# 可以使用 as_completed 來獲取已完成的 future
futures = [future1, future2, future3, future4]
for future in concurrent.futures.as_completed(futures):
try:
print(f"從 as_completed 取得結果: {future.result()}")
except Exception as exc:
print(f"任務產生異常: {exc}")
# 或者單獨等待每個 future (順序可能不同於 as_completed)
# print(f"任務 A 的結果: {future1.result()}")
# print(f"任務 B 的結果: {future2.result()}")
# print(f"任務 C 的結果: {future3.result()}")
# print(f"任務 D 的結果: {future4.result()}")
print("所有線程池任務完成。")
使用 with
陳述式可以確保 ThreadPoolExecutor
在使用完畢後會自動關閉 (呼叫 executor.shutdown(wait=True)
),等待所有已提交的任務完成。
📝 總結
Python Multithreading 是防止阻塞的技術,適用於改善 I/O 密集型應用程式的響應性。儘管 GIL 限制了 CPython 在 CPU 密集型任務上利用多核心實現真正平行運算的能力,但對於涉及大量等待的操作,Multithreading 依然能帶來顯著的性能提升。
理解並正確使用同步原語 (如 Locks, Semaphores, Events) 對於避免 Race Conditions 和 Deadlocks 至關重要。而 ThreadPoolExecutor
則提供了一種更方便、更高效的方式來管理和使用 Threads。