为什么选择 Redis 作为消息队列?
- 高性能: 基于内存操作,速度极快。
- 简单易用: 无需额外部署,数据结构简单,API 直观。
- 数据持久化: 支持将数据持久化到磁盘,防止消息丢失。
- 功能丰富: 除了队列,还能提供缓存、分布式锁等多种功能。
消息队列的核心模型
一个基本的消息队列包含三个角色:
- 生产者: 负责创建消息并将其发送到队列。
- 队列: 存储消息,是生产者和消费者之间的桥梁。
- 消费者: 负责从队列中取出消息并进行处理。
Redis 实现消息队列的三种主要方式
基于 List 的实现 (最常用)
这是最简单、最直接的方式,完全符合队列的“先进先出”(FIFO)原则。
核心命令:
LPUSH key value: 将一个值或多个值插入到列表的头部(左端)。RPUSH key value: 将一个值或多个值插入到列表的尾部(右端)。LPOP key: 移除并返回列表的头部(左端)元素。RPOP key: 移除并返回列表的尾部(右端)元素。
工作流程:
- 生产者: 使用
LPUSH命令将消息推入列表的头部。 - 消费者: 使用
RPOP命令从列表的尾部拉取消息。
Java 实现 (使用 Jedis)
确保你的项目中包含了 Jedis 依赖。
Maven 依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version> <!-- 使用较新版本 -->
</dependency>
生产者代码:
import redis.clients.jedis.Jedis;
public class RedisQueueProducer {
private final Jedis jedis;
private final String queueName = "my_queue";
public RedisQueueProducer(String host, int port) {
this.jedis = new Jedis(host, port);
}
public void produce(String message) {
// 使用 LPUSH 将消息放入队列头部
jedis.lpush(queueName, message);
System.out.println("Produced message: " + message);
}
public static void main(String[] args) {
RedisQueueProducer producer = new RedisQueueProducer("localhost", 6379);
for (int i = 0; i < 10; i++) {
producer.produce("Hello Redis Message " + i);
}
producer.jedis.close();
}
}
消费者代码:
import redis.clients.jedis.Jedis;
public class RedisQueueConsumer {
private final Jedis jedis;
private final String queueName = "my_queue";
public RedisQueueConsumer(String host, int port) {
this.jedis = new Jedis(host, port);
}
public void consume() {
// 使用 RPOP 从队列尾部拉取消息
String message = jedis.rpop(queueName);
if (message != null) {
System.out.println("Consumed message: " + message);
// 在这里处理消息...
}
}
public static void main(String[] args) throws InterruptedException {
RedisQueueConsumer consumer = new RedisQueueConsumer("localhost", 6379);
while (true) {
consumer.consume();
// 为了避免CPU空转,可以短暂休眠
Thread.sleep(1000);
}
// consumer.jedis.close(); // 通常消费者是常驻进程,不关闭
}
}
缺点:
- 消息丢失: 如果消费者在处理消息之前崩溃,那么这条消息就会永久丢失,因为
RPOP是“拉取”模式,消息一旦被取出,就从队列中删除了。 - 消费者负载不均: 如果多个消费者同时拉取,快的消费者会拿到更多的消息,而慢的消费者可能长时间拿不到消息。
基于 List + BRPOP/B LPOP 的实现 (改进版)
为了解决消费者崩溃导致消息丢失的问题,我们可以使用 BRPOP (或 BLPOP) 命令。
核心命令:
BRPOP key [key ...] timeout: 阻塞式地移除并返回列表的尾部(右端)元素,如果列表为空,客户端会阻塞,直到等待超时或列表中有新元素加入。
工作流程:
- 生产者: 仍然是
LPUSH。 - 消费者: 使用
BRPOP从队列尾部拉取消息,关键区别在于,如果队列为空,BRPOP不会立即返回null,而是会“阻塞”住,直到有新消息到来或者超时,这避免了消费者在无消息时进行无效的轮询,节省了 CPU 资源。
Java 实现 (消费者代码)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
public class RedisQueueBlockingConsumer {
private final Jedis jedis;
private final String queueName = "my_queue";
public RedisQueueBlockingConsumer(String host, int port) {
this.jedis = new Jedis(host, port);
}
public void consume() {
// BRPOP 会阻塞,直到有消息或超时(单位:秒)
// 返回的是一个列表,第一个元素是队列名,第二个元素是消息内容
// 设置超时时间为 0,表示无限期阻塞,直到有消息
String[] result = jedis.brpop(0, queueName).toArray(new String[0]);
if (result.length > 0) {
String message = result[1]; // 消息在第二个位置
System.out.println("Consumed message: " + message);
// 在这里处理消息...
}
}
public static void main(String[] args) throws InterruptedException {
RedisQueueBlockingConsumer consumer = new RedisQueueBlockingConsumer("localhost", 6379);
while (true) {
consumer.consume();
}
}
}
优点:
- 高效: 消费者无需轮询,在有消息时才被唤醒,效率高。
- 消息可靠性: 消息只有在被
BRPOP成功取出后才会从队列中移除,如果消费者在处理过程中崩溃,消息仍然在队列中,可以被其他消费者重新消费。
基于 Stream 的实现 (官方推荐,功能最强)
Redis 5.0 引入了 Stream 数据结构,它是一个全新的、功能强大的消息队列实现,弥补了 List 的一些不足。
核心特性:
- 消息持久化: Stream 中的消息可以被持久化,即使 Redis 重启,消息也不会丢失。
- 消费者组: 这是 Stream 最强大的功能,消费者组可以将消息分发给组内的不同消费者,实现消息的负载均衡,每个消费者只会收到组内未消费的消息。
- 消息确认: 消费者处理完消息后需要手动发送确认,如果消息在一定时间内没有被确认,它会重新分发给组内的其他消费者,确保消息被处理。
- 消息ID: 每条消息都有一个唯一的 ID,可以按 ID 范围查询历史消息。
核心命令:
XADD key ID field value: 向 Stream 中添加一条消息。XREAD COUNT count BLOCK milliseconds STREAMS key id: 从一个或多个 Stream 中读取消息。XGROUP CREATE key groupname id: 创建消费者组。XREADGROUP GROUP groupname consumer COUNT count STREAMS key >: 从消费者组中读取消息(>表示从未消费的消息开始)。XACK key group id: 确认消息已处理。
Java 实现 (使用 Lettuce,Jedis 也可用)
Lettuce 对 Stream 的支持更好一些,Maven 依赖:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.2.3.RELEASE</version> <!-- 使用较新版本 -->
</dependency>
生产者代码:
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
public class RedisStreamProducer {
private final RedisStreamCommands<String, String> commands;
public RedisStreamProducer(String uri) {
RedisClient client = RedisClient.create(uri);
StatefulRedisConnection<String, String> connection = client.connect();
this.commands = connection.sync();
}
public void produce(String message) {
// 使用 XADD 添加消息,* 表示让 Redis 自动生成 ID
String messageId = commands.xadd("mystream", Map.of("message", message));
System.out.println("Produced message with ID: " + messageId);
}
public static void main(String[] args) {
RedisStreamProducer producer = new RedisStreamProducer("redis://localhost:6379");
for (int i = 0; i < 10; i++) {
producer.produce("Hello Stream Message " + i);
}
}
}
消费者代码 (需要先创建消费者组):
在 redis-cli 中先创建消费者组(如果不存在):
XGROUP CREATE mystream mygroup 0 MKSTREAM # MKSTREAM 会在 Stream 不存在时自动创建
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.stream.StreamMessage;
import java.util.List;
import java.util.Map;
public class RedisStreamConsumer {
private final RedisStreamCommands<String, String> commands;
private final String groupName = "mygroup";
private final String consumerName = "consumer-1"; // 可以是 consumer-2, consumer-3 等
public RedisStreamConsumer(String uri) {
RedisClient client = RedisClient.create(uri);
StatefulRedisConnection<String, String> connection = client.connect();
this.commands = connection.sync();
}
public void consume() {
// 使用 XREADGROUP 从消费者组读取消息
// ">" 表示从消费者组的“最后一条消息ID”之后开始读取,确保只读到新消息
List<StreamMessage<String, String>> messages = commands.xreadgroup(
groupName,
consumerName,
1, // COUNT 1,一次只读一条
0, // BLOCK 0,无限阻塞
Map.entry("mystream", ">")
);
for (StreamMessage<String, String> message : messages) {
System.out.println("Consumer " + consumerName + " received message with ID: " + message.getId());
String content = message.getBody().get("message");
System.out.println("Message content: " + content);
// 处理消息...
// 处理完成后,发送 ACK 确认
commands.xack("mystream", groupName, message.getId());
System.out.println("Acknowledged message ID: " + message.getId());
}
}
public static void main(String[] args) throws InterruptedException {
RedisStreamConsumer consumer = new RedisStreamConsumer("redis://localhost:6379");
while (true) {
consumer.consume();
}
}
}
优点:
- 功能强大: 支持消费者组、消息确认、持久化等企业级特性。
- 高可靠: 消息不丢失,处理失败能重试。
- 负载均衡: 消费者组自动将消息分发给不同的消费者。
总结与对比
| 特性 | List + LPUSH/RPOP | List + BRPOP | Stream |
|---|---|---|---|
| 实现复杂度 | 简单 | 简单 | 较复杂 |
| 消息可靠性 | 低 (消费者崩溃会丢消息) | 中 (消息在处理前是安全的) | 高 (ACK机制,持久化) |
| 消费者模型 | 单消费者 | 单消费者 | 支持消费者组,多消费者,负载均衡 |
| 消息顺序 | 严格FIFO | 严格FIFO | 按ID顺序,但并发消费时全局顺序可能不保证 |
| 适用场景 | 简单任务、低可靠性要求、学习示例 | 需要基本可靠性、单消费者场景 | 高可靠性、高吞吐、需要负载均衡的生产环境 |
如何选择?
- 学习/简单场景: 使用
List + BRPOP模式,足够简单且能解决基本问题。 - 生产环境/高可靠性要求: 强烈推荐使用
Stream,它提供了现代消息队列所需的核心功能,是 Redis 官方推荐的方案。 - 特殊场景: 如果你需要“发布/订阅”(Pub/Sub)模式(即一条消息被所有订阅者消费),那么可以使用 Redis 的
PUBLISH/SUBSCRIBE命令,但这不属于队列范畴。
