杰瑞科技汇

Python threading 如何高效使用?

目录

  1. 为什么需要多线程? - 理解多线程的用途
  2. threading 模块核心概念 - 线程、锁、守护线程
  3. threading 基础用法 - 从创建到启动
  4. 线程同步:Lock (锁) - 解决数据竞争问题
  5. 线程同步:RLock (可重入锁) - 处理同一线程的多次加锁
  6. 线程同步:Semaphore (信号量) - 控制访问资源的线程数量
  7. 线程间通信:Queue (队列) - 安全地在线程间传递数据
  8. 守护线程 (Daemon Thread) - 主线程退出时自动结束的线程
  9. 线程池 (ThreadPoolExecutor) - 现代化的线程管理方式
  10. 重要陷阱与注意事项
    • 全局解释器锁
    • 避免使用 kill 命令
    • 注意资源清理
  11. 完整示例:一个多线程下载器

为什么需要多线程?

想象一下你正在一家餐厅点餐,传统单线程的方式就像只有一个服务员:他先记下你的点单,然后去厨房,等菜做好再端回来,期间不能服务其他任何客人,效率非常低。

Python threading 如何高效使用?-图1
(图片来源网络,侵删)

多线程就像餐厅雇佣了多个服务员(线程):

  • 线程 1 接受你的点单。
  • 线程 2 同时为另一桌客人服务。
  • 线程 3 去厨房取菜。

在编程中,多线程主要有两个用途:

  1. I/O 密集型任务:当程序需要等待外部资源(如网络请求、文件读写、数据库查询)时,CPU 是空闲的,此时启动另一个线程去执行其他任务,可以充分利用 CPU 的等待时间,提高程序的并发处理能力,这是 Python 多线程最常用且最有效的场景。
  2. CPU 密集型任务:当程序需要进行大量的计算(如科学计算、图像处理)时,多线程并不能真正实现并行,因为 Python 的 GIL(见第10节)的存在,但有时可以通过多线程让计算任务和界面响应同时进行(在 GUI 程序中,一个线程负责计算,另一个线程负责保持界面不卡死)。

threading 模块核心概念

  • Threadthreading 模块的核心,用于创建和控制线程。
  • Lock (锁):一种同步原语,用于保护共享资源,防止多个线程同时修改它导致数据错乱(即“竞态条件”)。
  • RLock (可重入锁):一种特殊的锁,同一个线程可以多次获取它而不会造成死锁。
  • Semaphore (信号量):一个计数器,用于控制同时访问某个特定资源的线程数量。
  • Queue (队列):一个线程安全的队列,是生产者-消费者模型的完美实现,可以安全地在多个线程之间传递数据。
  • Daemon Thread (守护线程):一个“后台”线程,当主程序(主线程)结束时,所有守护线程都会被强制终止,垃圾回收线程通常就是守护线程。

threading 基础用法

创建和启动线程有两种主要方式。

继承 Thread 类 (面向对象风格)

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run(self):
        """线程要执行的代码放在 run 方法中"""
        print(f"线程 {self.name} 开始运行")
        time.sleep(2)  # 模拟耗时操作
        print(f"线程 {self.name} 运行结束")
# 创建线程实例
thread1 = MyThread("线程-A")
thread2 = MyThread("线程-B")
# 启动线程
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")

传递一个函数 (函数式风格,更常用)

import threading
import time
def worker(name):
    """线程要执行的函数"""
    print(f"线程 {name} 开始运行")
    time.sleep(2)
    print(f"线程 {name} 运行结束")
# 创建线程实例,target 是目标函数,args 是传递给函数的参数元组
thread1 = threading.Thread(target=worker, args=("线程-A",))
thread2 = threading.Thread(target=worker, args=("线程-B",))
# 启动线程
thread1.start()
thread2.start()
# 等待所有线程执行完毕
thread1.join()
thread2.join()
print("主线程结束")

输出 (两种方式都类似):

