Redis 本身不是一个专业的消息队列系统(如 RabbitMQ、Kafka),但它提供了一些数据结构,可以非常方便地快速搭建一个轻量级、功能尚可的消息队列。

下面我将从原理、具体实现、优缺点、以及生产级方案演进等多个角度进行阐述。
核心原理:Redis 的哪些数据结构适合做 MQ?
消息队列的核心操作是:生产者将消息放入队列,消费者从队列中取出消息,在 Redis 中,以下几个数据结构天然适合这个场景:
a) 列表 - 最经典、最简单的实现
这是实现 MQ 最直接、最常用的方式。
- 数据结构:
List - 核心命令:
LPUSH key value: 将消息推入列表的头部(左端),生产者常用。RPUSH key value: 将消息推入列表的尾部(右端),生产者常用。RPOP key: 从列表的尾部(右端)弹出一个消息,消费者常用。LPOP key: 从列表的头部(左端)弹出一个消息,消费者常用。
- 工作模式:
- 生产者:
LPUSH my_queue "message_content" - 消费者:
RPOP my_queue
- 生产者:
- 特点:
- 先进先出: 非常符合队列的定义。
- 简单: 实现逻辑最简单。
b) Streams - 最专业、功能最强大的实现 (Redis 5.0+)
Redis Streams 是专门为消息流场景设计的数据结构,功能非常强大,是目前 Redis 实现生产级 MQ 的首选方案。

