杰瑞科技汇

Python多线程threading,线程安全如何保障?

threading 是 Python 中用于实现多线程编程的标准库,它允许你在单个进程中同时运行多个线程,从而可以执行多个任务,特别适合处理 I/O 密集型(如网络请求、文件读写)操作。

Python多线程threading,线程安全如何保障?-图1
(图片来源网络,侵删)

为什么需要多线程?(核心动机)

想象一下你正在餐厅点餐:

  • 单线程:你一个人点餐,从点餐、等餐、吃完、买单,再到下一位客人,如果等餐需要很长时间(20 分钟),整个餐厅的效率都会被拖慢。
  • 多线程:你点完餐后,服务员可以去服务其他客人,你的餐点由厨师(另一个“线程”)在后台准备,这样,即使你在等餐,餐厅的其他服务(如点餐、上菜)依然可以并行进行,大大提高了效率。

在编程中,这种“等待”I/O 操作

  • 从网络下载一个文件(大部分时间在等待服务器响应)。
  • 读取一个大文件(大部分时间在等待磁盘 I/O)。

在这些等待期间,CPU 是空闲的,多线程可以让一个线程在等待 I/O 时,切换到另一个线程执行计算任务,从而充分利用 CPU 时间,提高程序的响应速度和吞吐量。


threading 模块基础

1 创建和启动线程

最基本的方式是创建一个 Thread 对象,并传入一个函数作为目标。

import threading
import time
def worker(num):
    """线程要执行的函数"""
    print(f"Worker {num}: 开始工作")
    time.sleep(2)  # 模拟 I/O 操作
    print(f"Worker {num}: 工作完成")
# 创建线程对象
# target: 线程要执行的函数
# args: 传递给 target 函数的参数元组
t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))
# 启动线程 (调用 run() 方法)
# 这不会阻塞主线程,主线程会继续往下执行
t1.start()
t2.start()
# 等待所有线程执行完毕 (可选)
# 如果没有 join(),主线程可能会在子线程完成前就退出
t1.join()
t2.join()
print("主线程结束")

输出结果:

Worker 1: 开始工作
Worker 2: 开始工作
# (等待2秒)
Worker 1: 工作完成
Worker 2: 工作完成
主线程结束

注意: t1.start()t2.start() 几乎是同时被调用的,所以两个 "开始工作" 的打印语句几乎同时出现。

2 join() 方法详解

join() 是一个非常重要的方法,它的作用是“阻塞当前线程,直到被调用的那个线程执行完毕”。

  • 没有 join() 的情况:主线程启动子线程后,会立即继续执行自己的代码,不会等待子线程。
  • join() 的情况:主线程执行到 t1.join() 时,会暂停,直到 t1 线程执行完毕,然后才会执行 t2.join(),暂停直到 t2 完成。

这确保了在主线程退出前,所有子线程都有机会完成它们的工作。

3 继承 threading.Thread

除了函数式创建,你还可以通过继承 Thread 类来创建线程,这种方式更面向对象,可以方便地在类中管理线程的状态。

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, thread_id):
        super().__init__()
        self.thread_id = thread_id
    def run(self):
        """必须重写 run() 方法,这是线程的执行体"""
        print(f"线程 {self.thread_id} 启动")
        time.sleep(1)
        print(f"线程 {self.thread_id} 结束")
# 创建并启动线程
threads = []
for i in range(3):
    thread = MyThread(i)
    threads.append(thread)
    thread.start()
# 等待所有线程完成
for thread in threads:
    thread.join()
print("所有线程执行完毕")

线程同步与锁

当多个线程需要访问和修改同一个共享资源时,就会发生 竞态条件,这可能导致数据不一致或程序崩溃。

经典例子:银行取款

import threading
# 共享资源
balance = 100
def withdraw(amount):
    global balance
    if balance >= amount:
        # 模拟网络延迟或处理时间
        # 另一个线程可能会趁虚而入
        time.sleep(0.1) 
        balance -= amount
        print(f"取款成功,余额: {balance}")
    else:
        print("余额不足")
