杰瑞科技汇

Java线程中wait/notify如何正确协作?

核心思想:生产者-消费者模型

想象一个场景:一个线程(生产者)负责生产数据并存入一个共享的容器(比如一个队列),另一个线程(消费者)负责从容器中取出数据并处理。

Java线程中wait/notify如何正确协作?-图1
(图片来源网络,侵删)

这里就存在一个天然的协作关系

  • 如果容器满了,生产者就必须等待,直到消费者取走一些数据,腾出空间。
  • 如果容器空了,消费者就必须等待,直到生产者放入一些新数据。

wait()notify() 就是用来解决这种“等待-通知”问题的机制。


wait() 方法

当一个线程调用某个共享对象(Object)的 wait() 方法时,它会做以下几件事:

  1. 释放锁:该线程会立即释放它对该共享对象的
  2. 进入等待队列:该线程会进入该共享对象的等待队列,并进入阻塞状态,不再参与 CPU 竞争。
  3. 等待被唤醒:它会一直等待,直到其他线程调用了该对象的 notify()notifyAll() 方法。

wait() 的三种形式

  1. void wait(): 无限等待,直到被 notify()notifyAll() 唤醒。
  2. void wait(long timeout): 等待指定的毫秒数,如果超时了还没被唤醒,线程会自动苏醒。
  3. void wait(long timeout, int nanos): 更精确的超时等待,纳秒级。

为什么必须在 synchronized 代码块中调用 wait()

Java线程中wait/notify如何正确协作?-图2
(图片来源网络,侵删)

这是由 Java 的内存模型和锁机制决定的。wait() 的作用是让当前线程释放锁并等待,如果没有 synchronized,你根本就没有锁可以释放。wait() 的执行依赖于对象的监视器(锁),只有获取了锁的线程才能操作这个监视器。

wait()sleep() 的区别

特性 wait() sleep()
所属类 Object Thread
锁的释放 会释放对象锁 不会释放任何锁
唤醒方式 必须由 notify()/notifyAll() 唤醒 时间到自动苏醒,或被 interrupt() 中断
使用位置 必须在 synchronized 代码块/方法中 可以在任何地方调用
本质 线程间通信/协作机制 线程休眠/暂停机制

notify()notifyAll() 方法

这两个方法也必须在 synchronized 代码块/方法中调用,因为它们也依赖于对象的锁。

notify()

  • 作用:随机唤醒一个正在等待该对象锁的线程。
  • 如何选择:JVM 内部会随机选择一个线程(通常不保证是哪个)从等待队列中移出,并将其放入锁的竞争队列(就绪状态)中,去重新竞争锁。
  • 特点“唤醒一个”,但不确定是哪一个,这可能会导致“信号丢失”或“线程饥饿”问题。

notifyAll()

  • 作用:唤醒所有正在等待该对象锁的线程。
  • 如何选择:将等待队列中的所有线程都移出,放入锁的竞争队列中。
  • 特点“唤醒全部”,这些被唤醒的线程会重新竞争锁,最终只有一个能成功获取锁并继续执行,其他的则继续等待。

notify() vs notifyAll() 的选择

Java线程中wait/notify如何正确协作?-图3
(图片来源网络,侵删)
  • 使用 notifyAll() 更安全:它能确保所有等待的线程都能得到通知,避免了因随机唤醒导致某些线程永远得不到信号的问题,虽然可能会导致“惊群效应”(多个线程被唤醒去竞争锁,但只有一个能成功,其他又得回去等待),但在大多数复杂场景下,notifyAll() 是更可靠的选择。
  • 使用 notify() 更高效:如果你能绝对确定只有一个等待的线程需要被唤醒(在经典的“一对一”的生产者-消费者模型中),notify() 会更高效,因为它避免了不必要的线程竞争。

代码示例:生产者-消费者模型

下面是一个使用 wait()notifyAll() 实现的生产者-消费者模型,我们使用一个 List 作为共享的“仓库”。

import java.util.ArrayList;
import java.util.List;
// 共享的仓库
class Warehouse {
    private final int capacity; // 仓库容量
    private final List<String> goods = new ArrayList<>();
    public Warehouse(int capacity) {
        this.capacity = capacity;
    }
    // 生产者调用
    public synchronized void produce(String item) throws InterruptedException {
        // 仓库已满,生产者等待
        while (goods.size() == capacity) {
            System.out.println("仓库已满,生产者 " + Thread.currentThread().getName() + " 等待...");
            wait(); // 释放锁,并进入等待状态
        }
        // 仓库不满,开始生产
        goods.add(item);
        System.out.println(Thread.currentThread().getName() + " 生产了 " + item + ",当前库存: " + goods.size());
        // 通知所有等待的线程(消费者)
        notifyAll(); // 唤醒消费者
    }
    // 消费者调用
    public synchronized String consume() throws InterruptedException {
        // 仓库为空,消费者等待
        while (goods.isEmpty()) {
            System.out.println("仓库为空,消费者 " + Thread.currentThread().getName() + " 等待...");
            wait(); // 释放锁,并进入等待状态
        }
        // 仓库不空,开始消费
        String item = goods.remove(0);
        System.out.println(Thread.currentThread().getName() + " 消费了 " + item + ",当前库存: " + goods.size());
        // 通知所有等待的线程(生产者)
        notifyAll(); // 唤醒生产者
        return item;
    }
}
// 生产者线程
class Producer implements Runnable {
    private final Warehouse warehouse;
    public Producer(Warehouse warehouse) {
        this.warehouse = warehouse;
    }
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                warehouse.produce("商品-" + i);
                Thread.sleep(100); // 模拟生产耗时
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 消费者线程
class Consumer implements Runnable {
    private final Warehouse warehouse;
    public Consumer(Warehouse warehouse) {
        this.warehouse = warehouse;
    }
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                warehouse.consume();
                Thread.sleep(200); // 模拟消费耗时
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        Warehouse warehouse = new Warehouse(3); // 仓库容量为3
        Thread producerThread = new Thread(new Producer(warehouse), "生产者A");
        Thread consumerThread = new Thread(new Consumer(warehouse), "消费者B");
        producerThread.start();
        consumerThread.start();
    }
}

