threading 是 Python 中用于实现多线程编程的标准库,它允许你在单个进程中同时运行多个线程,从而可以执行多个任务,特别适合处理 I/O 密集型(如网络请求、文件读写)操作。

为什么需要多线程?(核心动机)
想象一下你正在餐厅点餐:
- 单线程:你一个人点餐,从点餐、等餐、吃完、买单,再到下一位客人,如果等餐需要很长时间(20 分钟),整个餐厅的效率都会被拖慢。
- 多线程:你点完餐后,服务员可以去服务其他客人,你的餐点由厨师(另一个“线程”)在后台准备,这样,即使你在等餐,餐厅的其他服务(如点餐、上菜)依然可以并行进行,大大提高了效率。
在编程中,这种“等待”I/O 操作。
- 从网络下载一个文件(大部分时间在等待服务器响应)。
- 读取一个大文件(大部分时间在等待磁盘 I/O)。
在这些等待期间,CPU 是空闲的,多线程可以让一个线程在等待 I/O 时,切换到另一个线程执行计算任务,从而充分利用 CPU 时间,提高程序的响应速度和吞吐量。
threading 模块基础
1 创建和启动线程
最基本的方式是创建一个 Thread 对象,并传入一个函数作为目标。
import threading
import time
def worker(num):
"""线程要执行的函数"""
print(f"Worker {num}: 开始工作")
time.sleep(2) # 模拟 I/O 操作
print(f"Worker {num}: 工作完成")
# 创建线程对象
# target: 线程要执行的函数
# args: 传递给 target 函数的参数元组
t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))
# 启动线程 (调用 run() 方法)
# 这不会阻塞主线程,主线程会继续往下执行
t1.start()
t2.start()
# 等待所有线程执行完毕 (可选)
# 如果没有 join(),主线程可能会在子线程完成前就退出
t1.join()
t2.join()
print("主线程结束")
输出结果:
Worker 1: 开始工作
Worker 2: 开始工作
# (等待2秒)
Worker 1: 工作完成
Worker 2: 工作完成
主线程结束
注意: t1.start() 和 t2.start() 几乎是同时被调用的,所以两个 "开始工作" 的打印语句几乎同时出现。
2 join() 方法详解
join() 是一个非常重要的方法,它的作用是“阻塞当前线程,直到被调用的那个线程执行完毕”。
- 没有
join()的情况:主线程启动子线程后,会立即继续执行自己的代码,不会等待子线程。 - 有
join()的情况:主线程执行到t1.join()时,会暂停,直到t1线程执行完毕,然后才会执行t2.join(),暂停直到t2完成。
这确保了在主线程退出前,所有子线程都有机会完成它们的工作。
3 继承 threading.Thread 类
除了函数式创建,你还可以通过继承 Thread 类来创建线程,这种方式更面向对象,可以方便地在类中管理线程的状态。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, thread_id):
super().__init__()
self.thread_id = thread_id
def run(self):
"""必须重写 run() 方法,这是线程的执行体"""
print(f"线程 {self.thread_id} 启动")
time.sleep(1)
print(f"线程 {self.thread_id} 结束")
# 创建并启动线程
threads = []
for i in range(3):
thread = MyThread(i)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("所有线程执行完毕")
线程同步与锁
当多个线程需要访问和修改同一个共享资源时,就会发生 竞态条件,这可能导致数据不一致或程序崩溃。
经典例子:银行取款
import threading
# 共享资源
balance = 100
def withdraw(amount):
global balance
if balance >= amount:
# 模拟网络延迟或处理时间
# 另一个线程可能会趁虚而入
time.sleep(0.1)
balance -= amount
print(f"取款成功,余额: {balance}")
else:
print("余额不足")
# 两个人同时取款
t1 = threading.Thread(target=withdraw, args=(50,))
t2 = threading.Thread(target=withdraw, args=(70,))
t1.start()
t2.start()
t1.join()
t2.join()
可能出现的错误输出:
取款成功,余额: 50 (t1 检查时 balance=100, t2 检查时 balance=100)
取款成功,余额: -20 (t2 执行时,t1 还没来得及扣款)
这显然是错误的,为了避免这种情况,我们需要引入 锁。
1 threading.Lock
锁是一个同步原语,它确保一次只有一个线程可以访问共享资源。
acquire(): 获取锁,如果锁已经被其他线程获取,则当前线程会阻塞,直到锁被释放。release(): 释放锁。
使用锁修复上面的例子:
import threading
import time
balance = 100
# 创建一个锁对象
lock = threading.Lock()
def withdraw(amount):
global balance
# 在访问共享资源前,尝试获取锁
lock.acquire()
try:
if balance >= amount:
time.sleep(0.1)
balance -= amount
print(f"取款成功,余额: {balance}")
else:
print("余额不足")
finally:
# 在 finally 块中确保锁一定会被释放,即使发生异常
lock.release()
t1 = threading.Thread(target=withdraw, args=(50,))
t2 = threading.Thread(target=withdraw, args=(70,))
t1.start()
t2.start()
t1.join()
t2.join()
正确的输出:
取款成功,余额: 50 (t1 获取了锁)
余额不足 (t2 等待 t1 释放锁后,再次检查 balance=50)
无论哪个线程先获取锁,另一个线程都必须等待,从而保证了数据的安全性。
更推荐的方式:with 语句
with 语句可以自动处理锁的获取和释放,即使发生异常,也能保证锁被释放,代码更简洁、安全。
def withdraw(amount):
global balance
with lock: # 等同于 lock.acquire() ... lock.release()
if balance >= amount:
time.sleep(0.1)
balance -= amount
print(f"取款成功,余额: {balance}")
else:
print("余额不足")
线程通信
线程间需要交换数据,Python 提供了多种线程安全的队列来实现线程间的通信,其中最常用的是 queue.Queue。
queue.Queue 是一个先进先出的数据结构,并且是线程安全的,非常适合在生产者和消费者模型中使用。
示例:生产者-消费者模型
import threading
import queue
import time
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
"""生产者线程:向队列中添加数据"""
for i in range(5):
item = f"产品-{i}"
print(f"生产者: 正在生产 {item}")
q.put(item) # 将数据放入队列
time.sleep(random.random()) # 模拟生产耗时
print("生产者: 生产完毕,退出")
def consumer():
"""消费者线程:从队列中取出数据"""
while True:
try:
# get() 会阻塞,直到队列中有数据
item = q.get(timeout=2) # 设置超时,避免永久阻塞
print(f"消费者: 正在消费 {item}")
time.sleep(random.random()) # 模拟消费耗时
q.task_done() # 标记任务已完成
except queue.Empty:
print("消费者: 队列为空,等待超时,退出")
break
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中的所有任务被处理完毕
q.join()
print("主线程: 所有任务已完成,程序退出")
queue.Queue 的常用方法:
q.put(item): 向队列中添加一个项目。q.get(): 从队列中获取一个项目,如果队列为空,它会阻塞。q.get_nowait(): 尝试获取一个项目,如果队列为空,则立即抛出queue.Empty异常。q.task_done(): 当一个消费者完成从队列中获取的项目处理后调用它。q.join(): 阻塞,直到队列中的所有项目都被处理(即,对于每个已放入队列的项目,都调用了一次task_done())。
GIL (Global Interpreter Lock) - 全局解释器锁
这是理解 Python 多线程最关键的一点。
GIL 是什么? GIL 是 CPython 解释器(Python 的标准实现)的一个互斥锁,它确保在任何时刻,只有一个线程可以执行 Python 字节码。
GIL 的影响:
- CPU 密集型任务:对于纯计算任务(如大量数学运算),多线程并不能利用多核 CPU 的优势,因为同一时间只有一个线程能在运行,GIL 会在线程之间快速切换,但并不能真正并行执行,在这种情况下,多进程 (
multiprocessing) 是更好的选择。 - I/O 密集型任务:当线程因为等待 I/O(如网络请求、文件读写)而阻塞时,GIL 会被释放,允许其他线程运行,对于 I/O 密集型任务,多线程仍然非常有效,可以显著提高程序的效率。
- 用
threading处理 I/O 密集型任务(如爬虫、Web 服务器)。 - 用
multiprocessing处理 CPU 密集型任务(如科学计算、图像处理)。
完整示例:并发下载器
这个例子结合了 threading 和 queue,实现一个简单的并发下载器。
import threading
import queue
import requests
import time
# 1. 准备任务队列
# 这里用一些示例图片链接
image_urls = [
"https://www.python.org/static/community_logos/python-logo-master-v3-TM.png",
"https://www.djangoproject.com/m/img/logos/django-logo-negative.png",
"https://flask.palletsprojects.com/en/2.2.x/_static/flask-icon.png",
"https://www.fastapi.tiangolo.com/img/logo-margin/logo-teal.png"
]
# 2. 创建线程安全的队列
download_queue = queue.Queue()
# 3. 生产者:将所有 URL 放入队列
def producer():
for url in image_urls:
download_queue.put(url)
print("生产者: 所有 URL 已加入队列")
# 4. 消费者:从队列中取出 URL 并下载
def consumer():
while True:
try:
url = download_queue.get_nowait()
print(f"消费者: 开始下载 {url.split('/')[-1]}")
response = requests.get(url, timeout=5)
if response.status_code == 200:
filename = url.split('/')[-1]
with open(filename, 'wb') as f:
f.write(response.content)
print(f"消费者: {filename} 下载完成")
else:
print(f"消费者: 下载 {url.split('/')[-1]} 失败,状态码: {response.status_code}")
download_queue.task_done()
except queue.Empty:
print("消费者: 队列已空,退出")
break
# 5. 创建并启动线程
num_consumers = 3 # 3个下载线程
threads = []
# 启动生产者
producer_thread = threading.Thread(target=producer)
threads.append(producer_thread)
producer_thread.start()
# 启动消费者
for _ in range(num_consumers):
consumer_thread = threading.Thread(target=consumer)
threads.append(consumer_thread)
consumer_thread.start()
# 6. 等待所有任务完成
producer_thread.join()
download_queue.join() # 等待队列清空
# 等待所有消费者线程结束(虽然它们在队列为空后会自己退出)
for t in threads:
if t != producer_thread: # 生产者已经结束了
t.join()
print("主线程: 所有下载任务完成!")
高级主题
threading.Event: 线程间的事件通知机制,一个线程可以等待一个事件的发生,另一个线程在特定条件满足时可以设置该事件,从而唤醒等待的线程。threading.Semaphore: 信号量,用于控制同时访问某个特定资源的线程数量,限制一个只能容纳 3 人的房间(信号量为 3)。threading.Timer: 定时器,在指定的时间后执行一个函数。
| 特性 | 描述 | 适用场景 |
|---|---|---|
| 创建线程 | threading.Thread(target=func, args=()) 或继承 Thread 类 |
任何需要多线程的场景 |
| 启动线程 | thread.start() |
启动线程执行 |
| 等待线程 | thread.join() |
确保主线程在子线程完成后才退出 |
| 同步机制 | threading.Lock |
保护共享资源,防止竞态条件 |
| 线程通信 | queue.Queue |
安全地在线程间传递数据(生产者-消费者模型) |
| 核心限制 | GIL | I/O 密集型任务效果好,CPU 密集型任务效果差 |
threading 是 Python 并发编程的重要工具,掌握它对于编写高效、响应迅速的应用程序至关重要。
