目录
- 为什么需要多线程? - 理解多线程的用途
threading模块核心概念 - 线程、锁、守护线程threading基础用法 - 从创建到启动- 线程同步:
Lock(锁) - 解决数据竞争问题 - 线程同步:
RLock(可重入锁) - 处理同一线程的多次加锁 - 线程同步:
Semaphore(信号量) - 控制访问资源的线程数量 - 线程间通信:
Queue(队列) - 安全地在线程间传递数据 - 守护线程 (
Daemon Thread) - 主线程退出时自动结束的线程 - 线程池 (
ThreadPoolExecutor) - 现代化的线程管理方式 - 重要陷阱与注意事项
- 全局解释器锁
- 避免使用
kill命令 - 注意资源清理
- 完整示例:一个多线程下载器
为什么需要多线程?
想象一下你正在一家餐厅点餐,传统单线程的方式就像只有一个服务员:他先记下你的点单,然后去厨房,等菜做好再端回来,期间不能服务其他任何客人,效率非常低。

多线程就像餐厅雇佣了多个服务员(线程):
- 线程 1 接受你的点单。
- 线程 2 同时为另一桌客人服务。
- 线程 3 去厨房取菜。
在编程中,多线程主要有两个用途:
- I/O 密集型任务:当程序需要等待外部资源(如网络请求、文件读写、数据库查询)时,CPU 是空闲的,此时启动另一个线程去执行其他任务,可以充分利用 CPU 的等待时间,提高程序的并发处理能力,这是 Python 多线程最常用且最有效的场景。
- CPU 密集型任务:当程序需要进行大量的计算(如科学计算、图像处理)时,多线程并不能真正实现并行,因为 Python 的 GIL(见第10节)的存在,但有时可以通过多线程让计算任务和界面响应同时进行(在 GUI 程序中,一个线程负责计算,另一个线程负责保持界面不卡死)。
threading 模块核心概念
Thread类:threading模块的核心,用于创建和控制线程。Lock(锁):一种同步原语,用于保护共享资源,防止多个线程同时修改它导致数据错乱(即“竞态条件”)。RLock(可重入锁):一种特殊的锁,同一个线程可以多次获取它而不会造成死锁。Semaphore(信号量):一个计数器,用于控制同时访问某个特定资源的线程数量。Queue(队列):一个线程安全的队列,是生产者-消费者模型的完美实现,可以安全地在多个线程之间传递数据。Daemon Thread(守护线程):一个“后台”线程,当主程序(主线程)结束时,所有守护线程都会被强制终止,垃圾回收线程通常就是守护线程。
threading 基础用法
创建和启动线程有两种主要方式。
继承 Thread 类 (面向对象风格)
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
"""线程要执行的代码放在 run 方法中"""
print(f"线程 {self.name} 开始运行")
time.sleep(2) # 模拟耗时操作
print(f"线程 {self.name} 运行结束")
# 创建线程实例
thread1 = MyThread("线程-A")
thread2 = MyThread("线程-B")
# 启动线程
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")
传递一个函数 (函数式风格,更常用)
import threading
import time
def worker(name):
"""线程要执行的函数"""
print(f"线程 {name} 开始运行")
time.sleep(2)
print(f"线程 {name} 运行结束")
# 创建线程实例,target 是目标函数,args 是传递给函数的参数元组
thread1 = threading.Thread(target=worker, args=("线程-A",))
thread2 = threading.Thread(target=worker, args=("线程-B",))
# 启动线程
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")
输出 (两种方式都类似):

线程 线程-A 开始运行
线程 线程-B 开始运行
(等待2秒后...)
线程 线程-A 运行结束
线程 线程-B 运行结束
主线程结束
关键点:
start(): 启动线程,这会自动调用run()方法。run(): 线程执行体的入口,直接调用run()不会创建新线程,而是在当前线程中执行。join(): 阻塞主线程,直到调用join()的线程执行完毕,这是实现“等待”的关键。
线程同步:Lock (锁)
当多个线程访问和修改同一个共享变量时,会发生竞态条件。
不加锁的错误示例:
import threading
counter = 0
threads = []
def increment():
global counter
for _ in range(100000):
counter += 1 # 这一步不是原子操作,包含了"读取-修改-写入"
for i in range(5):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
for t in threads:
t.join()
print(f"最终计数器值: {counter}") # 预期是 500000,但实际结果几乎总是小于它
使用 Lock 修复:

