杰瑞科技汇

python rabbitmq状态

  1. 使用 RabbitMQ 的管理插件:这是最常用、最简单的方法,通过 HTTP API 获取丰富的管理信息。
  2. 使用 pika 库与 AMQP 协议交互:通过发送特定的方法(如 queue.declare())来获取队列或交换机的基础状态。
  3. 使用 aio-pikapika 的异步版本,适用于异步应用,监控方式与 pika 类似。

下面我将重点介绍第一种方法,因为它功能最强大,也是生产环境中的首选。

python rabbitmq状态-图1
(图片来源网络,侵删)

使用 RabbitMQ Management HTTP API (推荐)

RabbitMQ 自带了一个强大的管理插件,默认监听在 15672 端口,它提供了一个 Web 界面和一个 RESTful API,我们可以通过 Python 的 requests 库来调用这个 API,获取几乎所有的状态信息。

步骤 1:启用 Management 插件

RabbitMQ 服务器上还没有启用管理插件,需要先启用它,在 RabbitMQ 服务器上执行:

rabbitmq-plugins enable rabbitmq_management

然后重启 RabbitMQ 服务。

步骤 2:获取 API 访问凭证

你需要一个具有足够权限的用户来访问 API,默认用户 guest 只能从 localhost 访问,建议创建一个专用的监控用户。

# 创建一个名为 monitor 的用户,密码为 your_password
rabbitmqctl add_user monitor your_password
# 赋予该用户监控和管理所有资源的权限
rabbitmqctl set_permissions -p / monitor ".*" ".*" ".*"
rabbitmqctl set_user_tags monitor administrator

步骤 3:使用 Python 和 requests 库获取状态

安装 requests 库:

pip install requests

下面是一个完整的 Python 脚本示例,展示如何获取各种状态信息。

