为什么需要 multiprocessing?—— GIL 的限制
在深入 multiprocessing 之前,必须先理解 Python 的 全局解释器锁。

- GIL 是什么? GIL 是一个互斥锁,它保证在任何时刻,一个 Python 进程中只有一个线程在执行 Python 字节码,这意味着即使在多核处理器上,Python 的多线程也无法实现真正的并行计算,而是通过快速切换线程来实现并发。
- GIL 的影响? 对于 I/O 密集型 任务(如网络请求、文件读写),线程在等待 I/O 时会释放 GIL,让其他线程运行,因此多线程仍然有效,但对于 CPU 密集型 任务(如数值计算、图像处理),由于 GIL 的存在,多线程并不能利用多核 CPU 的优势,反而因为线程切换的开销而可能比单线程更慢。
multiprocessing 的解决方案:
multiprocessing 模块通过创建多个独立的 进程 来绕过 GIL,每个进程都有自己独立的 Python 解释器和内存空间,因此不受 GIL 的限制,可以真正地在多个 CPU 核心上并行运行。
multiprocessing 核心概念与组件
multiprocessing 模块提供了多种方式来创建和管理进程,类似于 threading 模块的 API,但操作的是进程而不是线程。
Process 类
这是最基础、最核心的组件,用于创建一个新进程。
基本用法:

import multiprocessing
import time
import os
def worker(num):
"""一个简单的 worker 函数"""
process_id = os.getpid()
print(f'Worker {num} is running in Process {process_id}')
time.sleep(2) # 模拟耗时任务
print(f'Worker {num} finished')
if __name__ == '__main__':
print(f'Main process ID: {os.getpid()}')
# 创建一个进程对象
p = multiprocessing.Process(target=worker, args=(1,))
# 启动进程
p.start()
# 主进程继续执行
print('Main process is doing other work...')
# 等待子进程结束 (非常重要!)
p.join()
print('Main process has finished.')
代码解释:
if __name__ == '__main__'::这是使用multiprocessing的标准写法,在 Windows 等平台上,子进程会重新导入父进程的脚本,如果没有这个判断,子进程会再次执行if块内的代码,导致无限递归创建进程。multiprocessing.Process(target=worker, args=(1,)):target: 指定子进程要执行的函数。args: 一个元组,传递给target函数的位置参数。
p.start():启动进程,这会调用worker函数。p.join():主进程会在此处阻塞,直到子进程p执行完毕。如果你不调用join(),主进程可能会在子进程完成前就结束,导致子进程被意外终止。
Pool 类
当你需要创建和管理大量相似的工作进程时,手动为每个任务创建一个 Process 对象会变得很繁琐。Pool 类提供了一个进程池,可以方便地分发任务。
Pool 主要有两种使用模式:
a) map() 方法
map() 是一个阻塞式方法,它将一个任务列表分配给进程池中的工作进程,并收集所有结果,最后返回一个结果列表,它的行为类似于内置的 map() 函数。
import multiprocessing
import time
def square(n):
"""计算一个数的平方"""
time.sleep(1) # 模拟计算耗时
return n * n
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
# 创建一个包含4个工作进程的进程池
with multiprocessing.Pool(processes=4) as pool:
print("Using pool.map()...")
# pool.map 会阻塞,直到所有任务完成
results = pool.map(square, numbers)
print(f"Results from map: {results}")
b) apply_async() 方法
apply_async() 是一个非阻塞式方法,它提交一个任务到进程池并立即返回一个 AsyncResult 对象,你可以稍后通过这个对象获取结果,这种方式更灵活,适合需要立即处理其他任务或处理耗时不同任务的场景。
import multiprocessing
import time
def square(n):
"""计算一个数的平方"""
time.sleep(1) # 让不同任务有不同的耗时
print(f"Calculating square of {n}")
return n * n
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5]
with multiprocessing.Pool(processes=2) as pool:
print("Using pool.apply_async()...")
# 提交所有任务,但不会阻塞
results = [pool.apply_async(square, (num,)) for num in numbers]
# 主进程可以在这里做其他事情...
print("All tasks submitted. Main process is free.")
# 稍后,从 AsyncResult 对象中获取结果
# get() 方法会阻塞,直到对应的结果准备好
final_results = [r.get() for r in results]
print(f"Final results: {final_results}")
Pool 的优势:
- 自动管理进程: 你不需要手动创建和销毁进程。
- 任务队列: 内部实现了任务队列,自动将任务分配给空闲的工作进程。
- 负载均衡: 自动处理任务的分发和负载均衡。
进程间通信
由于每个进程有独立的内存空间,它们不能像线程那样直接共享变量,如果需要进程间共享数据,必须使用特定的 IPC 机制。
-
Queue(队列):Queue是最常用的进程间通信方式,实现了生产者-消费者模型,一个进程(生产者)可以把数据放入队列,另一个进程(消费者)可以从队列中取出数据。import multiprocessing def producer(queue): for i in range(5): queue.put(i) print(f'Producer put {i}') queue.put(None) # 发送结束信号 def consumer(queue): while True: item = queue.get() if item is None: break # 收到结束信号,退出循环 print(f'Consumer got {item}') if __name__ == '__main__': q = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(q,)) p2 = multiprocessing.Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join() print("All done.") -
Pipe(管道):Pipe创建一对连接对象,代表管道的两端,进程可以通过这些对象的send()和recv()方法来通信,它适用于两个进程之间的直接通信。import multiprocessing def worker(conn): """子进程接收数据并处理""" print("Worker: Receiving data...") data = conn.recv() print(f"Worker: Received {data}") result = data * 2 conn.send(result) conn.close() if __name__ == '__main__': # 创建一个管道,返回两个连接对象 parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process(target=worker, args=(child_conn,)) p.start() # 父进程发送数据 print("Parent: Sending data 10...") parent_conn.send(10) # 父进程接收返回结果 print("Parent: Receiving result...") result = parent_conn.recv() print(f"Parent: Received result {result}") p.join() -
Manager(管理器):Manager提供了一种高级方式,允许你在不同进程间共享 Python 对象(如列表、字典、Value,Array等)。Manager在一个单独的进程中运行所有共享对象,其他进程通过代理来访问这些对象,这种方式比Queue或Pipe更灵活,但速度也慢得多,因为涉及到进程间通信的开销。import multiprocessing def worker(shared_list, shared_dict): shared_list.append(multiprocessing.current_process().name) shared_dict['key'] = 'value from ' + multiprocessing.current_process().name if __name__ == '__main__': with multiprocessing.Manager() as manager: # 创建共享的列表和字典 shared_list = manager.list() shared_dict = manager.dict() processes = [] for i in range(3): p = multiprocessing.Process(target=worker, args=(shared_list, shared_dict)) processes.append(p) p.start()