Python threading 如何高效使用?-图2
(图片来源网络,侵删)
线程 线程-A 开始运行
线程 线程-B 开始运行
(等待2秒后...)
线程 线程-A 运行结束
线程 线程-B 运行结束
主线程结束

关键点:

  • start(): 启动线程,这会自动调用 run() 方法。
  • run(): 线程执行体的入口,直接调用 run() 不会创建新线程,而是在当前线程中执行。
  • join(): 阻塞主线程,直到调用 join() 的线程执行完毕,这是实现“等待”的关键。

线程同步:Lock (锁)

当多个线程访问和修改同一个共享变量时,会发生竞态条件

不加锁的错误示例:

import threading
counter = 0
threads = []
def increment():
    global counter
    for _ in range(100000):
        counter += 1 # 这一步不是原子操作,包含了"读取-修改-写入"
for i in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()
for t in threads:
    t.join()
print(f"最终计数器值: {counter}") # 预期是 500000,但实际结果几乎总是小于它

使用 Lock 修复:

Python threading 如何高效使用?-图3
(图片来源网络,侵删)
import threading
counter = 0
lock = threading.Lock() # 创建一个锁
threads = []
def increment():
    global counter
    for _ in range(100000):
        with lock: # 使用 with 语句获取锁,代码块执行完后自动释放锁
            counter += 1
for i in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()
for t in threads:
    t.join()
print(f"最终计数器值: {counter}") # 输出 500000

with lock: 是获取锁的推荐方式,因为它能确保锁一定会被释放(即使在代码块中发生异常),手动使用 lock.acquire()lock.release() 也可以,但容易忘记释放而导致死锁。


线程同步:RLock (可重入锁)

如果在一个线程中,同一个锁被多次获取,Lock 会导致死锁。

Lock 导致的死锁示例:

import threading
lock = threading.Lock()
def worker():
    print("Worker 尝试获取锁...")
    with lock:
        print("Worker 已获取锁")
        # 在这里再次尝试获取同一个锁
        with lock: # 会卡住,因为锁已经被自己获取,无法再次获取
            print("Worker 尝试再次获取锁 (会失败)")
    print("Worker 释放锁")
thread = threading.Thread(target=worker)
thread.start()
thread.join()
# 程序会卡住,无法退出

使用 RLock 解决:

import threading
lock = threading.RLock() # 使用 RLock
def worker():
    print("Worker 尝试获取锁...")
    with lock:
        print("Worker 已获取锁")
        # RLock 允许同一个线程多次获取
        with lock:
            print("Worker 再次获取锁成功")
    print("Worker 释放锁")
thread = threading.Thread(target=worker)
thread.start()
thread.join()
# 程序正常运行

线程同步:Semaphore (信号量)

假设一个公共厕所只有3个坑位,最多只能有3个人同时使用。Semaphore 就可以实现这个限制。

import threading
import time
# 信号量,表示最多有3个线程可以同时执行
semaphore = threading.Semaphore(3)
def worker(id):
    print(f"线程 {id} 等待进入...")
    with semaphore: # 获取信号量,如果满了则等待
        print(f"线程 {id} 已进入,开始工作...")
        time.sleep(2) # 模拟工作
    print(f"线程 {id} 工作完成,离开")
threads = []
for i in range(10):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()
for t in threads:
    t.join()
print("所有线程工作完毕")

输出分析: 前3个线程会立即进入,第4个及之后的线程会等待,每隔2秒,一个线程完成工作,下一个等待的线程才能进入。


线程间通信:Queue (队列)

直接使用共享变量需要 Lock 来保护,非常繁琐且容易出错。Queue 是线程安全的,是线程间通信的最佳选择。

生产者-消费者模型示例:

import threading
import time
import queue
# 创建一个线程安全的队列,最大容量为5
q = queue.Queue(maxsize=5)
def producer():
    """生产者,向队列中放入数据"""
    for i in range(10):
        print(f"生产者: 准备生产数据 {i}")
        q.put(f"数据-{i}") # 如果队列满了,put() 会阻塞
        print(f"生产者: 数据 {i} 已放入队列")
        time.sleep(0.5)
