监控的核心指标
在开始之前,我们先明确需要监控哪些关键指标,这些指标能帮助我们全面了解 ActiveMQ 的健康状况:

-
Broker 状态:
- 是否正在运行?
- 启动时间?
-
内存使用:
- JVM 堆内存使用量、最大值和已使用百分比。
- 非堆内存使用情况。
- 内存使用率过高可能导致消息处理缓慢或服务崩溃。
-
队列状态:
- 队列数量:系统中总共有多少个队列。
- 每个队列的消息数:
EnqueueCount:入队消息总数。DequeueCount:出队消息总数。DispatchCount:派发消息总数(可能被消费者签收也可能未签收)。Size:队列中当前未消费的消息数量(最重要的指标)。ConsumerCount:当前连接到此队列的消费者数量。ProducerCount:当前连接到此队列的生产者数量。
-
主题 状态:
(图片来源网络,侵删)类似于队列,但主题是发布/订阅模式,关注订阅者数量和消息流量。
-
网络连接:
当前活跃的连接数。
-
死信队列:
(图片来源网络,侵删)死信队列中的消息数量,如果持续增长,说明有消息处理失败,需要人工干预。
使用 Web Console API (最简单、推荐)
ActiveMQ 内置了一个基于 Web 的管理控制台,它提供了一个 RESTful API,我们可以直接通过 Python 的 requests 库来调用这些 API,获取监控数据,这是最简单、最直接的方法,无需安装额外的客户端库。
优点
- 简单易用:只需要
requests库。 - 信息全面:几乎可以获取 Web Console 上能看到的所有信息。
- 无额外依赖:不依赖 Java 环境。
缺点
- 需要 ActiveMQ 的 Web Console 功能是开启的(默认是开启的)。
- API 端点可能在未来的 ActiveMQ 版本中发生变化(但非常稳定)。
实现代码
import requests
import json
from urllib.parse import urljoin
# --- 配置 ---
ACTIVEMQ_URL = "http://localhost:8161" # ActiveMQ Web Console 地址
ACTIVEMQ_USER = "admin" # 用户名
ACTIVEMQ_PASSWORD = "admin" # 密码
def get_activemq_stats():
"""
获取 ActiveMQ 的关键统计信息
"""
session = requests.Session()
session.auth = (ACTIVEMQ_USER, ACTIVEMQ_PASSWORD)
# 禁用 SSL 验证(如果使用自签名证书,仅用于测试环境)
# session.verify = False
stats = {}
try:
# 1. 获取 Broker 整体信息 (JVM, 内存等)
broker_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost")
response = session.get(broker_url, timeout=5)
response.raise_for_status()
broker_data = response.json()['value']
stats['broker'] = {
'name': broker_data['BrokerName'],
'uptime': broker_data['Uptime'],
'memory_heap_usage_percent': round((broker_data['MemoryLimit'] - broker_data['MemoryPercentFree']) / broker_data['MemoryLimit'] * 100, 2)
}
# 2. 获取所有队列
queues_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=*")
response = session.get(queues_url, timeout=5)
response.raise_for_status()
queues_data = response.json()['value']
queue_stats = {}
for queue_name, data in queues_data.items():
queue_name = queue_name.split('destinationName=')[-1].strip("'")
queue_stats[queue_name] = {
'size': data['QueueSize'],
'consumer_count': data['ConsumerCount'],
'enqueue_count': data['EnqueueCount'],
'dequeue_count': data['DequeueCount']
}
stats['queues'] = queue_stats
# 3. 获取所有 Topic (可选)
topics_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=*")
response = session.get(topics_url, timeout=5)
# topics_data = response.json()['value'] # 解析逻辑类似队列
# stats['topics'] = topics_data
# 4. 获取死信队列 (假设死信队列名为 ActiveMQ.DLQ)
dlq_url = urljoin(ACTIVEMQ_URL, f"/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName='ActiveMQ.DLQ'")
response = session.get(dlq_url, timeout=5)
if response.status_code == 200:
dlq_data = response.json()['value']
stats['dead_letter_queue'] = {
'name': 'ActiveMQ.DLQ',
'size': dlq_data['QueueSize']
}
else:
stats['dead_letter_queue'] = {'error': 'DLQ not found or not accessible'}
return stats
except requests.exceptions.RequestException as e:
print(f"请求 ActiveMQ API 失败: {e}")
return None
except json.JSONDecodeError as e:
print(f"解析 JSON 响应失败: {e}")
return None
except KeyError as e:
print(f"响应数据结构不符合预期,缺少键: {e}")
return None
def print_stats(stats):
"""
格式化打印统计信息
"""
if not stats:
return
print("\n" + "="*50)
print("ActiveMQ 监控报告")
print("="*50)
# 打印 Broker 信息
broker = stats.get('broker', {})
print(f"\n[Broker 状态]")
print(f" 名称: {broker.get('name', 'N/A')}")
print(f" 运行时间: {broker.get('uptime', 'N/A')} ms")
print(f" 堆内存使用率: {broker.get('memory_heap_usage_percent', 'N/A')}%")
# 打印队列信息
queues = stats.get('queues', {})
print(f"\n[队列状态] (共 {len(queues)} 个队列)")
if queues:
for name, data in queues.items():
print(f" - 队列: {name}")
print(f" 当前消息数: {data.get('size', 0)}")
print(f" 消费者数: {data.get('consumer_count', 0)}")
print(f" 累计入队: {data.get('enqueue_count', 0)}")
print(f" 累计出队: {data.get('dequeue_count', 0)}")
else:
print(" 未找到队列信息。")
# 打印死信队列信息
dlq = stats.get('dead_letter_queue', {})
print(f"\n[死信队列状态]")
if 'error' in dlq:
print(f" {dlq['error']}")
else:
print(f" 名称: {dlq.get('name', 'N/A')}")
print(f" 当前消息数: {dlq.get('size', 0)}")
print("="*50 + "\n")
if __name__ == "__main__":
# 示例:每隔10秒打印一次监控信息
import time
while True:
stats = get_activemq_stats()
print_stats(stats)
time.sleep(10)
如何运行:
- 确保你的 ActiveMQ 已经启动,Web Console (默认端口 8161) 可以访问。
- 安装
requests库:pip install requests - 修改代码中的
ACTIVEMQ_URL,ACTIVEMQ_USER,ACTIVEMQ_PASSWORD为你的实际配置。 - 运行脚本:
python your_script_name.py
使用 STOMP 协议进行客户端监控
这种方法模拟一个真实的消息消费者或生产者,通过 STOMP 协议连接到 ActiveMQ,从内部获取信息,这更侧重于从客户端视角检查服务的可用性和连通性。
优点
- 模拟真实客户端行为,可以测试连通性。
- 可以监控特定队列的消费者是否正常工作。
缺点
- 实现相对复杂,需要处理网络连接、订阅、消息接收等。
- 主要用于连通性检查和简单消息数获取,不如 API 全面。
实现代码 (使用 stomp.py 库)
import stomp
import time
import json
# --- 配置 ---
ACTIVEMQ_HOST = "localhost"
ACTIVEMQ_PORT = 61613 # STOMP 协议端口
ACTIVEMQ_USER = "admin"
ACTIVEMQ_PASSWORD = "admin"
QUEUE_NAME = "TEST.QUEUE" # 你要监控的队列名
class ActiveMQMonitor(stomp.ConnectionListener):
def __init__(self, queue_name):
self.queue_name = queue_name
self.message_count = 0
def on_error(self, headers, message):
print(f"接收错误: {message}")
def on_message(self, headers, message):
"""
当从队列接收到消息时调用
"""
self.message_count += 1
print(f"从 {self.queue_name} 接收到消息 #{self.message_count}: {message}")
def on_connected(self, headers, body):
print("已连接到 ActiveMQ")
# 订阅队列,接收消息
self.conn.subscribe(destination=self.queue_name, id=1, ack='auto')
def on_disconnected(self):
print("与 ActiveMQ 的连接已断开")
def monitor_connectivity_and_queue():
"""
监控 ActiveMQ 的连通性并尝试从一个队列消费一条消息
"""
conn = stomp.Connection([(ACTIVEMQ_HOST, ACTIVEMQ_PORT)])
listener = ActiveMQMonitor(QUEUE_NAME)
conn.set_listener('', listener)
try:
print(f"正在尝试连接到 {ACTIVEMQ_HOST}:{ACTIVEMQ_PORT}...")
conn.start()
conn.connect(username=ACTIVEMQ_USER, password=ACTIVEMQ_PASSWORD, wait=True)
# 等待一段时间,看看是否能收到消息
print(f"已连接,正在监控队列 '{QUEUE_NAME}' 10秒...")
time.sleep(10)
print(f"\n在10秒内,从 '{QUEUE_NAME}' 成功接收到 {listener.message_count} 条消息。")
print("这表明 ActiveMQ 服务正常,并且该队列至少有一个活跃的消费者。")
except Exception as e:
print(f"连接或监控失败: {e}")
finally:
conn.disconnect()
if __name__ == "__main__":
monitor_connectivity_and_queue()
如何运行:
- 安装
stomp.py库:pip install stomp.py - 确保你的 ActiveMQ 的 STOMP 连接器是开启的(默认开启)。
- 修改配置并运行。
使用 Jolokia (底层技术)
方法一中的 API 正是基于 Jolokia,Jolokia 是一个通过 HTTP 来访问 JMX (Java Management Extensions) MBeans 的库,ActiveMQ 暴露了大量的 JMX MBean 来供监控。
如果你需要更底层的控制,或者想编写一个通用的 JMX 监控工具,可以直接使用 Jolokia 的 Python 客户端库 jolokia。
优点
- 功能强大,可以访问所有 JMX 暴露的指标。
- 是方法一的底层实现,更灵活。
缺点
- 需要理解 JMX 的概念,学习曲线稍陡峭。
实现代码 (使用 jolokia 库)
# pip install jolokia
from jolokia import Jolokia
# --- 配置 ---
JOLokia_URL = "http://admin:admin@localhost:8161/jolokia/" # jolokia agent 的 endpoint
j4p = Jolokia(JOLokia_URL)
try:
# 获取 JVM 内存信息
response = j4p.request(
{
"type": "read",
"mbean": "java.lang:type=Memory",
"attribute": "HeapMemoryUsage"
}
)
heap_usage = response['value']
max_memory = heap_usage['max']
used_memory = heap_usage['used']
percent_used = (used_memory / max_memory) * 100
print(f"JVM 堆内存:")
print(f" 已使用: {used_memory / (1024*1024):.2f} MB")
print(f" 最大值: {max_memory / (1024*1024):.2f} MB")
print(f" 使用率: {percent_used:.2f}%")
# 获取特定队列的大小
queue_name = "TEST.QUEUE"
response = j4p.request(
{
"type": "read",
"mbean": f"org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName='{queue_name}'",
"attribute": "QueueSize"
}
)
queue_size = response['value']
print(f"\n队列 '{queue_name}' 当前消息数: {queue_size}")
except Exception as e:
print(f"Jolokia 请求失败: {e}")
总结与建议
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Web Console API | 简单、全面、无额外依赖 | 依赖 Web Console 开启,API 可能在未来版本变 | 绝大多数监控场景,如编写健康检查脚本、Prometheus Exporter、自定义监控面板等。强烈推荐。 |
| STOMP 协议 | 模拟真实客户端,测试连通性 | 实现复杂,信息获取不全面 | 连通性测试、简单的消费者行为模拟。 |
| Jolokia 库 | 功能强大,灵活,底层 | 学习曲线陡峭,需要理解 JMX | 需要访问 JMX 中所有指标,或构建通用 JMX 监控工具。 |
对于绝大多数 Python 开发者和运维人员来说,方法一(Web Console API)是最佳选择。 它提供了最便捷的途径来获取 ActiveMQ 的核心监控数据,并且足够稳定和强大,你可以基于这个方法,将数据发送到监控系统(如 Prometheus, Zabbix, InfluxDB)或者触发告警(如通过邮件、钉钉、Slack)。
