杰瑞科技汇

python 监控activemq

监控的核心指标

在开始之前,我们先明确需要监控哪些关键指标,这些指标能帮助我们全面了解 ActiveMQ 的健康状况:

python 监控activemq-图1
(图片来源网络,侵删)
  1. Broker 状态

    • 是否正在运行?
    • 启动时间?
  2. 内存使用

    • JVM 堆内存使用量、最大值和已使用百分比。
    • 非堆内存使用情况。
    • 内存使用率过高可能导致消息处理缓慢或服务崩溃。
  3. 队列状态

    • 队列数量:系统中总共有多少个队列。
    • 每个队列的消息数
      • EnqueueCount:入队消息总数。
      • DequeueCount:出队消息总数。
      • DispatchCount:派发消息总数(可能被消费者签收也可能未签收)。
      • Size:队列中当前未消费的消息数量(最重要的指标)。
      • ConsumerCount:当前连接到此队列的消费者数量。
      • ProducerCount:当前连接到此队列的生产者数量。
  4. 主题 状态

    python 监控activemq-图2
    (图片来源网络,侵删)

    类似于队列,但主题是发布/订阅模式,关注订阅者数量和消息流量。

  5. 网络连接

    当前活跃的连接数。

  6. 死信队列

    python 监控activemq-图3
    (图片来源网络,侵删)

    死信队列中的消息数量,如果持续增长,说明有消息处理失败,需要人工干预。


使用 Web Console API (最简单、推荐)

ActiveMQ 内置了一个基于 Web 的管理控制台,它提供了一个 RESTful API,我们可以直接通过 Python 的 requests 库来调用这些 API,获取监控数据,这是最简单、最直接的方法,无需安装额外的客户端库。

优点

  • 简单易用:只需要 requests 库。
  • 信息全面:几乎可以获取 Web Console 上能看到的所有信息。
  • 无额外依赖:不依赖 Java 环境。

缺点

  • 需要 ActiveMQ 的 Web Console 功能是开启的(默认是开启的)。
  • API 端点可能在未来的 ActiveMQ 版本中发生变化(但非常稳定)。

实现代码

