我们将使用经典的 JMS (Java Message Service) API 来完成这个示例。

准备工作
-
下载 ActiveMQ
- 访问 ActiveMQ 官网: https://activemq.apache.org/components/classic/download/
- 下载最新的 Binary 版本 (
apache-activemq-5.18.3-bin.zip)。 - 解压到任意目录,
D:\activemq。
-
启动 ActiveMQ 服务器
- 进入解压后的
bin目录 (D:\activemq\bin)。 - 根据您的操作系统运行启动脚本:
- Windows:
activemq.bat - Linux/Mac:
./activemq start
- Windows:
- 服务器启动后,您可以通过浏览器访问管理控制台来确认服务是否正常运行:
- 地址:
http://localhost:8161 - 默认用户名/密码:
admin/admin
- 地址:
- 进入解压后的
-
创建 Java 项目
- 在您喜欢的 IDE (如 IntelliJ IDEA, Eclipse) 中创建一个新的 Java 项目。
- 添加 ActiveMQ 依赖,您有两种方式:
- Maven (推荐): 在
pom.xml文件中添加以下依赖。 - 手动 JAR: 从 ActiveMQ 的
lib目录中复制activemq-all-5.x.x.jar(或类似名称的 JAR 文件) 到您的项目的lib目录,并添加到项目构建路径中。
- Maven (推荐): 在
Maven pom.xml 依赖:

