Python Multiprocessing
多進程
在 Python 編程中,提升程式執行效率是一個常見的課題,特別是當面對計算密集型 (CPU-bound) 任務時。雖然 threading
(多線程) 模組廣為人知,但由於全域直譯器鎖 (Global Interpreter Lock, GIL) 的存在,它在 CPython 中無法實現真正的平行計算。
這時,multiprocessing
(多程序) 模組就成為了突破 GIL 限制、充分利用多核心 CPU 的關鍵。
如果你已經熟悉 multithreading
,你會發現 multiprocessing
的 API 設計驚人地相似,讓你幾乎可以無痛上手。然而,它們底層的運作邏輯卻截然不同。這篇文章將詳細探討 multiprocessing
的核心概念,比較它與 multithreading
的異同,並指導你何時該使用哪種技術。
🤔 Multithreading vs. Multiprocessing:基本概念回顧
在深入 multiprocessing
之前,讓我們先快速回顧一下兩者的基本概念。
Multithreading (多線程)
- 在單一進程 (Process) 中執行多個線程 (Thread)。
- 所有線程共享同一個記憶體空間。
- 受 GIL 限制,在任何時間點,只有一個線程能執行 Python bytecode。
- 適用場景:I/O 密集型 (I/O-bound) 任務,例如網絡請求、檔案讀寫。當一個線程等待 I/O 時,GIL 會被釋放,讓其他線程可以執行。
Multiprocessing (多程序)
- 啟動多個獨立的 Python 進程 (Process)。
- 每個進程都擁有獨立的記憶體空間和自己的 Python 直譯器。
- 每個進程都有自己的 GIL,因此可以在不同的 CPU 核心上平行 (Parallel) 執行。
- 適用場景:CPU 密集型 (CPU-bound) 任務,例如複雜數學運算、數據分析、影像處理。
簡單來說,multithreading
是「併發」(Concurrency),multiprocessing
則是「平行」(Parallelism)。
🛠️ multiprocessing
API:熟悉的陌生人
multiprocessing
的 API 被設計成與 threading
非常相似。如果你懂得如何建立一個 Thread
,你就懂得如何建立一個 Process
。
看看以下範例,它們的結構幾乎一模一樣:
import threading
import multiprocessing
import time
def task(name):
print(f"任務 '{name}' 開始執行...")
time.sleep(1)
print(f"任務 '{name}' 執行完畢。")
# --- Multithreading 範例 ---
print("--- 開始執行多線程 ---")
t1 = threading.Thread(target=task, args=("線程 A",))
t2 = threading.Thread(target=task, args=("線程 B",))
t1.start()
t2.start()
t1.join()
t2.join()
print("--- 多線程執行完畢 ---\n")
# --- Multiprocessing 範例 ---
print("--- 開始執行多程序 ---")
# 為了跨平台兼容性,建議將 multiprocessing 的啟動代碼放在 if __name__ == '__main__': 內
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task, args=("程序 C",))
p2 = multiprocessing.Process(target=task, args=("程序 D",))
p1.start()
p2.start()
p1.join()
p2.join()
print("--- 多程序執行完畢 ---")
你會發現,除了 threading.Thread
變成 multiprocessing.Process
,其餘的 target
、args
、start()
和 join()
方法用法完全相同。這種設計大大降低了學習成本,但請謹記,選擇哪一個應該基於你的任務類型,而非 API 的便利性。
注意:在 Windows 和 macOS 上,
multiprocessing
預設使用 'spawn' 模式來建立新程序,這意味著子程序不會繼承父程序的資源。因此,主程式碼必須放在if __name__ == '__main__':
區塊內,以避免在子程序中重複執行初始化代碼,導致無限遞歸的錯誤。
🚀 繞過 GIL,實現真正的平行運算
GIL 的存在是為了保護 Python 的記憶體管理,確保其線程安全,但也限制了多線程在多核心 CPU 上的表現。無論你有多少個核心,CPython 的多線程程式在同一時間只能利用一個核心來執行 Python bytecode。
multiprocessing
透過建立獨立的程序來完美繞過這個限制。每個程序都是一個獨立的 Python 直譯器實例,擁有自己的記憶體和 GIL。作業系統可以將這些獨立的程序分派到不同的 CPU 核心上執行,從而實現真正的平行處理。
讓以下 CPU 密集型任務的例子來證明這一點:
import time
import threading
import multiprocessing
# 一個簡單的 CPU 密集型任務
def cpu_bound_task(n):
count = 0
for i in range(n):
count += i
# 任務參數
NUM_CALCULATIONS = 100_000_000
def run_sequentially():
start_time = time.time()
cpu_bound_task(NUM_CALCULATIONS)
cpu_bound_task(NUM_CALCULATIONS)
end_time = time.time()
print(f"循序執行時間: {end_time - start_time:.4f} 秒")
def run_with_threads():
t1 = threading.Thread(target=cpu_bound_task, args=(NUM_CALCULATIONS,))
t2 = threading.Thread(target=cpu_bound_task, args=(NUM_CALCULATIONS,))
start_time = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end_time = time.time()
print(f"多線程執行時間: {end_time - start_time:.4f} 秒")
def run_with_processes():
p1 = multiprocessing.Process(target=cpu_bound_task, args=(NUM_CALCULATIONS,))
p2 = multiprocessing.Process(target=cpu_bound_task, args=(NUM_CALCULATIONS,))
start_time = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
end_time = time.time()
print(f"多程序執行時間: {end_time - start_time:.4f} 秒")
if __name__ == '__main__':
run_sequentially()
run_with_threads()
run_with_processes()
在一部多核心電腦上執行這段代碼,你會觀察到類似以下的結果:
循序執行時間: 6.8451 秒
多線程執行時間: 6.9532 秒
多程序執行時間: 3.5123 秒
結果非常明顯:
- 多線程 的執行時間與循序執行相約,甚至可能因為線程切換的開銷而稍慢。這是因為 GIL 限制了它們只能輪流使用一個 CPU 核心。
- 多程序 的執行時間大約是循序執行的一半,因為兩個程序被分配到不同的 CPU 核心上平行執行,效率顯著提升。
✅ 何時該用 Multithreading?何時該用 Multiprocessing?
這是一個關鍵的決策點,以下是一個簡單的指南:
選擇
multithreading
如果你的任務是 I/O 密集型 (I/O-Bound)- 例子:同時下載多個網頁、讀取多個檔案、與多個資料庫連線。
- 原因:在這些任務中,程式大部分時間都在等待外部資源 (如網絡、硬碟) 的回應。當一個線程在等待時,GIL 會被釋放,讓其他線程可以繼續工作,從而提高整體效率。線程比程序更輕量,建立和切換的成本較低。
選擇
multiprocessing
如果你的任務是 CPU 密集型 (CPU-Bound)- 例子:大型數據集的科學計算、影片編碼、影像濾鏡處理、複雜的模擬。
- 原因:這些任務需要大量的 CPU 運算。
multiprocessing
可以將工作分配到多個 CPU 核心上,實現真正的平行計算,從而大幅縮短執行時間。
⛓️ 程序的挑戰:數據共享與同步
由於每個程序都有獨立的記憶體空間,這帶來了一個挑戰:如何安全地在程序之間共享數據?multiprocessing
模組提供了一些內建的工具來處理程序間通訊 (Inter-Process Communication, IPC)。
Queue
: 一個程序安全的佇列,是程序間傳遞訊息最常用和最推薦的方式。Pipe
: 建立一個雙向的通訊管道。- 共享記憶體 (
Value
,Array
): 用於在程序間共享簡單的數據類型或陣列。雖然高效,但需要手動處理同步問題,以避免競爭條件 (Race Condition)。
即使程序是獨立的,當它們存取共享資源時(例如一個共享的計數器或檔案),仍然會出現併發問題。這時,就需要使用同步工具,例如 Lock
。
以下是一個未使用 Lock
導致競爭條件的例子:
import multiprocessing
import time
def worker(shared_value):
for _ in range(100_000):
# 讀取、增加、寫回不是一個原子操作
shared_value.value += 1
if __name__ == '__main__':
shared_value = multiprocessing.Value('i', 0) # 'i' 表示 integer
p1 = multiprocessing.Process(target=worker, args=(shared_value,))
p2 = multiprocessing.Process(target=worker, args=(shared_value,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"未使用 Lock 的最終結果: {shared_value.value}") # 結果可能不是 200,000
為了修正這個問題,我們需要引入 Lock
來確保一次只有一個程序可以修改 shared_value
:
import multiprocessing
import time
def worker_with_lock(shared_value, lock):
for _ in range(100_000):
with lock: # 使用 with 陳述句自動獲取和釋放鎖
shared_value.value += 1
if __name__ == '__main__':
shared_value = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=worker_with_lock, args=(shared_value, lock))
p2 = multiprocessing.Process(target=worker_with_lock, args=(shared_value, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"使用 Lock 後的最終結果: {shared_value.value}") # 結果會是 200,000
✅ 結論
multiprocessing
是 Python 中解決 CPU 密集型任務效能瓶頸的強大工具。它透過建立獨立程序來繞過 GIL,實現真正的平行運算。
總結一下本文的要點:
- API 相似性:
multiprocessing
的 API 與threading
高度相似,易於上手。 - 核心差異:
multithreading
適用於 I/O 密集型任務的併發,而multiprocessing
適用於 CPU 密集型任務的平行。 - GIL:
multiprocessing
是繞過 GIL 以利用多核心 CPU 的標準方法。 - 數據共享: 程序間共享數據比線程複雜,需要使用
Queue
、Pipe
或共享記憶體,並注意使用Lock
等工具進行同步。
理解兩者的根本差異並根據任務類型做出正確的選擇,是編寫高效能 Python 程式的關鍵一步。