import requests
import json
from urllib.parse import urljoin
# --- 配置 ---
ACTIVEMQ_URL = "http://localhost:8161"  # ActiveMQ Web Console 地址
ACTIVEMQ_USER = "admin"                # 用户名
ACTIVEMQ_PASSWORD = "admin"            # 密码
def get_activemq_stats():
    """
    获取 ActiveMQ 的关键统计信息
    """
    session = requests.Session()
    session.auth = (ACTIVEMQ_USER, ACTIVEMQ_PASSWORD)
    # 禁用 SSL 验证(如果使用自签名证书,仅用于测试环境)
    # session.verify = False 
    stats = {}
    try:
        # 1. 获取 Broker 整体信息 (JVM, 内存等)
        broker_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost")
        response = session.get(broker_url, timeout=5)
        response.raise_for_status()
        broker_data = response.json()['value']
        stats['broker'] = {
            'name': broker_data['BrokerName'],
            'uptime': broker_data['Uptime'],
            'memory_heap_usage_percent': round((broker_data['MemoryLimit'] - broker_data['MemoryPercentFree']) / broker_data['MemoryLimit'] * 100, 2)
        }
        # 2. 获取所有队列
        queues_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=*")
        response = session.get(queues_url, timeout=5)
        response.raise_for_status()
        queues_data = response.json()['value']
        queue_stats = {}
        for queue_name, data in queues_data.items():
            queue_name = queue_name.split('destinationName=')[-1].strip("'")
            queue_stats[queue_name] = {
                'size': data['QueueSize'],
                'consumer_count': data['ConsumerCount'],
                'enqueue_count': data['EnqueueCount'],
                'dequeue_count': data['DequeueCount']
            }
        stats['queues'] = queue_stats
        # 3. 获取所有 Topic (可选)
        topics_url = urljoin(ACTIVEMQ_URL, "/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=*")
        response = session.get(topics_url, timeout=5)
        # topics_data = response.json()['value'] # 解析逻辑类似队列
        # stats['topics'] = topics_data
        # 4. 获取死信队列 (假设死信队列名为 ActiveMQ.DLQ)
        dlq_url = urljoin(ACTIVEMQ_URL, f"/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName='ActiveMQ.DLQ'")
        response = session.get(dlq_url, timeout=5)
        if response.status_code == 200:
            dlq_data = response.json()['value']
            stats['dead_letter_queue'] = {
                'name': 'ActiveMQ.DLQ',
                'size': dlq_data['QueueSize']
            }
        else:
            stats['dead_letter_queue'] = {'error': 'DLQ not found or not accessible'}
        return stats
    except requests.exceptions.RequestException as e:
        print(f"请求 ActiveMQ API 失败: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"解析 JSON 响应失败: {e}")
        return None
    except KeyError as e:
        print(f"响应数据结构不符合预期,缺少键: {e}")
        return None
def print_stats(stats):
    """
    格式化打印统计信息
    """
    if not stats:
        return
    print("\n" + "="*50)
    print("ActiveMQ 监控报告")
    print("="*50)
    # 打印 Broker 信息
    broker = stats.get('broker', {})
    print(f"\n[Broker 状态]")
    print(f"  名称: {broker.get('name', 'N/A')}")
    print(f"  运行时间: {broker.get('uptime', 'N/A')} ms")
    print(f"  堆内存使用率: {broker.get('memory_heap_usage_percent', 'N/A')}%")
    # 打印队列信息
    queues = stats.get('queues', {})
    print(f"\n[队列状态] (共 {len(queues)} 个队列)")
    if queues:
        for name, data in queues.items():
            print(f"  - 队列: {name}")
            print(f"    当前消息数: {data.get('size', 0)}")
            print(f"    消费者数: {data.get('consumer_count', 0)}")
            print(f"    累计入队: {data.get('enqueue_count', 0)}")
            print(f"    累计出队: {data.get('dequeue_count', 0)}")
    else:
        print("  未找到队列信息。")
    # 打印死信队列信息
    dlq = stats.get('dead_letter_queue', {})
    print(f"\n[死信队列状态]")
    if 'error' in dlq:
        print(f"  {dlq['error']}")
    else:
        print(f"  名称: {dlq.get('name', 'N/A')}")
        print(f"  当前消息数: {dlq.get('size', 0)}")
    print("="*50 + "\n")
if __name__ == "__main__":
    # 示例:每隔10秒打印一次监控信息
    import time
    while True:
        stats = get_activemq_stats()
        print_stats(stats)
        time.sleep(10)

如何运行:

  1. 确保你的 ActiveMQ 已经启动,Web Console (默认端口 8161) 可以访问。
  2. 安装 requests 库:pip install requests
  3. 修改代码中的 ACTIVEMQ_URL, ACTIVEMQ_USER, ACTIVEMQ_PASSWORD 为你的实际配置。
  4. 运行脚本:python your_script_name.py

使用 STOMP 协议进行客户端监控

这种方法模拟一个真实的消息消费者或生产者,通过 STOMP 协议连接到 ActiveMQ,从内部获取信息,这更侧重于从客户端视角检查服务的可用性和连通性。

优点

  • 模拟真实客户端行为,可以测试连通性。
  • 可以监控特定队列的消费者是否正常工作。

缺点

  • 实现相对复杂,需要处理网络连接、订阅、消息接收等。
  • 主要用于连通性检查和简单消息数获取,不如 API 全面。

实现代码 (使用 stomp.py 库)

import stomp
import time
import json
# --- 配置 ---
ACTIVEMQ_HOST = "localhost"
ACTIVEMQ_PORT = 61613  # STOMP 协议端口
ACTIVEMQ_USER = "admin"
ACTIVEMQ_PASSWORD = "admin"
QUEUE_NAME = "TEST.QUEUE" # 你要监控的队列名
class ActiveMQMonitor(stomp.ConnectionListener):
    def __init__(self, queue_name):
        self.queue_name = queue_name
        self.message_count = 0
    def on_error(self, headers, message):
        print(f"接收错误: {message}")
    def on_message(self, headers, message):
        """
        当从队列接收到消息时调用
        """
        self.message_count += 1
        print(f"从 {self.queue_name} 接收到消息 #{self.message_count}: {message}")
    def on_connected(self, headers, body):
        print("已连接到 ActiveMQ")
        # 订阅队列,接收消息
        self.conn.subscribe(destination=self.queue_name, id=1, ack='auto')
    def on_disconnected(self):
        print("与 ActiveMQ 的连接已断开")
def monitor_connectivity_and_queue():
    """
    监控 ActiveMQ 的连通性并尝试从一个队列消费一条消息
    """
    conn = stomp.Connection([(ACTIVEMQ_HOST, ACTIVEMQ_PORT)])
    listener = ActiveMQMonitor(QUEUE_NAME)
    conn.set_listener('', listener)
    try:
        print(f"正在尝试连接到 {ACTIVEMQ_HOST}:{ACTIVEMQ_PORT}...")
        conn.start()
        conn.connect(username=ACTIVEMQ_USER, password=ACTIVEMQ_PASSWORD, wait=True)
        # 等待一段时间,看看是否能收到消息
        print(f"已连接,正在监控队列 '{QUEUE_NAME}' 10秒...")
        time.sleep(10)
        print(f"\n在10秒内,从 '{QUEUE_NAME}' 成功接收到 {listener.message_count} 条消息。")
        print("这表明 ActiveMQ 服务正常,并且该队列至少有一个活跃的消费者。")
    except Exception as e:
        print(f"连接或监控失败: {e}")
    finally:
        conn.disconnect()
if __name__ == "__main__":
    monitor_connectivity_and_queue()

如何运行:

  1. 安装 stomp.py 库:pip install stomp.py
  2. 确保你的 ActiveMQ 的 STOMP 连接器是开启的(默认开启)。
  3. 修改配置并运行。

使用 Jolokia (底层技术)

方法一中的 API 正是基于 Jolokia,Jolokia 是一个通过 HTTP 来访问 JMX (Java Management Extensions) MBeans 的库,ActiveMQ 暴露了大量的 JMX MBean 来供监控。

如果你需要更底层的控制,或者想编写一个通用的 JMX 监控工具,可以直接使用 Jolokia 的 Python 客户端库 jolokia

优点

  • 功能强大,可以访问所有 JMX 暴露的指标。
  • 是方法一的底层实现,更灵活。

缺点

  • 需要理解 JMX 的概念,学习曲线稍陡峭。

实现代码 (使用 jolokia 库)

# pip install jolokia
from jolokia import Jolokia
# --- 配置 ---
JOLokia_URL = "http://admin:admin@localhost:8161/jolokia/" # jolokia agent 的 endpoint
j4p = Jolokia(JOLokia_URL)
try:
    # 获取 JVM 内存信息
    response = j4p.request(
        {
            "type": "read",
            "mbean": "java.lang:type=Memory",
            "attribute": "HeapMemoryUsage"
        }
    )
    heap_usage = response['value']
    max_memory = heap_usage['max']
    used_memory = heap_usage['used']
    percent_used = (used_memory / max_memory) * 100
    print(f"JVM 堆内存:")
    print(f"  已使用: {used_memory / (1024*1024):.2f} MB")
    print(f"  最大值: {max_memory / (1024*1024):.2f} MB")
    print(f"  使用率: {percent_used:.2f}%")
    # 获取特定队列的大小
    queue_name = "TEST.QUEUE"
    response = j4p.request(
        {
            "type": "read",
            "mbean": f"org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName='{queue_name}'",
            "attribute": "QueueSize"
        }
    )
    queue_size = response['value']
    print(f"\n队列 '{queue_name}' 当前消息数: {queue_size}")
except Exception as e:
    print(f"Jolokia 请求失败: {e}")

总结与建议

方法 优点 缺点 适用场景
Web Console API 简单、全面、无额外依赖 依赖 Web Console 开启,API 可能在未来版本变 绝大多数监控场景,如编写健康检查脚本、Prometheus Exporter、自定义监控面板等。强烈推荐
STOMP 协议 模拟真实客户端,测试连通性 实现复杂,信息获取不全面 连通性测试、简单的消费者行为模拟。
Jolokia 库 功能强大,灵活,底层 学习曲线陡峭,需要理解 JMX 需要访问 JMX 中所有指标,或构建通用 JMX 监控工具。

对于绝大多数 Python 开发者和运维人员来说,方法一(Web Console API)是最佳选择。 它提供了最便捷的途径来获取 ActiveMQ 的核心监控数据,并且足够稳定和强大,你可以基于这个方法,将数据发送到监控系统(如 Prometheus, Zabbix, InfluxDB)或者触发告警(如通过邮件、钉钉、Slack)。

分享:
扫描分享到社交APP
上一篇
下一篇