杰瑞科技汇

Redis消息队列在Java中如何高效实现?

为什么选择 Redis 作为消息队列?

  • 高性能: 基于内存操作,速度极快。
  • 简单易用: 无需额外部署,数据结构简单,API 直观。
  • 数据持久化: 支持将数据持久化到磁盘,防止消息丢失。
  • 功能丰富: 除了队列,还能提供缓存、分布式锁等多种功能。

消息队列的核心模型

一个基本的消息队列包含三个角色:

  1. 生产者: 负责创建消息并将其发送到队列。
  2. 队列: 存储消息,是生产者和消费者之间的桥梁。
  3. 消费者: 负责从队列中取出消息并进行处理。

Redis 实现消息队列的三种主要方式

基于 List 的实现 (最常用)

这是最简单、最直接的方式,完全符合队列的“先进先出”(FIFO)原则。

核心命令:

  • LPUSH key value: 将一个值或多个值插入到列表的头部(左端)。
  • RPUSH key value: 将一个值或多个值插入到列表的尾部(右端)。
  • LPOP key: 移除并返回列表的头部(左端)元素。
  • RPOP key: 移除并返回列表的尾部(右端)元素。

工作流程:

  • 生产者: 使用 LPUSH 命令将消息推入列表的头部。
  • 消费者: 使用 RPOP 命令从列表的尾部拉取消息。

Java 实现 (使用 Jedis)

确保你的项目中包含了 Jedis 依赖。

Maven 依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.1</version> <!-- 使用较新版本 -->
</dependency>

生产者代码:

import redis.clients.jedis.Jedis;
public class RedisQueueProducer {
    private final Jedis jedis;
    private final String queueName = "my_queue";
    public RedisQueueProducer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }
    public void produce(String message) {
        // 使用 LPUSH 将消息放入队列头部
        jedis.lpush(queueName, message);
        System.out.println("Produced message: " + message);
    }
    public static void main(String[] args) {
        RedisQueueProducer producer = new RedisQueueProducer("localhost", 6379);
        for (int i = 0; i < 10; i++) {
            producer.produce("Hello Redis Message " + i);
        }
        producer.jedis.close();
    }
}

消费者代码:

import redis.clients.jedis.Jedis;
public class RedisQueueConsumer {
    private final Jedis jedis;
    private final String queueName = "my_queue";
    public RedisQueueConsumer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }
    public void consume() {
        // 使用 RPOP 从队列尾部拉取消息
        String message = jedis.rpop(queueName);
        if (message != null) {
            System.out.println("Consumed message: " + message);
            // 在这里处理消息...
        }
    }
    public static void main(String[] args) throws InterruptedException {
        RedisQueueConsumer consumer = new RedisQueueConsumer("localhost", 6379);
        while (true) {
            consumer.consume();
            // 为了避免CPU空转,可以短暂休眠
            Thread.sleep(1000);
        }
        // consumer.jedis.close(); // 通常消费者是常驻进程,不关闭
    }
}

缺点:

  • 消息丢失: 如果消费者在处理消息之前崩溃,那么这条消息就会永久丢失,因为 RPOP 是“拉取”模式,消息一旦被取出,就从队列中删除了。
  • 消费者负载不均: 如果多个消费者同时拉取,快的消费者会拿到更多的消息,而慢的消费者可能长时间拿不到消息。

基于 List + BRPOP/B LPOP 的实现 (改进版)

为了解决消费者崩溃导致消息丢失的问题,我们可以使用 BRPOP (或 BLPOP) 命令。

核心命令:

  • BRPOP key [key ...] timeout: 阻塞式地移除并返回列表的尾部(右端)元素,如果列表为空,客户端会阻塞,直到等待超时或列表中有新元素加入。

工作流程:

  • 生产者: 仍然是 LPUSH
  • 消费者: 使用 BRPOP 从队列尾部拉取消息,关键区别在于,如果队列为空,BRPOP 不会立即返回 null,而是会“阻塞”住,直到有新消息到来或者超时,这避免了消费者在无消息时进行无效的轮询,节省了 CPU 资源。

Java 实现 (消费者代码)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
public class RedisQueueBlockingConsumer {
    private final Jedis jedis;
    private final String queueName = "my_queue";
    public RedisQueueBlockingConsumer(String host, int port) {
        this.jedis = new Jedis(host, port);
    }
    public void consume() {
        // BRPOP 会阻塞,直到有消息或超时(单位:秒)
        // 返回的是一个列表,第一个元素是队列名,第二个元素是消息内容
        // 设置超时时间为 0,表示无限期阻塞,直到有消息
        String[] result = jedis.brpop(0, queueName).toArray(new String[0]);
        if (result.length > 0) {
            String message = result[1]; // 消息在第二个位置
            System.out.println("Consumed message: " + message);
            // 在这里处理消息...
        }
    }
    public static void main(String[] args) throws InterruptedException {
        RedisQueueBlockingConsumer consumer = new RedisQueueBlockingConsumer("localhost", 6379);
        while (true) {
            consumer.consume();
        }
    }
}

优点:

  • 高效: 消费者无需轮询,在有消息时才被唤醒,效率高。
  • 消息可靠性: 消息只有在被 BRPOP 成功取出后才会从队列中移除,如果消费者在处理过程中崩溃,消息仍然在队列中,可以被其他消费者重新消费。

基于 Stream 的实现 (官方推荐,功能最强)