import threading
counter = 0
lock = threading.Lock() # 创建一个锁
threads = []
def increment():
global counter
for _ in range(100000):
with lock: # 使用 with 语句获取锁,代码块执行完后自动释放锁
counter += 1
for i in range(5):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
for t in threads:
t.join()
print(f"最终计数器值: {counter}") # 输出 500000
with lock: 是获取锁的推荐方式,因为它能确保锁一定会被释放(即使在代码块中发生异常),手动使用 lock.acquire() 和 lock.release() 也可以,但容易忘记释放而导致死锁。
线程同步:RLock (可重入锁)
如果在一个线程中,同一个锁被多次获取,Lock 会导致死锁。
Lock 导致的死锁示例:
import threading
lock = threading.Lock()
def worker():
print("Worker 尝试获取锁...")
with lock:
print("Worker 已获取锁")
# 在这里再次尝试获取同一个锁
with lock: # 会卡住,因为锁已经被自己获取,无法再次获取
print("Worker 尝试再次获取锁 (会失败)")
print("Worker 释放锁")
thread = threading.Thread(target=worker)
thread.start()
thread.join()
# 程序会卡住,无法退出
使用 RLock 解决:
import threading
lock = threading.RLock() # 使用 RLock
def worker():
print("Worker 尝试获取锁...")
with lock:
print("Worker 已获取锁")
# RLock 允许同一个线程多次获取
with lock:
print("Worker 再次获取锁成功")
print("Worker 释放锁")
thread = threading.Thread(target=worker)
thread.start()
thread.join()
# 程序正常运行
线程同步:Semaphore (信号量)
假设一个公共厕所只有3个坑位,最多只能有3个人同时使用。Semaphore 就可以实现这个限制。
import threading
import time
# 信号量,表示最多有3个线程可以同时执行
semaphore = threading.Semaphore(3)
def worker(id):
print(f"线程 {id} 等待进入...")
with semaphore: # 获取信号量,如果满了则等待
print(f"线程 {id} 已进入,开始工作...")
time.sleep(2) # 模拟工作
print(f"线程 {id} 工作完成,离开")
threads = []
for i in range(10):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for t in threads:
t.join()
print("所有线程工作完毕")
输出分析: 前3个线程会立即进入,第4个及之后的线程会等待,每隔2秒,一个线程完成工作,下一个等待的线程才能进入。
线程间通信:Queue (队列)
直接使用共享变量需要 Lock 来保护,非常繁琐且容易出错。Queue 是线程安全的,是线程间通信的最佳选择。
生产者-消费者模型示例:
import threading
import time
import queue
# 创建一个线程安全的队列,最大容量为5
q = queue.Queue(maxsize=5)
def producer():
"""生产者,向队列中放入数据"""
for i in range(10):
print(f"生产者: 准备生产数据 {i}")
q.put(f"数据-{i}") # 如果队列满了,put() 会阻塞
print(f"生产者: 数据 {i} 已放入队列")
time.sleep(0.5)
def consumer():
"""消费者,从队列中取出数据"""
while True:
print(f"消费者: 等待从队列取数据...")
data = q.get() # 如果队列空了,get() 会阻塞
print(f"消费者: 取到 {data},开始处理...")
time.sleep(1)
# 标记任务完成,对于队列的 get() 是必须的
q.task_done()
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, daemon=True) # 将消费者设为守护线程
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中所有任务被处理完毕
q.join()
print("所有数据已被处理,主线程结束")
关键点:
queue.put(item): 向队列中添加一个元素,如果队列已满,则阻塞。queue.get(): 从队列中移除并返回一个元素,如果队列已空,则阻塞。queue.task_done(): 消费者在处理完一个get()来的元素后调用,通知队列该任务已完成。queue.join(): 阻塞主线程,直到队列中所有的元素都被处理完毕(即task_done()被调用了与队列元素数量相同的次数)。
守护线程 (Daemon Thread)
守护线程的生命周期依赖于主线程,当主线程结束时,所有守护线程都会被 Python 解释器强行终止,无论它们是否执行完毕。
示例:
import threading
import time
def daemon_task():
print("守护线程启动")
while True:
print("守护线程正在运行...")
time.sleep(1)
def regular_task():
print("普通线程启动")
time.sleep(3)
print("普通线程结束")
d = threading.Thread(target=daemon_task, daemon=True)
r = threading.Thread(target=regular_task)
d.start()
r.start()
# 主线程什么都不做,等待普通线程 r 结束
# r 结束后,主线程也随之结束,守护线程 d 会被强制终止
注意: 守护线程中不能有需要清理的资源(如打开的文件、数据库连接),因为它们可能被突然中断。
线程池 (ThreadPoolExecutor)
手动创建和管理大量线程非常麻烦。concurrent.futures.ThreadPoolExecutor 提供了更高级、更现代的方式来管理线程池。
示例:
import concurrent.futures
import time
def task(n):
print(f"任务 {n} 开始执行")
time.sleep(n)
print(f"任务 {n} 执行完毕")
return n * n
# 创建一个包含5个工作线程的线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 创建10个任务,并提交到线程池
futures = [executor.submit(task, i) for i in range(1, 11)]
print("所有任务已提交,等待结果...")
# 可以遍历 future 对象来获取结果
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"得到结果: {result}")
except Exception as e:
print(f"任务出错: {e}")
print("所有任务处理完毕")
优点:
- 自动管理:自动创建和销毁线程。
- 任务提交:通过
submit提交任务,返回Future对象,可以用来获取结果或检查状态。 - 获取结果:
as_completed会按任务完成的顺序返回Future对象,result()可以获取返回值。 - 优雅关闭:
with语句确保线程池在使用完毕后被正确关闭。
重要陷阱与注意事项
全局解释器锁
GIL (Global Interpreter Lock) 是 Python 解释器(特别是 CPython)的一个互斥锁,它确保任何时候都只有一个线程在执行 Python 字节码。
- 对 I/O 密集型任务的影响:几乎无影响,当一个线程在等待 I/O(如网络请求)时,GIL 会被释放,允许其他线程运行。
- 对 CPU 密集型任务的影响:非常大,由于 GIL 的存在,即使在多核 CPU 上,Python 的多线程也无法实现真正的并行计算,它只是在不同线程间快速切换,但同一时刻只有一个线程在运行。
对于 CPU 密集型任务,应使用 multiprocessing 模块来利用多核 CPU,对于 I/O 密集型任务,threading 依然是最佳选择。
避免使用 kill 命令
不要使用 kill -9 (SIGKILL) 来终止 Python 线程,这会导致线程被强制杀死,资源(如文件、锁)可能无法被正确释放,导致程序状态不一致。
- 优雅终止:可以通过设置一个“停止事件”(
threading.Event)来通知线程自行退出。 daemon线程:对于可以随时被丢弃的后台任务,可以使用守护线程。
注意资源清理
确保在线程结束或异常时,打开的文件、数据库连接、网络套接字等资源能够被正确关闭。try...finally 结构或 with 语句是很好的实践。
完整示例:一个多线程下载器
这个示例将结合 threading 和 Queue 来实现一个简单的多线程文件下载器。
import threading
import queue
import requests
import os
# --- 配置 ---
DOWNLOAD_URL = "http://example.com/largefile.zip" # 替换为一个大文件的真实URL
NUM_WORKERS = 5 # 下载线程数
CHUNK_SIZE = 1024 * 1024 # 每次下载 1MB
# --- 全局变量 ---
task_queue = queue.Queue()
lock = threading.Lock()
def download_worker():
"""工作线程,负责从队列中获取任务并下载"""
while True:
try:
# 从队列获取任务,设置超时以避免永久阻塞
url, file_path, start_byte, end_byte = task_queue.get(timeout=1)
headers = {'Range': f'bytes={start_byte}-{end_byte}'}
print(f"[线程 {threading.current_thread().name}] 开始下载范围 {start_byte}-{end_byte}")
with requests.get(url, headers=headers, stream=True) as r:
r.raise_for_status()
with open(file_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if chunk:
f.write(chunk)
print(f"[线程 {threading.current_thread().name}] 范围 {start_byte}-{end_byte} 下载完成")
task_queue.task_done()
except queue.Empty:
# 队列为空,退出线程
break
except Exception as e:
print(f"[线程 {threading.current_thread().name}] 下载出错: {e}")
task_queue.task_done()
def get_file_size(url):
"""获取文件总大小"""
res = requests.head(url)
if res.status_code == 200:
return int(res.headers.get('content-length', 0))
return 0
def main():
# 1. 获取文件大小
file_size = get_file_size(DOWNLOAD_URL)
if file_size == 0:
print("无法获取文件大小或文件不存在。")
return
file_name = os.path.basename(DOWNLOAD_URL)
temp_file_name = f"{file_name}.tmp"
print(f"文件总大小: {file_size / (1024*1024):.2f} MB")
# 2. 创建任务队列
chunk_size = file_size // NUM_WORKERS
for i in range(NUM_WORKERS):
start_byte = i * chunk_size
end_byte = (i + 1) * chunk_size - 1 if i < NUM_WORKERS - 1 else file_size
task_queue.put((DOWNLOAD_URL, temp_file_name, start_byte, end_byte))
# 3. 创建并启动工作线程
threads = []
for i in range(NUM_WORKERS):
thread = threading.Thread(target=download_worker, name=f"Downloader-{i+1}")
thread.start()
threads.append(thread)
# 4. 等待所有任务完成
task_queue.join()
# 5. 合并文件(这里简单处理,实际应用中可能需要更复杂的合并逻辑)
# 对于这个示例,我们假设下载器本身是顺序写入的,所以不需要合并。
# 但如果每个线程写入不同的临时文件,就需要在这里合并。
# 我们这里简化处理,直接重命名。
if os.path.exists(temp_file_name):
os.rename(temp_file_name, file_name)
print(f"\n所有任务完成!文件已保存为 {file_name}")
else:
print("\n下载失败,未找到临时文件。")
# 6. 等待工作线程退出(虽然它们在队列为空后会自动退出)
for t in threads:
t.join()
if __name__ == "__main__":
main()
这个示例展示了如何将一个大任务(下载一个大文件)分解成多个小任务(下载不同字节范围),然后让多个线程并行处理,最后汇总结果,这是 threading + Queue 模式的典型应用。
