杰瑞科技汇

Python threading多线程如何高效实现并发控制?

这是一个非常重要的主题,因为它关系到如何利用多核 CPU 提升程序性能,以及如何处理并发任务。

Python threading多线程如何高效实现并发控制?-图1
(图片来源网络,侵删)

目录

  1. 为什么需要多线程?
  2. Python 的 GIL (全局解释器器锁)
  3. threading 模块核心概念
    • Thread 类:创建和管理线程
    • Lock (锁):解决资源竞争问题
    • RLock (可重入锁):更安全的锁
    • Semaphore (信号量):控制资源访问数量
    • Event (事件):线程间通信
    • Queue (队列):线程安全的任务和数据交换
  4. 完整代码示例
    • 示例1:基础多线程
    • 示例2:使用 Lock 防止竞争条件
    • 示例3:生产者-消费者模型(使用 Queue
  5. 多线程 vs. 多进程
  6. 最佳实践与注意事项

为什么需要多线程?

想象一个场景:你需要下载 10 个文件,如果只用一个线程,程序会一个接一个地下载,总时间是所有文件下载时间的总和。

如果使用 10 个线程,每个线程负责下载一个文件,10 个文件可以(在一定程度上)并行下载,总时间会大大缩短。

多线程的核心用途:

  • I/O 密集型任务:当程序需要等待外部资源时,如网络请求、文件读写、数据库查询,在等待时,CPU 可以被释放去执行其他线程的任务,从而提高 CPU 的利用率。
  • 提升用户体验:在图形用户界面程序中,将耗时操作放在后台线程执行,可以避免界面卡死,保持响应。

Python 的 GIL (Global Interpreter Lock)

这是理解 Python 多线程最关键的一点!

Python threading多线程如何高效实现并发控制?-图2
(图片来源网络,侵删)

GIL 是什么? GIL 是一个互斥锁,它确保在任何时刻,一个 Python 进程中只有一个线程在执行 Python 字节码,这意味着,即使在多核处理器上,Python 的多线程也无法实现真正的并行计算。

GIL 的影响:

  • 对于 CPU 密集型 任务(如大量数学计算、图像处理),多线程并不能带来性能提升,因为同一时间只有一个线程在运行,在这种情况下,应该使用 multiprocessing 模块来创建多个进程,每个进程有自己的 Python 解释器和内存空间,可以真正地在多核上并行。
  • 对于 I/O 密集型 任务,GIL 的影响较小,当一个线程因为 I/O 操作(如 requests.get()time.sleep())而阻塞时,GIL 会被释放,其他线程就可以获得 GIL 并继续执行,这使得多线程在处理 I/O 密集型任务时非常有效。

简单总结:

  • 多线程:适合 I/O 密集型 任务。
  • 多进程:适合 CPU 密集型 任务。

threading 模块核心概念

Thread 类:创建和管理线程

这是 threading 模块最核心的类,用于创建新线程。

Python threading多线程如何高效实现并发控制?-图3
(图片来源网络,侵删)

两种创建线程的方式:

继承 Thread 类(面向对象)

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run(self):
        # 线程要执行的代码
        print(f"线程 {self.name} 开始")
        time.sleep(2)  # 模拟耗时操作
        print(f"线程 {self.name} 结束")
# 创建并启动线程
thread1 = MyThread("Thread-1")
thread2 = MyThread("Thread-2")
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("所有线程已结束")

传递一个可调用对象(函数或 lambda)

这是更常见、更简洁的方式。

import threading
import time
def worker_function(name):
    print(f"线程 {name} 开始")
    time.sleep(2)
    print(f"线程 {name} 结束")
# 创建线程对象
thread1 = threading.Thread(target=worker_function, args=("Thread-A",))
thread2 = threading.Thread(target=worker_function, args=("Thread-B",))
# 启动线程
thread1.start()
thread2.start()
# 等待线程结束
thread1.join()
thread2.join()
print("所有线程已结束")

常用方法:

  • start(): 启动线程,执行 run() 方法。
  • run(): 线程的执行体,需要重写。
  • join(): 阻塞当前线程,直到被调用的线程执行完毕,非常重要,用于同步。
  • is_alive(): 检查线程是否还在运行。

Lock (锁):解决资源竞争问题

当多个线程同时读写同一个共享变量时,可能会导致数据不一致,这就是“竞争条件”。

例子:银行账户取款

import threading
balance = 1000
def withdraw(amount):
    global balance
    # 检查余额
    if balance >= amount:
        # 模拟网络延迟,让问题更容易发生
        threading.Event().wait(0.001) 
        balance -= amount
        print(f"取款 {amount},余额为 {balance}")
    else:
        print("余额不足")
threads = []
for _ in range(10):
    t = threading.Thread(target=withdraw, args=(100,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终余额: {balance}") # 可能不是 0,因为多个线程同时检查了余额

使用 Lock 修复:

import threading
balance = 1000
lock = threading.Lock() # 创建一个锁
def withdraw(amount):
    global balance
    # 在访问共享资源前获取锁
    lock.acquire() 
    try:
        if balance >= amount:
            threading.Event().wait(0.001)
            balance -= amount
            print(f"取款 {amount},余额为 {balance}")
        else:
            print("余额不足")
    finally:
        # 在 finally 块中释放锁,确保锁一定会被释放
        lock.release() 
threads = []
for _ in range(10):
    t = threading.Thread(target=withdraw, args=(100,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终余额: {balance}") # 正确结果是 0

更推荐的方式:with 语句 with 语句会自动处理锁的获取和释放,即使在代码块中发生异常,锁也会被释放,这是最安全、最简洁的方式。

def withdraw(amount):
    global balance
    with lock: # 自动 acquire() 和 release()
        if balance >= amount:
            threading.Event().wait(0.001)
            balance -= amount
            print(f"取款 {amount},余额为 {balance}")
        else:
            print("余额不足")

RLock (可重入锁)

RLock (Reentrant Lock) 允许同一个线程多次获取锁,而不会造成死锁,这在递归函数中非常有用。

import threading
lock = threading.RLock()
def recursive_function(depth):
    with lock:
        if depth > 0:
            print(f"递归深度: {depth}")
            recursive_function(depth - 1)
recursive_function(3) # 不会死锁

Semaphore (信号量)

Semaphore 是一个计数器,用于控制同时访问某个资源的线程数量,一个只能容纳 5 人的房间(信号量初始值为 5)。

import threading
import time
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} 正在工作...")
        time.sleep(2)
        print(f"Worker {worker_id} 工作完成")
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

Event (事件)

Event 用于线程间的简单通信,一个线程可以通知其他某个事件已经发生。

  • event.set(): 设置事件,表示事件发生。
  • event.clear(): 清除事件,表示事件未发生。
  • event.wait(): 阻塞线程,直到事件被设置。
import threading
import time
event = threading.Event()
def waiter():
    print("Waiter: 等待事件...")
    event.wait() # 阻塞在这里
    print("Waiter: 事件已发生!继续执行。")
def setter():
    print("Setter: 准
分享:
扫描分享到社交APP
上一篇
下一篇