import requests
import json
from requests.auth import HTTPBasicAuth
# --- 配置 ---
RABBITMQ_API_URL = "http://your-rabbitmq-server:15672/api"
USERNAME = "monitor"
PASSWORD = "your_password"
def get_rabbitmq_status():
    """
    获取并打印 RabbitMQ 的关键状态信息。
    """
    try:
        # 1. 检查整体连接和服务器状态
        print("--- 1. 服务器概览 ---")
        response = requests.get(f"{RABBITMQ_API_URL}/overview", auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()  # 如果请求失败则抛出异常
        overview = response.json()
        print(f" RabbitMQ 版本: {overview.get('rabbit_version')}")
        print(f" 服务器运行时间: {overview.get('uptime')}")
        print(f" 对象总数: {overview.get('object_totals', {}).get('consumers', 0)} 消费者, "
              f"{overview.get('object_totals', {}).get('queues', 0)} 队列, "
              f"{overview.get('object_totals', {}).get('exchanges', 0)} 交换机")
        # 2. 获取队列列表及其状态
        print("\n--- 2. 队列状态 ---")
        response = requests.get(f"{RABBITMQ_API_URL}/queues", auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        queues = response.json()
        if not queues:
            print(" 没有找到队列。")
        else:
            for q in queues:
                # 你可以根据需要选择要显示的字段
                print(f" 队列名: {q['name']:<20} | 消息数: {q['messages']:<10} | 消费者数: {q['consumers']:<5} | 状态: {'活跃' if q.get('active') else '不活跃'}")
        # 3. 获取连接列表
        print("\n--- 3. 活动连接 ---")
        response = requests.get(f"{RABBITMQ_API_URL}/connections", auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        connections = response.json()
        if not connections:
            print(" 没有活动连接。")
        else:
            for conn in connections:
                print(f" 客户端地址: {conn['client_properties'].get('connection_name', 'N/A'):<30} | "
                      f"用户: {conn['user']:<10} | "
                      f"通道数: {conn['channel_details']['channel_count']:<5} | "
                      f"状态: {'已连接' if conn.get('state') == 'running' else '未连接'}")
        # 4. 获取节点信息
        print("\n--- 4. 集群节点状态 ---")
        response = requests.get(f"{RABBITMQ_API_URL}/nodes", auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        nodes = response.json()
        for node in nodes:
            print(f" 节点名: {node['name']:<20} | 状态: {node['running']:<10} | "
                  f"内存使用: {node.get('mem_used', 0) / (1024**2):.2f} MB | "
                  f"磁盘空闲: {node.get('disk_free', 0) / (1024**3):.2f} GB")
        # 5. 检查特定的队列状态
        print("\n--- 5. 特定队列 'my_test_queue' 的详细状态 ---")
        queue_name = "my_test_queue"
        response = requests.get(f"{RABBITMQ_API_URL}/queues/%2F/{queue_name}", auth=HTTPBasicAuth(USERNAME, PASSWORD))
        if response.status_code == 200:
            queue_details = response.json()
            print(json.dumps(queue_details, indent=2, ensure_ascii=False))
        else:
            print(f" 队列 '{queue_name}' 不存在或无法访问。")
    except requests.exceptions.RequestException as e:
        print(f"请求 RabbitMQ API 失败: {e}")
    except json.JSONDecodeError:
        print("解析 JSON 响应失败。")
if __name__ == "__main__":
    get_rabbitmq_status()

脚本说明:

  • 认证:使用 HTTPBasicAuth 进行用户名和密码认证。
  • API 端点
    • /api/overview: 服务器概览,包含运行时间、对象总数等。
    • /api/queues: 所有队列的列表,每个队列包含名称、消息数、消费者数等关键信息。
    • /api/connections: 所有客户端连接的列表。
    • /api/nodes: 集群中所有节点的列表。
    • /api/queues/%2F/{queue_name}: 获取指定队列的详细信息。%2F 是 的 URL 编码,因为队列名可能包含斜杠。
  • 错误处理:使用 try...except 捕获网络请求和 JSON 解析可能发生的错误。

使用 pika 库获取基础状态

如果你不想依赖 Management 插件,或者只需要在应用内部进行轻量级检查,可以使用 pika 库,这种方法通过 AMQP 协议的 Queue.Declare 方法来获取队列状态,但信息有限。

步骤 1:安装 pika

pip install pika

步骤 2:编写 Python 脚本

import pika
# --- 配置 ---
RABBITMQ_HOST = 'your-rabbitmq-server'
QUEUE_NAME = 'my_test_queue'
USERNAME = 'guest' # 或者你的应用用户
PASSWORD = 'guest'
def check_queue_status_with_pika():
    """
    使用 pika 检查队列是否存在及其消息数。
    """
    credentials = pika.PlainCredentials(USERNAME, PASSWORD)
    parameters = pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials)
    try:
        # 1. 建立连接
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        # 2. 声明队列 (passive=True 表示只检查,不创建)
        # 这个方法会返回队列的详细信息
        method_frame, header_frame, body = channel.queue_declare(
            queue=QUEUE_NAME,
            passive=True  # 关键:设置为被动模式,仅获取信息
        )
        if method_frame:
            print(f"队列 '{QUEUE_NAME}' 存在。")
            print(f"  消息数: {method_frame.message_count}")
            print(f"  消费者数: {method_frame.consumer_count}")
        else:
            print(f"队列 '{QUEUE_NAME}' 不存在。")
        connection.close()
    except pika.exceptions.ChannelClosedByBroker as e:
        # 当队列不存在时,Broker 会关闭通道并返回 404 错误
        if e.reply_code == 404:
            print(f"队列 '{QUEUE_NAME}' 不存在。")
        else:
            print(f"发生其他通道错误: {e}")
    except pika.exceptions.AMQPConnectionError as e:
        print(f"连接 RabbitMQ 失败: {e}")
if __name__ == "__main__":
    check_queue_status_with_pika()

脚本说明:

  • passive=True:这是关键,它告诉 RabbitMQ 我们只想查询队列的状态,而不是创建它,如果队列不存在,RabbitMQ 会返回一个 ChannelClosedByBroker 异常,错误码为 404
  • 信息有限:这种方法只能获取到队列的 message_count(待处理消息数)和 consumer_count(消费者数量),它无法获取服务器概览、连接列表等更宏观的信息。
  • 适用场景:非常适合在你自己的应用程序中,作为启动或运行时的一部分,检查依赖的队列是否就绪。

总结与对比

特性 方法一 (Management API) 方法二 (pika)
依赖 需要 RabbitMQ Management 插件 不需要任何插件
信息量 非常丰富:服务器、队列、交换机、连接、通道、节点、用户等 非常有限:主要是单个队列的消息数和消费者数
复杂度 需要额外配置用户权限,使用 HTTP 请求 直接集成到应用代码中,使用 AMQP 协议
适用场景 监控、运维、自动化脚本、全面的健康检查 应用内逻辑检查(如:确保队列存在)、轻量级验证
推荐度 ★★★★★ (首选) ★★★☆☆ (特定场景)
  • 如果你需要编写一个独立的监控脚本、健康检查服务或进行 RabbitMQ 的日常运维管理,强烈推荐使用方法一(Management API),因为它提供了最全面和最准确的信息。
  • 如果你的 Python 应用程序本身就是一个 RabbitMQ 消费者或生产者,并且只需要在程序启动时确认目标队列是否存在,那么方法二(pika)是一个轻量级且高效的选择。
分享:
扫描分享到社交APP
上一篇
下一篇