代码解析

  1. Warehouse:共享资源,内部方法用 synchronized 修饰,保证同一时间只有一个线程能操作 goods 列表。
  2. produce() 方法
    • 使用 while (goods.size() == capacity) 而不是 if,这是防止虚假唤醒的关键,因为线程可能被意外唤醒(即使没有被 notify),此时仓库可能还是满的,所以需要再次检查。
    • 当仓库满时,调用 wait() 释放锁并等待。
    • 成功生产后,调用 notifyAll() 唤醒可能在等待的消费者。
  3. consume() 方法
    • 同样使用 while (goods.isEmpty()) 检查仓库是否为空。
    • 当仓库空时,调用 wait() 等待。
    • 成功消费后,调用 notifyAll() 唤醒可能在等待的生产者。
  4. ProducerConsumer:实现了 Runnable 接口,在线程的 run() 方法中循环调用 produce()consume()

最佳实践与注意事项

  1. 永远在 while 循环中调用 wait()

    • 原因:防止“虚假唤醒”(Spurious Wakeups),JVM 规范允许线程在没有被 notifynotifyAll 的情况下被唤醒,虽然不常见,但必须防御性地编程,使用 while 循环可以确保在唤醒后,再次检查等待条件是否满足,如果不满足,则继续等待。
  2. 优先使用 notifyAll()

    • 原因notify() 的随机性可能会导致逻辑错误,在一个有多个生产者和多个消费者的场景中,notify() 可能唤醒了一个生产者,而此时仓库是满的,导致它立即再次 wait,而真正需要被唤醒的消费者却还在等待。notifyAll() 虽然效率稍低,但逻辑更清晰、更安全。
  3. 锁的范围要最小化

    • 只在必要的临界区(即访问共享资源的代码块)使用 synchronized,不要将整个 run() 方法都用 synchronized 包裹,这会严重影响并发性能。
  4. 注意异常处理

    • wait() 方法会抛出 InterruptedException,当线程在等待时被 interrupt() 中断,它会抛出这个异常并从 wait() 状态返回,你需要妥善处理这个异常,通常意味着线程的等待任务被取消了。

现代替代方案:java.util.concurrent

虽然 wait()/notify() 是基础,但在现代 Java 开发中,我们更推荐使用 java.util.concurrent 包中的高级工具,它们更安全、更易用、性能更好。

  • BlockingQueue (阻塞队列):这是实现生产者-消费者模型的首选,它内部已经实现了线程安全和阻塞逻辑。

    • 示例

      // 无界队列,生产者不会因为队列满而阻塞
      // BlockingQueue<String> queue = new LinkedBlockingQueue<>();
      // 有界队列,容量为10
      BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
      // 生产者
      new Thread(() -> {
          try {
              for (int i = 0; i < 20; i++) {
                  String item = "商品-" + i;
                  queue.put(item); // 如果队列满,put() 方法会阻塞
                  System.out.println("生产了: " + item);
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }).start();
      // 消费者
      new Thread(() -> {
          try {
              for (int i = 0; i < 20; i++) {
                  String item = queue.take(); // 如果队列空,take() 方法会阻塞
                  System.out.println("消费了: " + item);
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }).start();
    • 优点:代码极其简洁,无需手动管理锁、wait()notify()put()take() 方法已经为我们处理好了所有阻塞和唤醒的逻辑。

  • Condition 接口:它是 Lock 的配套工具,提供了更灵活的等待/通知机制,可以区分不同的等待条件(可以有一个 notFull 条件和一个 notEmpty 条件)。

特性 wait() / notify() BlockingQueue
复杂度 ,需要手动管理锁和逻辑 ,开箱即用
安全性 ,容易出错(如忘记用 while ,由 JDK 保证线程安全
可读性 ,需要理解底层原理 ,代码意图清晰
适用场景 学习并发原理、需要高度定制化的复杂场景 绝大多数标准的生产者-消费者场景

深刻理解 wait()notify() 是掌握 Java 并发的基石,但在实际项目中,优先使用 BlockingQueue 等高级并发工具,它们能让你用更少的代码写出更健壮、更高效的并发程序。

分享:
扫描分享到社交APP
上一篇
下一篇