杰瑞科技汇

python threading使用

threading 是 Python 中用于实现多线程编程的标准库,它允许你在同一个进程中运行多个线程,从而可以“执行多个任务,特别适合用于 I/O 密集型(如网络请求、文件读写)任务。

python threading使用-图1
(图片来源网络,侵删)

为什么需要多线程?(核心动机)

在 Python 中,由于 全局解释器锁 的存在,多线程在 CPU 密集型 任务上并不能实现真正的并行执行(即多个线程在同一时刻由多个 CPU 核心同时运行),但对于 I/O 密集型 任务,情况就完全不同了。

I/O 密集型任务的特点: 程序的大部分时间都在等待 I/O 操作完成(如等待网络数据返回、等待磁盘读取文件),在这等待期间,CPU 是空闲的。

多线程在这里的作用: 当一个线程因为等待 I/O 而被阻塞时,GIL 会被释放,Python 解释器可以切换到另一个就绪的线程去执行,这样,CPU 的空闲时间就被利用起来了,看起来就像是多个任务在“进行,极大地提高了程序的执行效率。


threading 模块的核心概念与组件

1 Thread

threading.Thread 是创建和管理线程的主要工具。

python threading使用-图2
(图片来源网络,侵删)

创建线程的两种方式:

  1. 函数式创建(最简单) 创建一个函数,然后将这个函数作为 target 参数传递给 Thread 对象。

    import threading
    import time
    def worker(name):
        """线程要执行的函数"""
        print(f"Worker {name} is starting...")
        time.sleep(2)  # 模拟 I/O 操作
        print(f"Worker {name} has finished.")
    # 创建线程对象
    # target: 指定线程要执行的函数
    # args: 以元组形式传递给 target 函数的位置参数
    t1 = threading.Thread(target=worker, args=("Thread-1",))
    # 启动线程
    t1.start()
    print("Main thread continues to run...")
    # 等待线程执行完毕(非常重要!)
    t1.join()
    print("Main thread has ended.")

    输出分析: 你会看到 "Main thread continues to run..." 在 "Worker Thread-1 is starting..." 之后立即打印,因为主线程没有等待 t1 完成,而 "Main thread has ended" 会在 t1.join() 之后,也就是 t1 执行完毕后才打印。

  2. 类继承式创建(更面向对象,更灵活) 创建一个类,继承自 threading.Thread,并重写 run() 方法。

    import threading
    import time
    class MyThread(threading.Thread):
        def __init__(self, name):
            super().__init__()
            self.name = name
        def run(self):
            """线程启动时自动调用的方法"""
            print(f"{self.name} is starting...")
            time.sleep(2)
            print(f"{self.name} has finished.")
    # 创建并启动线程
    t2 = MyThread("Class-Thread-1")
    t2.start()
    print("Main thread continues to run...")
    t2.join()
    print("Main thread has ended.")

    这种方式更易于管理线程的状态和行为,适合复杂的线程逻辑。

2 线程的生命周期

一个线程从创建到销毁,会经历以下状态:

  1. New (新建): 线程对象被创建,但 start() 方法还未被调用。
  2. Runnable (就绪): start() 方法被调用,线程进入就绪队列,等待 GIL 的调度。
  3. Running (运行): 线程获得了 GIL,正在执行 run() 方法中的代码。
  4. Blocked (阻塞): 线程因为某些原因(如等待 I/O、获取锁)暂停运行,暂时让出 GIL。
  5. Terminated (终止): 线程的 run() 方法执行完毕,线程生命周期结束。

3 主线程与子线程

当你运行一个 Python 脚本时,默认就有一个“主线程”,通过 threading.Thread 创建的线程都被称为“子线程”。

threading.current_thread(): 可以获取当前正在执行的线程对象。 threading.main_thread(): 可以获取主线程对象。

import threading
import time
def show_current_thread():
    t = threading.current_thread()
    print(f"Running in thread: {t.name} (is Main: {t is threading.main_thread()})")
show_current_thread() # 显示主线程
t = threading.Thread(target=show_current_thread)
t.start()
t.join()

线程间的通信与同步

当多个线程需要访问共享资源(如一个全局变量、一个文件)时,就会引发 竞态条件,导致数据不一致或不正确,为了解决这个问题,我们需要使用同步机制。

1 锁

锁是最基本的同步工具,它确保了在任何时刻,只有一个线程可以访问被锁保护的代码块(称为“临界区”)。

  • threading.Lock(): 创建一个锁。
  • lock.acquire(): 获取锁,如果锁已经被其他线程获取,则当前线程会阻塞,直到锁被释放。
  • lock.release(): 释放锁。

重要提示: 推荐使用 with lock: 语句,它会自动处理 acquire()release(),即使发生异常也能保证锁被释放,避免死锁。

示例:不使用锁(竞态条件)

import threading
counter = 0
threads = []
def increment():
    global counter
    for _ in range(100000):
        counter += 1 # 这不是一个原子操作, read-modify-write
for i in range(5):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Final counter value: {counter}") # 结果几乎总是小于 500000

示例:使用锁(解决竞态条件)

import threading
counter = 0
lock = threading.Lock()
threads = []
def increment_with_lock():
    global counter
    for _ in range(100000):
        with lock: # 获取锁,执行完代码块后自动释放
            counter += 1
for i in range(5):
    t = threading.Thread(target=increment_with_lock)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Final counter value: {counter}") # 结果总是 500000

2 其他同步原语

