threading 是 Python 中用于实现多线程编程的标准库,它允许你在同一个进程中运行多个线程,从而可以“执行多个任务,特别适合用于 I/O 密集型(如网络请求、文件读写)任务。

为什么需要多线程?(核心动机)
在 Python 中,由于 全局解释器锁 的存在,多线程在 CPU 密集型 任务上并不能实现真正的并行执行(即多个线程在同一时刻由多个 CPU 核心同时运行),但对于 I/O 密集型 任务,情况就完全不同了。
I/O 密集型任务的特点: 程序的大部分时间都在等待 I/O 操作完成(如等待网络数据返回、等待磁盘读取文件),在这等待期间,CPU 是空闲的。
多线程在这里的作用: 当一个线程因为等待 I/O 而被阻塞时,GIL 会被释放,Python 解释器可以切换到另一个就绪的线程去执行,这样,CPU 的空闲时间就被利用起来了,看起来就像是多个任务在“进行,极大地提高了程序的执行效率。
threading 模块的核心概念与组件
1 Thread 类
threading.Thread 是创建和管理线程的主要工具。

创建线程的两种方式:
-
函数式创建(最简单) 创建一个函数,然后将这个函数作为
target参数传递给Thread对象。import threading import time def worker(name): """线程要执行的函数""" print(f"Worker {name} is starting...") time.sleep(2) # 模拟 I/O 操作 print(f"Worker {name} has finished.") # 创建线程对象 # target: 指定线程要执行的函数 # args: 以元组形式传递给 target 函数的位置参数 t1 = threading.Thread(target=worker, args=("Thread-1",)) # 启动线程 t1.start() print("Main thread continues to run...") # 等待线程执行完毕(非常重要!) t1.join() print("Main thread has ended.")输出分析: 你会看到 "Main thread continues to run..." 在 "Worker Thread-1 is starting..." 之后立即打印,因为主线程没有等待
t1完成,而 "Main thread has ended" 会在t1.join()之后,也就是t1执行完毕后才打印。 -
类继承式创建(更面向对象,更灵活) 创建一个类,继承自
threading.Thread,并重写run()方法。import threading import time class MyThread(threading.Thread): def __init__(self, name): super().__init__() self.name = name def run(self): """线程启动时自动调用的方法""" print(f"{self.name} is starting...") time.sleep(2) print(f"{self.name} has finished.") # 创建并启动线程 t2 = MyThread("Class-Thread-1") t2.start() print("Main thread continues to run...") t2.join() print("Main thread has ended.")这种方式更易于管理线程的状态和行为,适合复杂的线程逻辑。
2 线程的生命周期
一个线程从创建到销毁,会经历以下状态:
- New (新建): 线程对象被创建,但
start()方法还未被调用。 - Runnable (就绪):
start()方法被调用,线程进入就绪队列,等待 GIL 的调度。 - Running (运行): 线程获得了 GIL,正在执行
run()方法中的代码。 - Blocked (阻塞): 线程因为某些原因(如等待 I/O、获取锁)暂停运行,暂时让出 GIL。
- Terminated (终止): 线程的
run()方法执行完毕,线程生命周期结束。
3 主线程与子线程
当你运行一个 Python 脚本时,默认就有一个“主线程”,通过 threading.Thread 创建的线程都被称为“子线程”。
threading.current_thread(): 可以获取当前正在执行的线程对象。
threading.main_thread(): 可以获取主线程对象。
import threading
import time
def show_current_thread():
t = threading.current_thread()
print(f"Running in thread: {t.name} (is Main: {t is threading.main_thread()})")
show_current_thread() # 显示主线程
t = threading.Thread(target=show_current_thread)
t.start()
t.join()
线程间的通信与同步
当多个线程需要访问共享资源(如一个全局变量、一个文件)时,就会引发 竞态条件,导致数据不一致或不正确,为了解决这个问题,我们需要使用同步机制。
1 锁
锁是最基本的同步工具,它确保了在任何时刻,只有一个线程可以访问被锁保护的代码块(称为“临界区”)。
threading.Lock(): 创建一个锁。lock.acquire(): 获取锁,如果锁已经被其他线程获取,则当前线程会阻塞,直到锁被释放。lock.release(): 释放锁。
重要提示: 推荐使用 with lock: 语句,它会自动处理 acquire() 和 release(),即使发生异常也能保证锁被释放,避免死锁。
示例:不使用锁(竞态条件)
import threading
counter = 0
threads = []
def increment():
global counter
for _ in range(100000):
counter += 1 # 这不是一个原子操作, read-modify-write
for i in range(5):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Final counter value: {counter}") # 结果几乎总是小于 500000
示例:使用锁(解决竞态条件)
import threading
counter = 0
lock = threading.Lock()
threads = []
def increment_with_lock():
global counter
for _ in range(100000):
with lock: # 获取锁,执行完代码块后自动释放
counter += 1
for i in range(5):
t = threading.Thread(target=increment_with_lock)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Final counter value: {counter}") # 结果总是 500000
2 其他同步原语
除了 Lock,threading 模块还提供了其他有用的同步工具:
RLock(可重入锁): 允许同一个线程多次获取同一个锁,避免死锁,适用于递归函数。Semaphore(信号量): 控制同时访问某个资源的线程数量,可以看作是“多个许可”的锁。Event(事件): 一个简单的线程间通信机制,一个线程可以发出一个事件信号,其他线程可以等待这个信号。Condition(条件变量): 允许线程在某些条件满足时才继续执行,常与锁配合使用,实现复杂的线程间协作。
线程间数据共享与通信
1 共享数据
共享数据(如全局变量、列表、字典)是线程间通信最直接的方式,但也是最容易出问题的方式。必须配合锁等同步机制来使用。
import threading
shared_data = []
lock = threading.Lock()
def append_data(value):
with lock:
shared_data.append(value)
print(f"{threading.current_thread().name} appended {value}")
threads = []
for i in range(5):
t = threading.Thread(target=append_data, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Final shared_data: {shared_data}")
2 Queue (线程安全的队列)
queue.Queue 是多线程编程中 最推荐、最安全 的数据交换方式,它是一个线程安全的队列,一个线程可以把数据放入队列,另一个线程可以从队列中取出数据。Queue 内部已经实现了必要的锁,你无需手动加锁。
put(item): 向队列中放入一个数据,如果队列已满,则阻塞。get(): 从队列中取出一个数据,如果队列为空,则阻塞。task_done(): 当消费者完成一项任务后调用,通常用于join()方法。join(): 阻塞主线程,直到队列中的所有任务都被处理完毕(即每个put的项目都有对应的task_done)。qsize(): 返回队列中大致的项目数。
生产者-消费者模型示例
import threading
import queue
import time
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
"""生产者线程"""
for i in range(5):
item = f"Product-{i}"
print(f"Producer is producing {item}")
q.put(item)
time.sleep(random.random()) # 模拟生产耗时
print("Producer finished.")
def consumer():
"""消费者线程"""
while True:
try:
# 设置超时,避免永久阻塞
item = q.get(timeout=1)
print(f"Consumer is consuming {item}")
time.sleep(random.random()) # 模拟消费耗时
q.task_done() # 标记任务完成
except queue.Empty:
print("Queue is empty, consumer exiting.")
break
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中所有任务被消费者处理完毕
q.join()
print("All products have been consumed.")
# 确保消费者线程也能退出
# 在这个简单例子中,消费者会因为超时而退出
高级主题与最佳实践
1 线程池
手动创建和管理大量线程(如成百上千个)会消耗大量资源,并且难以控制。concurrent.futures.ThreadPoolExecutor 提供了一个更高级、更易用的线程池接口。
import concurrent.futures
import time
def do_task(task_id):
print(f"Task {task_id} is starting...")
time.sleep(2)
print(f"Task {task_id} is finished.")
return f"Result of task {task_id}"
# 创建一个包含 4 个线程的线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交多个任务到线程池
futures = [executor.submit(do_task, i) for i in range(8)]
# 等待所有任务完成并获取结果
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Got: {result}")
except Exception as e:
print(f"A task generated an exception: {e}")
print("All tasks done in the thread pool.")
2 线程本地存储
threading.local() 为每个线程创建一个独立的存储空间,即使多个线程访问同一个 local 对象,它们也只能看到和修改自己存储的数据,而不会互相干扰,这对于避免在多线程环境下传递大量上下文数据非常有用。
import threading
local_data = threading.local()
def process_data(user_id):
local_data.user_id = user_id # 将数据存储在本地
print(f"Thread {threading.current_thread().name} is processing for user {local_data.user_id}")
# ... 其他操作,都可以使用 local_data.user_id
threads = []
for i in range(3):
t = threading.Thread(target=process_data, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
3 停止线程
Python 没有直接强制终止线程的 API(_stop() 是危险的,已废弃),正确的做法是使用 “哨兵值” 或 事件 来通知线程应该退出了。
使用事件 Event 示例:
import threading
import time
stop_event = threading.Event()
def worker():
print("Worker is running...")
while not stop_event.is_set(): # 检查事件是否被设置
print("Worker is working...")
time.sleep(1)
print("Worker is stopping...")
t = threading.Thread(target=worker)
t.start()
time.sleep(5)
print("Main thread is signaling worker to stop...")
stop_event.set() # 设置事件,通知线程退出
t.join()
print("Worker has stopped.")
总结与最佳实践
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 简单的并行任务 | threading.Thread + join() |
直观,易于理解。 |
| 生产者-消费者模型 | queue.Queue |
线程安全,设计经典,解耦生产者和消费者。 |
| 大量并发任务 | concurrent.futures.ThreadPoolExecutor |
避免了手动管理线程的复杂性,复用线程,效率高。 |
| 需要线程独立状态 | threading.local() |
隔离线程数据,避免冲突。 |
| 需要优雅地停止线程 | threading.Event 或 哨兵值 |
安全、可控,避免强制终止带来的资源问题。 |
| CPU 密集型任务 | multiprocessing 模块 |
绕过 GIL,实现真正的多核并行。 |
核心原则:
- 优先考虑
Queue:在线程间传递数据时,queue.Queue是你的首选。 - 锁要最小化:只在保护共享数据的临界区使用锁,并尽量缩短持有锁的时间。
- 使用
with语句管理锁:确保锁一定会被释放,防止死锁。 - 避免共享可变状态:最好的同步是没有同步,尽量让每个线程操作自己的数据。
- 了解 GIL:Python 多线程在 CPU 密集型任务上的局限性,并知道何时该选择多进程。