# 两个人同时取款
t1 = threading.Thread(target=withdraw, args=(50,))
t2 = threading.Thread(target=withdraw, args=(70,))
t1.start()
t2.start()
t1.join()
t2.join()

可能出现的错误输出:

取款成功,余额: 50  (t1 检查时 balance=100, t2 检查时 balance=100)
取款成功,余额: -20 (t2 执行时,t1 还没来得及扣款)

这显然是错误的,为了避免这种情况,我们需要引入

1 threading.Lock

锁是一个同步原语,它确保一次只有一个线程可以访问共享资源。

  • acquire(): 获取锁,如果锁已经被其他线程获取,则当前线程会阻塞,直到锁被释放。
  • release(): 释放锁。

使用锁修复上面的例子:

import threading
import time
balance = 100
# 创建一个锁对象
lock = threading.Lock()
def withdraw(amount):
    global balance
    # 在访问共享资源前,尝试获取锁
    lock.acquire() 
    try:
        if balance >= amount:
            time.sleep(0.1) 
            balance -= amount
            print(f"取款成功,余额: {balance}")
        else:
            print("余额不足")
    finally:
        # 在 finally 块中确保锁一定会被释放,即使发生异常
        lock.release()
t1 = threading.Thread(target=withdraw, args=(50,))
t2 = threading.Thread(target=withdraw, args=(70,))
t1.start()
t2.start()
t1.join()
t2.join()

正确的输出:

取款成功,余额: 50  (t1 获取了锁)
余额不足         (t2 等待 t1 释放锁后,再次检查 balance=50)

无论哪个线程先获取锁,另一个线程都必须等待,从而保证了数据的安全性。

更推荐的方式:with 语句

with 语句可以自动处理锁的获取和释放,即使发生异常,也能保证锁被释放,代码更简洁、安全。

def withdraw(amount):
    global balance
    with lock:  # 等同于 lock.acquire() ... lock.release()
        if balance >= amount:
            time.sleep(0.1)
            balance -= amount
            print(f"取款成功,余额: {balance}")
        else:
            print("余额不足")

线程通信

线程间需要交换数据,Python 提供了多种线程安全的队列来实现线程间的通信,其中最常用的是 queue.Queue

queue.Queue 是一个先进先出的数据结构,并且是线程安全的,非常适合在生产者和消费者模型中使用。

示例:生产者-消费者模型

import threading
import queue
import time
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
    """生产者线程:向队列中添加数据"""
    for i in range(5):
        item = f"产品-{i}"
        print(f"生产者: 正在生产 {item}")
        q.put(item)  # 将数据放入队列
        time.sleep(random.random()) # 模拟生产耗时
    print("生产者: 生产完毕,退出")
def consumer():
    """消费者线程:从队列中取出数据"""
    while True:
        try:
            # get() 会阻塞,直到队列中有数据
            item = q.get(timeout=2) # 设置超时,避免永久阻塞
            print(f"消费者: 正在消费 {item}")
            time.sleep(random.random()) # 模拟消费耗时
            q.task_done() # 标记任务已完成
        except queue.Empty:
            print("消费者: 队列为空,等待超时,退出")
            break
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中的所有任务被处理完毕
q.join() 
print("主线程: 所有任务已完成,程序退出")

queue.Queue 的常用方法:

  • q.put(item): 向队列中添加一个项目。
  • q.get(): 从队列中获取一个项目,如果队列为空,它会阻塞。
  • q.get_nowait(): 尝试获取一个项目,如果队列为空,则立即抛出 queue.Empty 异常。
  • q.task_done(): 当一个消费者完成从队列中获取的项目处理后调用它。
  • q.join(): 阻塞,直到队列中的所有项目都被处理(即,对于每个已放入队列的项目,都调用了一次 task_done())。

GIL (Global Interpreter Lock) - 全局解释器锁

这是理解 Python 多线程最关键的一点。

