杰瑞科技汇

Java线程安全Queue如何实现?

在 Java 并发编程中,队列是一个非常基础且重要的数据结构,当多个线程需要共享访问一个队列时,如果不加以控制,就会发生数据不一致、数据丢失等严重问题,使用线程安全的队列至关重要。

Java线程安全Queue如何实现?-图1
(图片来源网络,侵删)

Java 提供了多种线程安全的队列实现,它们位于 java.util.concurrent 包下,并遵循不同的并发策略,适用于不同的场景。

Java 线程安全队列的核心分类

Java 的并发队列主要可以分为两大类:

  1. 阻塞队列:当队列为空时,从队列中获取元素的线程会被阻塞,直到队列中有新的元素被加入;当队列已满时,向队列中添加元素的线程也会被阻塞,直到队列中有空间可以容纳新元素。
  2. 非阻塞队列:提供非阻塞的插入、移除和检查操作,这些操作在失败时会立即返回,而不是等待。

阻塞队列

阻塞队列是 Java 并发包中最常用的一类队列,它们是实现生产者-消费者模式的理想选择。

BlockingQueue 接口

这是所有阻塞队列的顶级接口,定义了核心的阻塞操作:

Java线程安全Queue如何实现?-图2
(图片来源网络,侵删)
  • 阻塞式 put(e):将元素 e 添加到队列尾部,如果队列已满,则调用线程会阻塞,直到队列有空间。
  • 阻塞式 take():获取并移除队列头部的元素,如果队列为空,则调用线程会阻塞,直到队列中有元素。
  • 超时式 offer(e, time, unit):尝试将元素 e 加入队列,如果队列已满,则等待指定的时间,如果超时仍未加入成功,则返回 false
  • 超时式 poll(time, unit):尝试获取并移除队列头部元素,如果队列为空,则等待指定的时间,如果超时仍未获取到元素,则返回 null

常见的阻塞队列实现

实现类 底层数据结构 特点 适用场景
ArrayBlockingQueue 数组 有界队列,必须指定容量,基于可重入锁 实现,并发度相对较低(同一时间只有一个锁)。 当需要固定大小的队列,且希望限制内存使用时,性能在单生产者-单消费者场景下不错。
LinkedBlockingQueue 链表 可选有界队列(默认 Integer.MAX_VALUE),基于分离锁(头尾各一把锁),实现了高并发,生产者和消费者可以并行操作,互不干扰。 最常用的阻塞队列之一,适用于高并发、生产者和消费者数量较多的场景。
PriorityBlockingQueue 无界队列,按照元素的自然排序或指定的 Comparator 进行排序,阻塞操作基于可重入锁 需要按照优先级处理任务的场景,如任务调度。
SynchronousQueue 不存储元素 一个特殊的队列,容量为 0,每个 put 操作必须等待一个对应的 take 操作,反之亦然,它像一个“会合点”,直接将数据从生产者传递给消费者。 高并发的“传递”场景,Executors.newCachedThreadPool() 就使用它来创建线程池,它不持有任务,只负责传递。
DelayQueue 无界队列,其中的元素必须实现 Delayed 接口,只有当元素的延迟时间到期后,才能从队列中获取。 延迟任务执行,例如定时器、消息延迟推送等。

非阻塞队列

非阻塞队列通常基于更高级的并发原语(如 CAS - Compare-And-Swap)实现,避免了线程阻塞和唤醒的开销,在高并发下能提供更好的吞吐量和可伸缩性。

ConcurrentLinkedQueue

这是 Java 并发包中唯一的非阻塞队列实现。

  • 底层数据结构:基于链表
  • 并发机制:使用无锁算法,完全基于 CAS 操作来保证线程安全。
  • 特点
    • 无界:没有容量限制。
    • 高并发:在高并发环境下,性能通常优于基于锁的队列。
    • 弱一致性size() 方法是一个近似值,不能保证完全精确,迭代器是弱一致的,它不一定能反映出迭代后的所有修改。
  • 适用场景:适用于对性能要求极高,且不需要阻塞操作的场景,高并发的日志收集、事件总线等。