<dependencies>
<!-- ActiveMQ Classic JMS Client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.18.3</version> <!-- 请使用您下载的版本号 -->
</dependency>
<!-- 如果使用 JUnit 进行测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
实例步骤
我们将创建三个类:
JmsUtils.java: 工具类,用于创建连接和会话。MessageProducer.java: 消息生产者,向队列发送消息。MessageConsumer.java: 消息消费者,从队列接收消息。
JmsUtils.java (工具类)
这个类封装了创建 JMS 连接和会话的通用逻辑,避免代码重复。
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
public class JmsUtils {
// ActiveMQ 服务器的地址
private static final String BROKER_URL = "tcp://localhost:61616";
// 用户名和密码
private static final String USER = "admin";
private static final String PASSWORD = "admin";
/**
* 创建并返回一个 JMS 连接
* @return Connection 对象
* @throws Exception
*/
public static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, BROKER_URL);
return connectionFactory.createConnection();
}
/**
* 创建一个非事务性的、自动确认的会话
* @param connection JMS 连接
* @return Session 对象
* @throws Exception
*/
public static Session createSession(Connection connection) throws Exception {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
}
MessageProducer.java (消息生产者)
这个类负责向一个名为 "TEST.QUEUE" 的队列发送文本消息。
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
public class MessageProducer {
// 队列名称
private static final String QUEUE_NAME = "TEST.QUEUE";
public static void main(String[] args) {
Connection connection = null;
try {
// 1. 创建连接
connection = JmsUtils.createConnection();
connection.start(); // 启动连接
// 2. 创建会话
Session session = JmsUtils.createSession(connection);
// 3. 创建目的地 (Queue)
Destination destination = new ActiveMQQueue(QUEUE_NAME);
// 4. 创建生产者
MessageProducer producer = session.createProducer(destination);
// 5. 创建并发送消息
for (int i = 1; i <= 5; i++) {
String text = "Hello, ActiveMQ! This is message No. " + i;
TextMessage message = session.createTextMessage(text);
System.out.println("Sending message: " + text);
producer.send(message);
// 模拟发送间隔
Thread.sleep(1000);
}
System.out.println("All messages have been sent successfully.");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6. 关闭连接
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
MessageConsumer.java (消息消费者)
这个类负责连接同一个 "TEST.QUEUE" 队列,并接收消息,它使用 MessageListener 接口来实现异步监听。
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
public class MessageConsumer {
// 队列名称,必须与生产者一致
private static final String QUEUE_NAME = "TEST.QUEUE";
public static void main(String[] args) {
Connection connection = null;
try {
// 1. 创建连接
connection = JmsUtils.createConnection();
connection.start(); // 启动连接
// 2. 创建会话
Session session = JmsUtils.createSession(connection);
// 3. 创建目的地 (Queue)
Destination destination = new ActiveMQQueue(QUEUE_NAME);
// 4. 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 5. 创建消息监听器,实现异步接收
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
} else {
System.out.println("Received non-text message: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("Consumer is running and waiting for messages... Press Ctrl+C to exit.");
// 为了让消费者持续运行,我们让主线程休眠
// 在实际应用中,这通常是一个独立的服务,不会退出
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 注意:如果使用监听器,通常不会在这里关闭连接,而是需要外部机制来控制生命周期
// 如果需要关闭,可以取消下面的注释
/*
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
*/
}
}
}
运行实例
-
先运行消费者:
- 运行
MessageConsumer.java的main方法。 - 您会在控制台看到输出:
Consumer is running and waiting for messages...,程序会在此处等待。
- 运行
-
再运行生产者:
- 运行
MessageProducer.java的main方法。 - 生产者会依次发送 5 条消息,每条消息间隔 1 秒。
- 运行
-
观察结果:
- 消费者控制台: 您会看到消费者实时接收并打印出每条消息。
Received message: Hello, ActiveMQ! This is message No. 1 Received message: Hello, ActiveMQ! This is message No. 2 Received message: Hello, ActiveMQ! This is message No. 3 Received message: Hello, ActiveMQ! This is message No. 4 Received message: Hello, ActiveMQ! This is message No. 5 - 生产者控制台: 发送完成后会打印
All messages have been sent successfully.。
- 消费者控制台: 您会看到消费者实时接收并打印出每条消息。
-
验证 ActiveMQ 管理控制台:
- 打开浏览器访问
http://localhost:8161。 - 登录后,点击左侧菜单的 Queues。
- 您应该能看到名为
TEST.QUEUE的队列,并且有 Enqueued (入队) 和 Dequeued (出队) 的消息计数,都应该是 5。
- 打开浏览器访问
关键概念和代码解释
- ConnectionFactory: 创建连接的工厂,我们使用
ActiveMQConnectionFactory,它知道如何连接到 ActiveMQ 服务器。 - Connection: 代表一个与 JMS 提供者的网络连接,它非常“重”,通常在整个应用程序生命周期中只创建一个。
- Session: 一个单线程上下文,用于发送和接收消息,它包含一系列的操作,如创建生产者、消费者、消息等,会话是线程不安全的。
- Destination: 消息的目的地,可以是队列 或主题。
- Queue (队列): 点对点模式,一个消息只能被一个消费者消费,如果消费者不在,消息会一直存在,直到被消费。
- Topic (主题): 发布/订阅模式,一个消息可以被所有订阅了该主题的消费者消费。
- MessageProducer: 用于向目的地发送消息的对象。
- MessageConsumer: 用于从目的地接收消息的对象。
- Message: JMS 消息的接口,有几种类型,如
TextMessage,MapMessage,ObjectMessage等,我们这里用的是最简单的TextMessage。 connection.start(): 非常重要! 这个方法会启动连接,使消费者能够开始接收消息,在创建完消费者之后调用。- 同步 vs 异步消费:
- 同步: 使用
consumer.receive()方法,它会阻塞当前线程,直到收到一条消息。 - 异步: 使用
consumer.setMessageListener(),提供一个回调接口 (MessageListener),当有消息到达时,JMS 容器会自动调用onMessage方法,不会阻塞主线程,这是更常用的生产者-消费者模式。
- 同步: 使用
常见问题
-
连接失败 (Connection refused)
- 原因: ActiveMQ 服务器没有启动,或者
BROKER_URL中的端口号 (61616) 不正确。 - 解决: 确保服务器已启动,并且检查
activemq.xml配置文件中的transportConnector的端口设置。
- 原因: ActiveMQ 服务器没有启动,或者
-
认证失败
- 原因:
JmsUtils中的USER和PASSWORD与 ActiveMQ 管理控制台的凭据不匹配。 - 解决: 检查
activemq.xml中的<authentication>配置,或确保JmsUtils中的凭据正确。
- 原因:
-
消费者收不到消息
- 原因:
- 队列名称 (
QUEUE_NAME) 在生产者和消费者中不一致。 - 消费者没有调用
connection.start()。 - 消费者代码执行完毕,连接被关闭了(特别是如果使用同步
receive()后没有循环)。
- 队列名称 (
- 解决: 仔细检查以上三点。
- 原因:
这个实例为您提供了一个坚实的基础,您可以根据它来构建更复杂的消息驱动应用程序。
