这是一个非常重要的主题,因为它关系到如何利用多核 CPU 提升程序性能,以及如何处理并发任务。

目录
- 为什么需要多线程?
- Python 的 GIL (全局解释器器锁)
threading模块核心概念Thread类:创建和管理线程Lock(锁):解决资源竞争问题RLock(可重入锁):更安全的锁Semaphore(信号量):控制资源访问数量Event(事件):线程间通信Queue(队列):线程安全的任务和数据交换
- 完整代码示例
- 示例1:基础多线程
- 示例2:使用
Lock防止竞争条件 - 示例3:生产者-消费者模型(使用
Queue)
- 多线程 vs. 多进程
- 最佳实践与注意事项
为什么需要多线程?
想象一个场景:你需要下载 10 个文件,如果只用一个线程,程序会一个接一个地下载,总时间是所有文件下载时间的总和。
如果使用 10 个线程,每个线程负责下载一个文件,10 个文件可以(在一定程度上)并行下载,总时间会大大缩短。
多线程的核心用途:
- I/O 密集型任务:当程序需要等待外部资源时,如网络请求、文件读写、数据库查询,在等待时,CPU 可以被释放去执行其他线程的任务,从而提高 CPU 的利用率。
- 提升用户体验:在图形用户界面程序中,将耗时操作放在后台线程执行,可以避免界面卡死,保持响应。
Python 的 GIL (Global Interpreter Lock)
这是理解 Python 多线程最关键的一点!

GIL 是什么? GIL 是一个互斥锁,它确保在任何时刻,一个 Python 进程中只有一个线程在执行 Python 字节码,这意味着,即使在多核处理器上,Python 的多线程也无法实现真正的并行计算。
GIL 的影响:
- 对于 CPU 密集型 任务(如大量数学计算、图像处理),多线程并不能带来性能提升,因为同一时间只有一个线程在运行,在这种情况下,应该使用
multiprocessing模块来创建多个进程,每个进程有自己的 Python 解释器和内存空间,可以真正地在多核上并行。 - 对于 I/O 密集型 任务,GIL 的影响较小,当一个线程因为 I/O 操作(如
requests.get()或time.sleep())而阻塞时,GIL 会被释放,其他线程就可以获得 GIL 并继续执行,这使得多线程在处理 I/O 密集型任务时非常有效。
简单总结:
- 多线程:适合 I/O 密集型 任务。
- 多进程:适合 CPU 密集型 任务。
threading 模块核心概念
Thread 类:创建和管理线程
这是 threading 模块最核心的类,用于创建新线程。

两种创建线程的方式:
继承 Thread 类(面向对象)
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
# 线程要执行的代码
print(f"线程 {self.name} 开始")
time.sleep(2) # 模拟耗时操作
print(f"线程 {self.name} 结束")
# 创建并启动线程
thread1 = MyThread("Thread-1")
thread2 = MyThread("Thread-2")
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("所有线程已结束")
传递一个可调用对象(函数或 lambda)
这是更常见、更简洁的方式。
import threading
import time
def worker_function(name):
print(f"线程 {name} 开始")
time.sleep(2)
print(f"线程 {name} 结束")
# 创建线程对象
thread1 = threading.Thread(target=worker_function, args=("Thread-A",))
thread2 = threading.Thread(target=worker_function, args=("Thread-B",))
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
print("所有线程已结束")
常用方法:
start(): 启动线程,执行run()方法。run(): 线程的执行体,需要重写。join(): 阻塞当前线程,直到被调用的线程执行完毕,非常重要,用于同步。is_alive(): 检查线程是否还在运行。
Lock (锁):解决资源竞争问题
当多个线程同时读写同一个共享变量时,可能会导致数据不一致,这就是“竞争条件”。
例子:银行账户取款
import threading
balance = 1000
def withdraw(amount):
global balance
# 检查余额
if balance >= amount:
# 模拟网络延迟,让问题更容易发生
threading.Event().wait(0.001)
balance -= amount
print(f"取款 {amount},余额为 {balance}")
else:
print("余额不足")
threads = []
for _ in range(10):
t = threading.Thread(target=withdraw, args=(100,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额: {balance}") # 可能不是 0,因为多个线程同时检查了余额
使用 Lock 修复:
import threading
balance = 1000
lock = threading.Lock() # 创建一个锁
def withdraw(amount):
global balance
# 在访问共享资源前获取锁
lock.acquire()
try:
if balance >= amount:
threading.Event().wait(0.001)
balance -= amount
print(f"取款 {amount},余额为 {balance}")
else:
print("余额不足")
finally:
# 在 finally 块中释放锁,确保锁一定会被释放
lock.release()
threads = []
for _ in range(10):
t = threading.Thread(target=withdraw, args=(100,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额: {balance}") # 正确结果是 0
更推荐的方式:with 语句
with 语句会自动处理锁的获取和释放,即使在代码块中发生异常,锁也会被释放,这是最安全、最简洁的方式。
def withdraw(amount):
global balance
with lock: # 自动 acquire() 和 release()
if balance >= amount:
threading.Event().wait(0.001)
balance -= amount
print(f"取款 {amount},余额为 {balance}")
else:
print("余额不足")
RLock (可重入锁)
RLock (Reentrant Lock) 允许同一个线程多次获取锁,而不会造成死锁,这在递归函数中非常有用。
import threading
lock = threading.RLock()
def recursive_function(depth):
with lock:
if depth > 0:
print(f"递归深度: {depth}")
recursive_function(depth - 1)
recursive_function(3) # 不会死锁
Semaphore (信号量)
Semaphore 是一个计数器,用于控制同时访问某个资源的线程数量,一个只能容纳 5 人的房间(信号量初始值为 5)。
import threading
import time
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
with semaphore:
print(f"Worker {worker_id} 正在工作...")
time.sleep(2)
print(f"Worker {worker_id} 工作完成")
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Event (事件)
Event 用于线程间的简单通信,一个线程可以通知其他某个事件已经发生。
event.set(): 设置事件,表示事件发生。event.clear(): 清除事件,表示事件未发生。event.wait(): 阻塞线程,直到事件被设置。
import threading
import time
event = threading.Event()
def waiter():
print("Waiter: 等待事件...")
event.wait() # 阻塞在这里
print("Waiter: 事件已发生!继续执行。")
def setter():
print("Setter: 准 