杰瑞科技汇

activemq java实例

我们将使用经典的 JMS (Java Message Service) API 来完成这个示例。

activemq java实例-图1
(图片来源网络,侵删)

准备工作

  1. 下载 ActiveMQ

  2. 启动 ActiveMQ 服务器

    • 进入解压后的 bin 目录 (D:\activemq\bin)。
    • 根据您的操作系统运行启动脚本:
      • Windows: activemq.bat
      • Linux/Mac: ./activemq start
    • 服务器启动后,您可以通过浏览器访问管理控制台来确认服务是否正常运行:
      • 地址: http://localhost:8161
      • 默认用户名/密码: admin / admin
  3. 创建 Java 项目

    • 在您喜欢的 IDE (如 IntelliJ IDEA, Eclipse) 中创建一个新的 Java 项目。
    • 添加 ActiveMQ 依赖,您有两种方式:
      • Maven (推荐): 在 pom.xml 文件中添加以下依赖。
      • 手动 JAR: 从 ActiveMQ 的 lib 目录中复制 activemq-all-5.x.x.jar (或类似名称的 JAR 文件) 到您的项目的 lib 目录,并添加到项目构建路径中。

Maven pom.xml 依赖:

activemq java实例-图2
(图片来源网络,侵删)
<dependencies>
    <!-- ActiveMQ Classic JMS Client -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.18.3</version> <!-- 请使用您下载的版本号 -->
    </dependency>
    <!-- 如果使用 JUnit 进行测试 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

实例步骤

我们将创建三个类:

  1. JmsUtils.java: 工具类,用于创建连接和会话。
  2. MessageProducer.java: 消息生产者,向队列发送消息。
  3. MessageConsumer.java: 消息消费者,从队列接收消息。

JmsUtils.java (工具类)

这个类封装了创建 JMS 连接和会话的通用逻辑,避免代码重复。

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
public class JmsUtils {
    // ActiveMQ 服务器的地址
    private static final String BROKER_URL = "tcp://localhost:61616";
    // 用户名和密码
    private static final String USER = "admin";
    private static final String PASSWORD = "admin";
    /**
     * 创建并返回一个 JMS 连接
     * @return Connection 对象
     * @throws Exception
     */
    public static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, BROKER_URL);
        return connectionFactory.createConnection();
    }
    /**
     * 创建一个非事务性的、自动确认的会话
     * @param connection JMS 连接
     * @return Session 对象
     * @throws Exception
     */
    public static Session createSession(Connection connection) throws Exception {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}

MessageProducer.java (消息生产者)

这个类负责向一个名为 "TEST.QUEUE" 的队列发送文本消息。

import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
public class MessageProducer {
    // 队列名称
    private static final String QUEUE_NAME = "TEST.QUEUE";
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // 1. 创建连接
            connection = JmsUtils.createConnection();
            connection.start(); // 启动连接
            // 2. 创建会话
            Session session = JmsUtils.createSession(connection);
            // 3. 创建目的地 (Queue)
            Destination destination = new ActiveMQQueue(QUEUE_NAME);
            // 4. 创建生产者
            MessageProducer producer = session.createProducer(destination);
            // 5. 创建并发送消息
            for (int i = 1; i <= 5; i++) {
                String text = "Hello, ActiveMQ! This is message No. " + i;
                TextMessage message = session.createTextMessage(text);
                System.out.println("Sending message: " + text);
                producer.send(message);
                // 模拟发送间隔
                Thread.sleep(1000);
            }
            System.out.println("All messages have been sent successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6. 关闭连接
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

MessageConsumer.java (消息消费者)

这个类负责连接同一个 "TEST.QUEUE" 队列,并接收消息,它使用 MessageListener 接口来实现异步监听。

import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
public class MessageConsumer {
    // 队列名称,必须与生产者一致
    private static final String QUEUE_NAME = "TEST.QUEUE";
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // 1. 创建连接
            connection = JmsUtils.createConnection();
            connection.start(); // 启动连接
            // 2. 创建会话
            Session session = JmsUtils.createSession(connection);
            // 3. 创建目的地 (Queue)
            Destination destination = new ActiveMQQueue(QUEUE_NAME);
            // 4. 创建消费者
            MessageConsumer consumer = session.createConsumer(destination);
            // 5. 创建消息监听器,实现异步接收
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println("Received message: " + textMessage.getText());
                        } else {
                            System.out.println("Received non-text message: " + message);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("Consumer is running and waiting for messages... Press Ctrl+C to exit.");
            // 为了让消费者持续运行,我们让主线程休眠
            // 在实际应用中,这通常是一个独立的服务,不会退出
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 注意:如果使用监听器,通常不会在这里关闭连接,而是需要外部机制来控制生命周期
            // 如果需要关闭,可以取消下面的注释
            /*
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            */
        }
    }
}