- 数据结构:
Stream - 核心概念:
- Stream: 消息的集合,相当于一个队列。
- Producer: 生产者,向 Stream 添加消息。
- Consumer: 消费者,从 Stream 读取消息。
- Consumer Group: 消费者组,允许多个消费者协同消费一个 Stream,每条消息只被组内的一个消费者处理,实现负载均衡。
- Message ID: 每条消息的唯一 ID,格式为
timestamp-sequence。 - Pending Entries List (PEL): 待处理列表,记录已经被分发给某个消费者但尚未被确认的消息,防止消息丢失。
- 核心命令:
XADD stream_name * field1 value1 field2 value2: 向 Stream 添加一条消息。XREAD COUNT count STREAMS stream_name id: 从一个或多个 Stream 读取消息。XREADGROUP GROUP group_name consumer_name COUNT count STREAMS stream_name >: 从消费者组中读取新消息(>表示从未读取过的消息开始)。XACK stream_name group_name message_id: 消费者确认消息已处理,从 PEL 中移除。XGROUP CREATE stream_name group_id id: 创建消费者组。
- 特点:
- 消费者组: 支持多消费者消费,实现负载均衡和消息不重复消费。
- 消息持久化: 消息会一直存在于 Stream 中,除非被主动删除。
- 消息确认: 通过
XACK机制确保消息被成功处理,防止丢失。 - 阻塞读取:
XREAD命令支持阻塞,类似BRPOP,可以实时获取新消息。
c) 发布/订阅 - 特殊场景的 MQ
Pub/Sub 不适合传统的队列模型,而更适合事件通知。
- 数据结构: 不特定,基于频道。
- 核心命令:
PUBLISH channel message: 发布消息到频道。SUBSCRIBE channel: 订阅频道,接收消息。
- 特点:
- 一对多: 一个消息可以被多个订阅者接收。
- 无状态: 消息一旦发布,如果订阅者不在线,消息就会丢失,它不存储历史消息。
- 不保证顺序: 消息是并发的,不保证到达顺序。
- 适用场景: 实时通知、聊天室、日志广播等。
Java 实现示例
我们将使用流行的 Java Redis 客户端 Lettuce(推荐,支持异步和响应式编程)和 Jedis 来演示。
准备工作
在 pom.xml 中添加依赖:
<!-- 使用 Lettuce 作为客户端 -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.2.6.RELEASE</version> <!-- 使用最新稳定版 -->
</dependency>
<!-- 或者使用 Jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version> <!-- 使用最新稳定版 -->
</dependency>
基于 List 的简单队列
生产者
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class SimpleMQProducer {
private Jedis jedis;
public SimpleMQProducer() {
this.jedis = new Jedis("localhost", 6379);
}
public void produce(String queueName, String message) {
// 使用 Pipeline 可以提高批量操作的性能
Pipeline pipeline = jedis.pipelined();
// LPUSH 将消息放入队列头部
pipeline.lpush(queueName, message);
pipeline.sync(); // 提交执行
System.out.println("Produced message: " + message);
}
public static void main(String[] args) throws InterruptedException {
SimpleMQProducer producer = new SimpleMQProducer();
for (int i = 0; i < 10; i++) {
producer.produce("my_simple_queue", "Hello Redis MQ " + i);
Thread.sleep(1000); // 每秒生产一条消息
}
producer.jedis.close();
}
}
消费者
import redis.clients.jedis.Jedis;
public class SimpleMQConsumer {
private Jedis jedis;
public SimpleMQConsumer() {
this.jedis = new Jedis("localhost", 6379);
}
public void consume(String queueName) {
// BRPOP 是阻塞版本的 RPOP,当列表为空时,会阻塞直到有新消息或超时
// 0 表示无限期阻塞
while (true) {
// RPOP 阻塞获取列表尾部元素
String message = jedis.brpop(0, queueName).get(1); // 返回的是一个列表,第二个元素是消息内容
if (message != null) {
System.out.println("Consumed message: " + message);
// 模拟消息处理
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
SimpleMQConsumer consumer = new SimpleMQConsumer();
consumer.consume("my_simple_queue");
}
问题: 这个简单的实现有严重缺陷:如果消费者在处理消息时崩溃了,消息就会丢失,因为没有消息确认机制。
基于 Streams 的专业队列 (推荐)
生产者
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
public class StreamMQProducer {
private RedisStreamCommands<String, String> syncCommands;
public StreamMQProducer() {
RedisClient redisClient = RedisClient.create("redis://localhost:6379");
StatefulRedisConnection<String, String> connection = redisClient.connect();
this.syncCommands = connection.sync();
}
public void produce(String streamName, String message) {
// XADD 添加消息,* 表示让 Redis 自动生成 ID
String messageId = syncCommands.xadd(streamName, Map.of("message", message));
System.out.println("Produced message with ID: " + messageId);
}
public static void main(String[] args) throws InterruptedException {
StreamMQProducer producer = new StreamMQProducer();
for (int i = 0; i < 10; i++) {
producer.produce("my_stream", "Hello Redis Stream MQ " + i);
Thread.sleep(1000);
}
}
}
消费者
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.util.Map;
public class StreamMQConsumer {
private RedisStreamCommands<String, String> syncCommands;
private String groupName = "my_consumer_group";
private String consumerName = "consumer-1";
public StreamMQConsumer() {
RedisClient redisClient = RedisClient.create("redis://localhost:6379");
StatefulRedisConnection<String, String> connection = redisClient.connect();
this.syncCommands = connection.sync();
// 创建消费者组,如果已存在则忽略
// '0' 表示从 Stream 的第一条消息开始读
try {
syncCommands.xgroupCreate(StreamMQKey.Stream, groupName, "0");
} catch (Exception e) {
System.out.println("Consumer group already exists.");
}
}
public void consume() {
while (true) {
// XREADGROUP 从消费者组中读取新消息
// '>' 表示从未处理过的消息开始
// COUNT 1 表示一次只读取一条
// BLOCK 0 表示无限期阻塞
Map<String, StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Map.entry(groupName, consumerName),
Map.entry(StreamMQKey.Stream, ">"),
1, 0);
if (!messages.isEmpty()) {
StreamMessage<String, String> message = messages.get(StreamMQKey.Stream);
System.out.println("Consumer " + consumerName + " received message: " + message.getBody());
// 模拟处理消息
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完成后,发送 ACK 确认
syncCommands.xack(StreamMQKey.Stream, groupName, message.getId());
System.out.println("Acknowledged message ID: " + message.getId());
}
}
}
public static void main(String[] args) {
StreamMQConsumer consumer = new StreamMQConsumer();
consumer.consume();
}
}
注意:在实际应用中,消费者组名和消费者名应该是动态生成的,例如使用 UUID。
优缺点分析
| 特性 | 基于列表 | 基于 Streams | 基于发布/订阅 |
|---|---|---|---|
| 实现复杂度 | 低 | 中 | 低 |
| 消息持久化 | 是(数据在内存) | 是 | 否 |
| 消息可靠性 | 低 (无ACK) | 高 (有ACK机制) | 低 |
| 多消费者 | 可以,但需自行实现负载均衡 | 是 (消费者组,自动负载均衡) | 是 (一对多) |
| 消息不重复消费 | 难以保证 | 可以保证 (通过消费者组) | 不适用 |
| 消息顺序 | 保证 FIFO | 保证单个消费者顺序 | 不保证 |
| 阻塞消费 | BRPOP |
XREAD with BLOCK |
SUBSCRIBE (天生阻塞) |
| 适用场景 | 简单任务、日志收集、非核心业务 | 绝大多数 MQ 场景、可靠的消息传递 | 实时通知、事件驱动架构 |
生产级演进与方案对比
当你的业务对消息队列的可靠性、高可用性有较高要求时,Redis MQ 可能会遇到瓶颈,这时需要考虑更专业的方案。
| 特性 | Redis (List/Streams) | RabbitMQ | Apache Kafka |
|---|---|---|---|
| 可靠性 | 中 (Streams 较好) | 非常高 (ACK, 持久化, 事务) | 非常高 (副本, 分区) |
| 高可用 | 需要依赖 Redis Cluster | 原生支持 (镜像队列) | 原生支持 (多副本) |
| 性能 | 高 (内存操作) | 高 (单机/小集群) | 极高 (分布式, 顺序写盘) |
| 功能丰富度 | 基础 | 非常丰富 (路由, 死信, 延迟队列等) | 非常丰富 (流处理, 连接器) |
| 运维复杂度 | 低 (与 Redis 一同运维) | 中 | 高 |
| 适用场景 | 缓存/会话旁路、轻量级MQ | 企业应用、复杂路由、任务调度 | 大数据、日志收集、事件溯源 |
如何选择?
- 简单、快速、非核心: 使用 Redis List,比如一个后台的日志收集任务,丢了无所谓,实现起来最快。
- 需要基本可靠性、想用 Redis: 使用 Redis Streams,它提供了生产 MQ 所需的核心功能(持久化、ACK、消费者组),足以应对大部分中等规模的应用场景。
- 金融、电商等对可靠性要求极高: 选择 RabbitMQ,它的成熟度和功能完整性是 Redis 无法比拟的。
- 海量数据、高吞吐、日志/流处理: 选择 Kafka,它是分布式消息系统的王者,专为大数据场景设计。
在 Java 中使用 Redis 实现消息队列是一个灵活且高效的方案,尤其是在已经使用 Redis 的项目中。
- 对于学习和快速原型: 从
List+LPUSH/RPOP开始,理解其基本原理。 - 对于生产环境: 强烈推荐使用 Redis Streams,它提供了接近专业 MQ 的核心功能,同时保持了 Redis 的简洁和高效。
- 对于核心业务系统: 在评估 Redis Streams 是否能满足需求后,如果对可靠性、功能有更高要求,请务必考虑 RabbitMQ 或 Kafka 这样的专业消息队列中间件。