传统集合中的线程安全队列

除了 java.util.concurrent 包,Java 也提供了一些传统的线程安全队列,但它们在现代并发编程中不推荐使用。

实现类 底层数据结构 同步机制 不推荐的原因
Vector 数组 方法级同步 (synchronized) 效率低下,所有操作都加锁,即使是在读多写少的场景下,也会导致严重的性能瓶颈。
Collections.synchronizedQueue(Queue<T> q) 包装任意队列 方法级同步 Vector 类似,它对队列的每一个公共方法都加了一把全局锁,这使得并发度极低,无法实现真正的并行操作。

为什么不推荐? 因为这些实现采用的是一种“粗粒度锁”策略,即用一个锁保护了整个数据结构,在任何时候,只允许一个线程访问队列,这使得它们在高并发场景下性能很差。

Java线程安全Queue如何实现?-图3
(图片来源网络,侵删)

如何选择?

选择哪种线程安全队列,取决于你的具体应用场景:

场景 推荐选择 理由
需要实现经典的生产者-消费者模式 LinkedBlockingQueueArrayBlockingQueue 阻塞队列的特性天然适合这种模式,可以简化线程间的协调逻辑。LinkedBlockingQueue 高并发,ArrayBlockingQueue 有界可控。
需要处理高并发、高性能要求的任务传递 SynchronousQueue 无存储,直接传递,效率最高,适合 CachedThreadPool 这样的线程池。
需要处理有优先级的任务 PriorityBlockingQueue 内部维护堆结构,可以保证每次取出的都是优先级最高的任务。
需要执行延迟任务 DelayQueue 专为延迟场景设计,只有任务到期才能被消费。
只需要一个高性能、非阻塞的队列 ConcurrentLinkedQueue 无锁设计,在高并发下吞吐量极高,但需要自己处理线程间的协调(例如使用 while 循环 poll)。
遗留代码或简单场景(不推荐) VectorCollections.synchronizedQueue 简单易用,但性能差,应尽量避免在新代码中使用。

代码示例:生产者-消费者模式

下面是一个使用 LinkedBlockingQueue 实现生产者-消费者模式的简单示例。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Producer implements Runnable {
    private final BlockingQueue<String> queue;
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                System.out.println("Producer produced: " + message);
                queue.put(message); // 如果队列满,这里会阻塞
                TimeUnit.SECONDS.sleep(1); // 模拟耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class Consumer implements Runnable {
    private final BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) { // 消费者会一直运行
                String message = queue.take(); // 如果队列空,这里会阻塞
                System.out.println("Consumer consumed: " + message);
                // 模拟处理消息耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个有界阻塞队列,容量为 5
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));
        producerThread.start();
        consumerThread.start();
        // 等待生产者完成
        producerThread.join();
        // 给消费者一些时间消费完所有消息
        TimeUnit.SECONDS.sleep(12);
        // 优雅地停止消费者(在实际应用中,可能需要更复杂的机制)
        consumerThread.interrupt();
    }
}
特性 阻塞队列 非阻塞队列
核心机制 CAS (无锁)
阻塞行为 队列满/空时,线程会阻塞 操作失败时立即返回,不阻塞
适用场景 生产者-消费者、任务调度 高吞吐量、低延迟的场景
代表类 LinkedBlockingQueue, ArrayBlockingQueue ConcurrentLinkedQueue
优点 逻辑简单,易于使用 高并发下性能好,可伸缩性强
缺点 线程阻塞和唤醒有开销 API 更复杂,需要自己处理失败情况

在选择时,请优先考虑 java.util.concurrent 包中的实现,并根据你的业务需求(是否需要阻塞、是否需要容量限制、对性能的要求等)来做出最合适的选择。

分享:
扫描分享到社交APP
上一篇
下一篇