def consumer():
    """消费者,从队列中取出数据"""
    while True:
        print(f"消费者: 等待从队列取数据...")
        data = q.get() # 如果队列空了,get() 会阻塞
        print(f"消费者: 取到 {data},开始处理...")
        time.sleep(1)
        # 标记任务完成,对于队列的 get() 是必须的
        q.task_done()
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, daemon=True) # 将消费者设为守护线程
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中所有任务被处理完毕
q.join() 
print("所有数据已被处理,主线程结束")

关键点:

  • queue.put(item): 向队列中添加一个元素,如果队列已满,则阻塞。
  • queue.get(): 从队列中移除并返回一个元素,如果队列已空,则阻塞。
  • queue.task_done(): 消费者在处理完一个 get() 来的元素后调用,通知队列该任务已完成。
  • queue.join(): 阻塞主线程,直到队列中所有的元素都被处理完毕(即 task_done() 被调用了与队列元素数量相同的次数)。

守护线程 (Daemon Thread)

守护线程的生命周期依赖于主线程,当主线程结束时,所有守护线程都会被 Python 解释器强行终止,无论它们是否执行完毕。

示例:

import threading
import time
def daemon_task():
    print("守护线程启动")
    while True:
        print("守护线程正在运行...")
        time.sleep(1)
def regular_task():
    print("普通线程启动")
    time.sleep(3)
    print("普通线程结束")
d = threading.Thread(target=daemon_task, daemon=True)
r = threading.Thread(target=regular_task)
d.start()
r.start()
# 主线程什么都不做,等待普通线程 r 结束
# r 结束后,主线程也随之结束,守护线程 d 会被强制终止

注意: 守护线程中不能有需要清理的资源(如打开的文件、数据库连接),因为它们可能被突然中断。


线程池 (ThreadPoolExecutor)

手动创建和管理大量线程非常麻烦。concurrent.futures.ThreadPoolExecutor 提供了更高级、更现代的方式来管理线程池。

示例:

import concurrent.futures
import time
def task(n):
    print(f"任务 {n} 开始执行")
    time.sleep(n)
    print(f"任务 {n} 执行完毕")
    return n * n
# 创建一个包含5个工作线程的线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 创建10个任务,并提交到线程池
    futures = [executor.submit(task, i) for i in range(1, 11)]
    print("所有任务已提交,等待结果...")
    # 可以遍历 future 对象来获取结果
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"得到结果: {result}")
        except Exception as e:
            print(f"任务出错: {e}")
print("所有任务处理完毕")

优点:

  • 自动管理:自动创建和销毁线程。
  • 任务提交:通过 submit 提交任务,返回 Future 对象,可以用来获取结果或检查状态。
  • 获取结果as_completed 会按任务完成的顺序返回 Future 对象,result() 可以获取返回值。
  • 优雅关闭with 语句确保线程池在使用完毕后被正确关闭。

重要陷阱与注意事项

全局解释器锁

GIL (Global Interpreter Lock) 是 Python 解释器(特别是 CPython)的一个互斥锁,它确保任何时候都只有一个线程在执行 Python 字节码。

  • 对 I/O 密集型任务的影响:几乎无影响,当一个线程在等待 I/O(如网络请求)时,GIL 会被释放,允许其他线程运行。
  • 对 CPU 密集型任务的影响:非常大,由于 GIL 的存在,即使在多核 CPU 上,Python 的多线程也无法实现真正的并行计算,它只是在不同线程间快速切换,但同一时刻只有一个线程在运行。

对于 CPU 密集型任务,应使用 multiprocessing 模块来利用多核 CPU,对于 I/O 密集型任务,threading 依然是最佳选择。

避免使用 kill 命令