除了 Lockthreading 模块还提供了其他有用的同步工具:

  • RLock (可重入锁): 允许同一个线程多次获取同一个锁,避免死锁,适用于递归函数。
  • Semaphore (信号量): 控制同时访问某个资源的线程数量,可以看作是“多个许可”的锁。
  • Event (事件): 一个简单的线程间通信机制,一个线程可以发出一个事件信号,其他线程可以等待这个信号。
  • Condition (条件变量): 允许线程在某些条件满足时才继续执行,常与锁配合使用,实现复杂的线程间协作。

线程间数据共享与通信

1 共享数据

共享数据(如全局变量、列表、字典)是线程间通信最直接的方式,但也是最容易出问题的方式。必须配合锁等同步机制来使用

import threading
shared_data = []
lock = threading.Lock()
def append_data(value):
    with lock:
        shared_data.append(value)
        print(f"{threading.current_thread().name} appended {value}")
threads = []
for i in range(5):
    t = threading.Thread(target=append_data, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Final shared_data: {shared_data}")

2 Queue (线程安全的队列)

queue.Queue 是多线程编程中 最推荐、最安全 的数据交换方式,它是一个线程安全的队列,一个线程可以把数据放入队列,另一个线程可以从队列中取出数据。Queue 内部已经实现了必要的锁,你无需手动加锁。

  • put(item): 向队列中放入一个数据,如果队列已满,则阻塞。
  • get(): 从队列中取出一个数据,如果队列为空,则阻塞。
  • task_done(): 当消费者完成一项任务后调用,通常用于 join() 方法。
  • join(): 阻塞主线程,直到队列中的所有任务都被处理完毕(即每个 put 的项目都有对应的 task_done)。
  • qsize(): 返回队列中大致的项目数。

生产者-消费者模型示例

import threading
import queue
import time
import random
# 创建一个线程安全的队列
q = queue.Queue()
def producer():
    """生产者线程"""
    for i in range(5):
        item = f"Product-{i}"
        print(f"Producer is producing {item}")
        q.put(item)
        time.sleep(random.random()) # 模拟生产耗时
    print("Producer finished.")
def consumer():
    """消费者线程"""
    while True:
        try:
            # 设置超时,避免永久阻塞
            item = q.get(timeout=1)
            print(f"Consumer is consuming {item}")
            time.sleep(random.random()) # 模拟消费耗时
            q.task_done() # 标记任务完成
        except queue.Empty:
            print("Queue is empty, consumer exiting.")
            break
# 创建并启动线程
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
# 等待生产者完成
p.join()
# 等待队列中所有任务被消费者处理完毕
q.join()
print("All products have been consumed.")
# 确保消费者线程也能退出
# 在这个简单例子中,消费者会因为超时而退出

高级主题与最佳实践

1 线程池

手动创建和管理大量线程(如成百上千个)会消耗大量资源,并且难以控制。concurrent.futures.ThreadPoolExecutor 提供了一个更高级、更易用的线程池接口。

import concurrent.futures
import time
def do_task(task_id):
    print(f"Task {task_id} is starting...")
    time.sleep(2)
    print(f"Task {task_id} is finished.")
    return f"Result of task {task_id}"
# 创建一个包含 4 个线程的线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # 提交多个任务到线程池
    futures = [executor.submit(do_task, i) for i in range(8)]
    # 等待所有任务完成并获取结果
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"Got: {result}")
        except Exception as e:
            print(f"A task generated an exception: {e}")
print("All tasks done in the thread pool.")

2 线程本地存储

threading.local() 为每个线程创建一个独立的存储空间,即使多个线程访问同一个 local 对象,它们也只能看到和修改自己存储的数据,而不会互相干扰,这对于避免在多线程环境下传递大量上下文数据非常有用。

import threading
local_data = threading.local()
def process_data(user_id):
    local_data.user_id = user_id # 将数据存储在本地
    print(f"Thread {threading.current_thread().name} is processing for user {local_data.user_id}")
    # ... 其他操作,都可以使用 local_data.user_id
threads = []
for i in range(3):
    t = threading.Thread(target=process_data, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

3 停止线程

Python 没有直接强制终止线程的 API(_stop() 是危险的,已废弃),正确的做法是使用 “哨兵值”事件 来通知线程应该退出了。

使用事件 Event 示例:

import threading
import time
stop_event = threading.Event()
def worker():
    print("Worker is running...")
    while not stop_event.is_set(): # 检查事件是否被设置
        print("Worker is working...")
        time.sleep(1)
    print("Worker is stopping...")
t = threading.Thread(target=worker)
t.start()
time.sleep(5)
print("Main thread is signaling worker to stop...")
stop_event.set() # 设置事件,通知线程退出
t.join()
print("Worker has stopped.")

总结与最佳实践

场景 推荐方案 原因
简单的并行任务 threading.Thread + join() 直观,易于理解。
生产者-消费者模型 queue.Queue 线程安全,设计经典,解耦生产者和消费者。
大量并发任务 concurrent.futures.ThreadPoolExecutor 避免了手动管理线程的复杂性,复用线程,效率高。
需要线程独立状态 threading.local() 隔离线程数据,避免冲突。
需要优雅地停止线程 threading.Event哨兵值 安全、可控,避免强制终止带来的资源问题。
CPU 密集型任务 multiprocessing 模块 绕过 GIL,实现真正的多核并行。

核心原则:

  1. 优先考虑 Queue:在线程间传递数据时,queue.Queue 是你的首选。
  2. 锁要最小化:只在保护共享数据的临界区使用锁,并尽量缩短持有锁的时间。
  3. 使用 with 语句管理锁:确保锁一定会被释放,防止死锁。
  4. 避免共享可变状态:最好的同步是没有同步,尽量让每个线程操作自己的数据。
  5. 了解 GIL:Python 多线程在 CPU 密集型任务上的局限性,并知道何时该选择多进程。
分享:
扫描分享到社交APP
上一篇
下一篇