Redis 5.0 引入了 Stream 数据结构,它是一个全新的、功能强大的消息队列实现,弥补了 List 的一些不足。

核心特性:

  • 消息持久化: Stream 中的消息可以被持久化,即使 Redis 重启,消息也不会丢失。
  • 消费者组: 这是 Stream 最强大的功能,消费者组可以将消息分发给组内的不同消费者,实现消息的负载均衡,每个消费者只会收到组内未消费的消息。
  • 消息确认: 消费者处理完消息后需要手动发送确认,如果消息在一定时间内没有被确认,它会重新分发给组内的其他消费者,确保消息被处理。
  • 消息ID: 每条消息都有一个唯一的 ID,可以按 ID 范围查询历史消息。

核心命令:

  • XADD key ID field value: 向 Stream 中添加一条消息。
  • XREAD COUNT count BLOCK milliseconds STREAMS key id: 从一个或多个 Stream 中读取消息。
  • XGROUP CREATE key groupname id: 创建消费者组。
  • XREADGROUP GROUP groupname consumer COUNT count STREAMS key >: 从消费者组中读取消息(> 表示从未消费的消息开始)。
  • XACK key group id: 确认消息已处理。

Java 实现 (使用 Lettuce,Jedis 也可用)

Lettuce 对 Stream 的支持更好一些,Maven 依赖:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.2.3.RELEASE</version> <!-- 使用较新版本 -->
</dependency>

生产者代码:

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
public class RedisStreamProducer {
    private final RedisStreamCommands<String, String> commands;
    public RedisStreamProducer(String uri) {
        RedisClient client = RedisClient.create(uri);
        StatefulRedisConnection<String, String> connection = client.connect();
        this.commands = connection.sync();
    }
    public void produce(String message) {
        // 使用 XADD 添加消息,* 表示让 Redis 自动生成 ID
        String messageId = commands.xadd("mystream", Map.of("message", message));
        System.out.println("Produced message with ID: " + messageId);
    }
    public static void main(String[] args) {
        RedisStreamProducer producer = new RedisStreamProducer("redis://localhost:6379");
        for (int i = 0; i < 10; i++) {
            producer.produce("Hello Stream Message " + i);
        }
    }
}

消费者代码 (需要先创建消费者组): 在 redis-cli 中先创建消费者组(如果不存在):

XGROUP CREATE mystream mygroup 0 MKSTREAM
# MKSTREAM 会在 Stream 不存在时自动创建
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.stream.StreamMessage;
import java.util.List;
import java.util.Map;
public class RedisStreamConsumer {
    private final RedisStreamCommands<String, String> commands;
    private final String groupName = "mygroup";
    private final String consumerName = "consumer-1"; // 可以是 consumer-2, consumer-3 等
    public RedisStreamConsumer(String uri) {
        RedisClient client = RedisClient.create(uri);
        StatefulRedisConnection<String, String> connection = client.connect();
        this.commands = connection.sync();
    }
    public void consume() {
        // 使用 XREADGROUP 从消费者组读取消息
        // ">" 表示从消费者组的“最后一条消息ID”之后开始读取,确保只读到新消息
        List<StreamMessage<String, String>> messages = commands.xreadgroup(
                groupName,
                consumerName,
                1, // COUNT 1,一次只读一条
                0, // BLOCK 0,无限阻塞
                Map.entry("mystream", ">")
        );
        for (StreamMessage<String, String> message : messages) {
            System.out.println("Consumer " + consumerName + " received message with ID: " + message.getId());
            String content = message.getBody().get("message");
            System.out.println("Message content: " + content);
            // 处理消息...
            // 处理完成后,发送 ACK 确认
            commands.xack("mystream", groupName, message.getId());
            System.out.println("Acknowledged message ID: " + message.getId());
        }
    }
    public static void main(String[] args) throws InterruptedException {
        RedisStreamConsumer consumer = new RedisStreamConsumer("redis://localhost:6379");
        while (true) {
            consumer.consume();
        }
    }
}

优点:

  • 功能强大: 支持消费者组、消息确认、持久化等企业级特性。
  • 高可靠: 消息不丢失,处理失败能重试。
  • 负载均衡: 消费者组自动将消息分发给不同的消费者。

总结与对比

特性 List + LPUSH/RPOP List + BRPOP Stream
实现复杂度 简单 简单 较复杂
消息可靠性 (消费者崩溃会丢消息) 中 (消息在处理前是安全的) (ACK机制,持久化)
消费者模型 单消费者 单消费者 支持消费者组,多消费者,负载均衡
消息顺序 严格FIFO 严格FIFO 按ID顺序,但并发消费时全局顺序可能不保证
适用场景 简单任务、低可靠性要求、学习示例 需要基本可靠性、单消费者场景 高可靠性、高吞吐、需要负载均衡的生产环境

如何选择?

  • 学习/简单场景: 使用 List + BRPOP 模式,足够简单且能解决基本问题。
  • 生产环境/高可靠性要求: 强烈推荐使用 Stream,它提供了现代消息队列所需的核心功能,是 Redis 官方推荐的方案。
  • 特殊场景: 如果你需要“发布/订阅”(Pub/Sub)模式(即一条消息被所有订阅者消费),那么可以使用 Redis 的 PUBLISH/SUBSCRIBE 命令,但这不属于队列范畴。
分享:
扫描分享到社交APP
上一篇
下一篇