GIL 是什么? GIL 是 CPython 解释器(Python 的标准实现)的一个互斥锁,它确保在任何时刻,只有一个线程可以执行 Python 字节码。

GIL 的影响:

  1. CPU 密集型任务:对于纯计算任务(如大量数学运算),多线程并不能利用多核 CPU 的优势,因为同一时间只有一个线程能在运行,GIL 会在线程之间快速切换,但并不能真正并行执行,在这种情况下,多进程 (multiprocessing) 是更好的选择。
  2. I/O 密集型任务:当线程因为等待 I/O(如网络请求、文件读写)而阻塞时,GIL 会被释放,允许其他线程运行,对于 I/O 密集型任务,多线程仍然非常有效,可以显著提高程序的效率。
  • threading 处理 I/O 密集型任务(如爬虫、Web 服务器)。
  • multiprocessing 处理 CPU 密集型任务(如科学计算、图像处理)。

完整示例:并发下载器

这个例子结合了 threadingqueue,实现一个简单的并发下载器。

import threading
import queue
import requests
import time
# 1. 准备任务队列
# 这里用一些示例图片链接
image_urls = [
    "https://www.python.org/static/community_logos/python-logo-master-v3-TM.png",
    "https://www.djangoproject.com/m/img/logos/django-logo-negative.png",
    "https://flask.palletsprojects.com/en/2.2.x/_static/flask-icon.png",
    "https://www.fastapi.tiangolo.com/img/logo-margin/logo-teal.png"
]
# 2. 创建线程安全的队列
download_queue = queue.Queue()
# 3. 生产者:将所有 URL 放入队列
def producer():
    for url in image_urls:
        download_queue.put(url)
    print("生产者: 所有 URL 已加入队列")
# 4. 消费者:从队列中取出 URL 并下载
def consumer():
    while True:
        try:
            url = download_queue.get_nowait()
            print(f"消费者: 开始下载 {url.split('/')[-1]}")
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                filename = url.split('/')[-1]
                with open(filename, 'wb') as f:
                    f.write(response.content)
                print(f"消费者: {filename} 下载完成")
            else:
                print(f"消费者: 下载 {url.split('/')[-1]} 失败,状态码: {response.status_code}")
            download_queue.task_done()
        except queue.Empty:
            print("消费者: 队列已空,退出")
            break
# 5. 创建并启动线程
num_consumers = 3  # 3个下载线程
threads = []
# 启动生产者
producer_thread = threading.Thread(target=producer)
threads.append(producer_thread)
producer_thread.start()
# 启动消费者
for _ in range(num_consumers):
    consumer_thread = threading.Thread(target=consumer)
    threads.append(consumer_thread)
    consumer_thread.start()
# 6. 等待所有任务完成
producer_thread.join()
download_queue.join() # 等待队列清空
# 等待所有消费者线程结束(虽然它们在队列为空后会自己退出)
for t in threads:
    if t != producer_thread: # 生产者已经结束了
        t.join()
print("主线程: 所有下载任务完成!")

高级主题

  • threading.Event: 线程间的事件通知机制,一个线程可以等待一个事件的发生,另一个线程在特定条件满足时可以设置该事件,从而唤醒等待的线程。
  • threading.Semaphore: 信号量,用于控制同时访问某个特定资源的线程数量,限制一个只能容纳 3 人的房间(信号量为 3)。
  • threading.Timer: 定时器,在指定的时间后执行一个函数。
特性 描述 适用场景
创建线程 threading.Thread(target=func, args=()) 或继承 Thread 任何需要多线程的场景
启动线程 thread.start() 启动线程执行
等待线程 thread.join() 确保主线程在子线程完成后才退出
同步机制 threading.Lock 保护共享资源,防止竞态条件
线程通信 queue.Queue 安全地在线程间传递数据(生产者-消费者模型)
核心限制 GIL I/O 密集型任务效果好,CPU 密集型任务效果差

threading 是 Python 并发编程的重要工具,掌握它对于编写高效、响应迅速的应用程序至关重要。

分享:
扫描分享到社交APP
上一篇
下一篇