运行实例

  1. 先运行消费者:

    • 运行 MessageConsumer.javamain 方法。
    • 您会在控制台看到输出:Consumer is running and waiting for messages...,程序会在此处等待。
  2. 再运行生产者:

    • 运行 MessageProducer.javamain 方法。
    • 生产者会依次发送 5 条消息,每条消息间隔 1 秒。
  3. 观察结果:

    • 消费者控制台: 您会看到消费者实时接收并打印出每条消息。
      Received message: Hello, ActiveMQ! This is message No. 1
      Received message: Hello, ActiveMQ! This is message No. 2
      Received message: Hello, ActiveMQ! This is message No. 3
      Received message: Hello, ActiveMQ! This is message No. 4
      Received message: Hello, ActiveMQ! This is message No. 5
    • 生产者控制台: 发送完成后会打印 All messages have been sent successfully.
  4. 验证 ActiveMQ 管理控制台:

    • 打开浏览器访问 http://localhost:8161
    • 登录后,点击左侧菜单的 Queues
    • 您应该能看到名为 TEST.QUEUE 的队列,并且有 Enqueued (入队) 和 Dequeued (出队) 的消息计数,都应该是 5。

关键概念和代码解释

  • ConnectionFactory: 创建连接的工厂,我们使用 ActiveMQConnectionFactory,它知道如何连接到 ActiveMQ 服务器。
  • Connection: 代表一个与 JMS 提供者的网络连接,它非常“重”,通常在整个应用程序生命周期中只创建一个。
  • Session: 一个单线程上下文,用于发送和接收消息,它包含一系列的操作,如创建生产者、消费者、消息等,会话是线程不安全的。
  • Destination: 消息的目的地,可以是队列 或主题。
    • Queue (队列): 点对点模式,一个消息只能被一个消费者消费,如果消费者不在,消息会一直存在,直到被消费。
    • Topic (主题): 发布/订阅模式,一个消息可以被所有订阅了该主题的消费者消费。
  • MessageProducer: 用于向目的地发送消息的对象。
  • MessageConsumer: 用于从目的地接收消息的对象。
  • Message: JMS 消息的接口,有几种类型,如 TextMessage, MapMessage, ObjectMessage 等,我们这里用的是最简单的 TextMessage
  • connection.start(): 非常重要! 这个方法会启动连接,使消费者能够开始接收消息,在创建完消费者之后调用。
  • 同步 vs 异步消费:
    • 同步: 使用 consumer.receive() 方法,它会阻塞当前线程,直到收到一条消息。
    • 异步: 使用 consumer.setMessageListener(),提供一个回调接口 (MessageListener),当有消息到达时,JMS 容器会自动调用 onMessage 方法,不会阻塞主线程,这是更常用的生产者-消费者模式。

常见问题

  • 连接失败 (Connection refused)

    • 原因: ActiveMQ 服务器没有启动,或者 BROKER_URL 中的端口号 (61616) 不正确。
    • 解决: 确保服务器已启动,并且检查 activemq.xml 配置文件中的 transportConnector 的端口设置。
  • 认证失败

    • 原因: JmsUtils 中的 USERPASSWORD 与 ActiveMQ 管理控制台的凭据不匹配。
    • 解决: 检查 activemq.xml 中的 <authentication> 配置,或确保 JmsUtils 中的凭据正确。
  • 消费者收不到消息

    • 原因:
      1. 队列名称 (QUEUE_NAME) 在生产者和消费者中不一致。
      2. 消费者没有调用 connection.start()
      3. 消费者代码执行完毕,连接被关闭了(特别是如果使用同步 receive() 后没有循环)。
    • 解决: 仔细检查以上三点。

这个实例为您提供了一个坚实的基础,您可以根据它来构建更复杂的消息驱动应用程序。

分享:
扫描分享到社交APP
上一篇
下一篇