不要使用 kill -9 (SIGKILL) 来终止 Python 线程,这会导致线程被强制杀死,资源(如文件、锁)可能无法被正确释放,导致程序状态不一致。

  • 优雅终止:可以通过设置一个“停止事件”(threading.Event)来通知线程自行退出。
  • daemon 线程:对于可以随时被丢弃的后台任务,可以使用守护线程。

注意资源清理

确保在线程结束或异常时,打开的文件、数据库连接、网络套接字等资源能够被正确关闭。try...finally 结构或 with 语句是很好的实践。


完整示例:一个多线程下载器

这个示例将结合 threadingQueue 来实现一个简单的多线程文件下载器。

import threading
import queue
import requests
import os
# --- 配置 ---
DOWNLOAD_URL = "http://example.com/largefile.zip" # 替换为一个大文件的真实URL
NUM_WORKERS = 5  # 下载线程数
CHUNK_SIZE = 1024 * 1024  # 每次下载 1MB
# --- 全局变量 ---
task_queue = queue.Queue()
lock = threading.Lock()
def download_worker():
    """工作线程,负责从队列中获取任务并下载"""
    while True:
        try:
            # 从队列获取任务,设置超时以避免永久阻塞
            url, file_path, start_byte, end_byte = task_queue.get(timeout=1)
            headers = {'Range': f'bytes={start_byte}-{end_byte}'}
            print(f"[线程 {threading.current_thread().name}] 开始下载范围 {start_byte}-{end_byte}")
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()
                with open(file_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
            print(f"[线程 {threading.current_thread().name}] 范围 {start_byte}-{end_byte} 下载完成")
            task_queue.task_done()
        except queue.Empty:
            # 队列为空,退出线程
            break
        except Exception as e:
            print(f"[线程 {threading.current_thread().name}] 下载出错: {e}")
            task_queue.task_done()
def get_file_size(url):
    """获取文件总大小"""
    res = requests.head(url)
    if res.status_code == 200:
        return int(res.headers.get('content-length', 0))
    return 0
def main():
    # 1. 获取文件大小
    file_size = get_file_size(DOWNLOAD_URL)
    if file_size == 0:
        print("无法获取文件大小或文件不存在。")
        return
    file_name = os.path.basename(DOWNLOAD_URL)
    temp_file_name = f"{file_name}.tmp"
    print(f"文件总大小: {file_size / (1024*1024):.2f} MB")
    # 2. 创建任务队列
    chunk_size = file_size // NUM_WORKERS
    for i in range(NUM_WORKERS):
        start_byte = i * chunk_size
        end_byte = (i + 1) * chunk_size - 1 if i < NUM_WORKERS - 1 else file_size
        task_queue.put((DOWNLOAD_URL, temp_file_name, start_byte, end_byte))
    # 3. 创建并启动工作线程
    threads = []
    for i in range(NUM_WORKERS):
        thread = threading.Thread(target=download_worker, name=f"Downloader-{i+1}")
        thread.start()
        threads.append(thread)
    # 4. 等待所有任务完成
    task_queue.join()
    # 5. 合并文件(这里简单处理,实际应用中可能需要更复杂的合并逻辑)
    # 对于这个示例,我们假设下载器本身是顺序写入的,所以不需要合并。
    # 但如果每个线程写入不同的临时文件,就需要在这里合并。
    # 我们这里简化处理,直接重命名。
    if os.path.exists(temp_file_name):
        os.rename(temp_file_name, file_name)
        print(f"\n所有任务完成!文件已保存为 {file_name}")
    else:
        print("\n下载失败,未找到临时文件。")
    # 6. 等待工作线程退出(虽然它们在队列为空后会自动退出)
    for t in threads:
        t.join()
if __name__ == "__main__":
    main()

这个示例展示了如何将一个大任务(下载一个大文件)分解成多个小任务(下载不同字节范围),然后让多个线程并行处理,最后汇总结果,这是 threading + Queue 模式的典型应用。

分享:
扫描分享到社交APP
上一篇
下一篇