- 使用 RabbitMQ 的管理插件:这是最常用、最简单的方法,通过 HTTP API 获取丰富的管理信息。
- 使用
pika库与 AMQP 协议交互:通过发送特定的方法(如queue.declare())来获取队列或交换机的基础状态。 - 使用
aio-pika库:pika的异步版本,适用于异步应用,监控方式与pika类似。
下面我将重点介绍第一种方法,因为它功能最强大,也是生产环境中的首选。

使用 RabbitMQ Management HTTP API (推荐)
RabbitMQ 自带了一个强大的管理插件,默认监听在 15672 端口,它提供了一个 Web 界面和一个 RESTful API,我们可以通过 Python 的 requests 库来调用这个 API,获取几乎所有的状态信息。
步骤 1:启用 Management 插件
RabbitMQ 服务器上还没有启用管理插件,需要先启用它,在 RabbitMQ 服务器上执行:
rabbitmq-plugins enable rabbitmq_management
然后重启 RabbitMQ 服务。
步骤 2:获取 API 访问凭证
你需要一个具有足够权限的用户来访问 API,默认用户 guest 只能从 localhost 访问,建议创建一个专用的监控用户。
# 创建一个名为 monitor 的用户,密码为 your_password rabbitmqctl add_user monitor your_password # 赋予该用户监控和管理所有资源的权限 rabbitmqctl set_permissions -p / monitor ".*" ".*" ".*" rabbitmqctl set_user_tags monitor administrator
步骤 3:使用 Python 和 requests 库获取状态
安装 requests 库:
pip install requests
下面是一个完整的 Python 脚本示例,展示如何获取各种状态信息。
import requests
import json
from requests.auth import HTTPBasicAuth
# --- 配置 ---
RABBITMQ_API_URL = "http://your-rabbitmq-server:15672/api"
USERNAME = "monitor"
PASSWORD = "your_password"
def get_rabbitmq_status():
"""
获取并打印 RabbitMQ 的关键状态信息。
"""
try:
# 1. 检查整体连接和服务器状态
print("--- 1. 服务器概览 ---")
response = requests.get(f"{RABBITMQ_API_URL}/overview", auth=HTTPBasicAuth(USERNAME, PASSWORD))
response.raise_for_status() # 如果请求失败则抛出异常
overview = response.json()
print(f" RabbitMQ 版本: {overview.get('rabbit_version')}")
print(f" 服务器运行时间: {overview.get('uptime')}")
print(f" 对象总数: {overview.get('object_totals', {}).get('consumers', 0)} 消费者, "
f"{overview.get('object_totals', {}).get('queues', 0)} 队列, "
f"{overview.get('object_totals', {}).get('exchanges', 0)} 交换机")
# 2. 获取队列列表及其状态
print("\n--- 2. 队列状态 ---")
response = requests.get(f"{RABBITMQ_API_URL}/queues", auth=HTTPBasicAuth(USERNAME, PASSWORD))
response.raise_for_status()
queues = response.json()
if not queues:
print(" 没有找到队列。")
else:
for q in queues:
# 你可以根据需要选择要显示的字段
print(f" 队列名: {q['name']:<20} | 消息数: {q['messages']:<10} | 消费者数: {q['consumers']:<5} | 状态: {'活跃' if q.get('active') else '不活跃'}")
# 3. 获取连接列表
print("\n--- 3. 活动连接 ---")
response = requests.get(f"{RABBITMQ_API_URL}/connections", auth=HTTPBasicAuth(USERNAME, PASSWORD))
response.raise_for_status()
connections = response.json()
if not connections:
print(" 没有活动连接。")
else:
for conn in connections:
print(f" 客户端地址: {conn['client_properties'].get('connection_name', 'N/A'):<30} | "
f"用户: {conn['user']:<10} | "
f"通道数: {conn['channel_details']['channel_count']:<5} | "
f"状态: {'已连接' if conn.get('state') == 'running' else '未连接'}")
# 4. 获取节点信息
print("\n--- 4. 集群节点状态 ---")
response = requests.get(f"{RABBITMQ_API_URL}/nodes", auth=HTTPBasicAuth(USERNAME, PASSWORD))
response.raise_for_status()
nodes = response.json()
for node in nodes:
print(f" 节点名: {node['name']:<20} | 状态: {node['running']:<10} | "
f"内存使用: {node.get('mem_used', 0) / (1024**2):.2f} MB | "
f"磁盘空闲: {node.get('disk_free', 0) / (1024**3):.2f} GB")
# 5. 检查特定的队列状态
print("\n--- 5. 特定队列 'my_test_queue' 的详细状态 ---")
queue_name = "my_test_queue"
response = requests.get(f"{RABBITMQ_API_URL}/queues/%2F/{queue_name}", auth=HTTPBasicAuth(USERNAME, PASSWORD))
if response.status_code == 200:
queue_details = response.json()
print(json.dumps(queue_details, indent=2, ensure_ascii=False))
else:
print(f" 队列 '{queue_name}' 不存在或无法访问。")
except requests.exceptions.RequestException as e:
print(f"请求 RabbitMQ API 失败: {e}")
except json.JSONDecodeError:
print("解析 JSON 响应失败。")
if __name__ == "__main__":
get_rabbitmq_status()
脚本说明:
- 认证:使用
HTTPBasicAuth进行用户名和密码认证。 - API 端点:
/api/overview: 服务器概览,包含运行时间、对象总数等。/api/queues: 所有队列的列表,每个队列包含名称、消息数、消费者数等关键信息。/api/connections: 所有客户端连接的列表。/api/nodes: 集群中所有节点的列表。/api/queues/%2F/{queue_name}: 获取指定队列的详细信息。%2F是 的 URL 编码,因为队列名可能包含斜杠。
- 错误处理:使用
try...except捕获网络请求和 JSON 解析可能发生的错误。
使用 pika 库获取基础状态
如果你不想依赖 Management 插件,或者只需要在应用内部进行轻量级检查,可以使用 pika 库,这种方法通过 AMQP 协议的 Queue.Declare 方法来获取队列状态,但信息有限。
步骤 1:安装 pika 库
pip install pika
步骤 2:编写 Python 脚本
import pika
# --- 配置 ---
RABBITMQ_HOST = 'your-rabbitmq-server'
QUEUE_NAME = 'my_test_queue'
USERNAME = 'guest' # 或者你的应用用户
PASSWORD = 'guest'
def check_queue_status_with_pika():
"""
使用 pika 检查队列是否存在及其消息数。
"""
credentials = pika.PlainCredentials(USERNAME, PASSWORD)
parameters = pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials)
try:
# 1. 建立连接
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 2. 声明队列 (passive=True 表示只检查,不创建)
# 这个方法会返回队列的详细信息
method_frame, header_frame, body = channel.queue_declare(
queue=QUEUE_NAME,
passive=True # 关键:设置为被动模式,仅获取信息
)
if method_frame:
print(f"队列 '{QUEUE_NAME}' 存在。")
print(f" 消息数: {method_frame.message_count}")
print(f" 消费者数: {method_frame.consumer_count}")
else:
print(f"队列 '{QUEUE_NAME}' 不存在。")
connection.close()
except pika.exceptions.ChannelClosedByBroker as e:
# 当队列不存在时,Broker 会关闭通道并返回 404 错误
if e.reply_code == 404:
print(f"队列 '{QUEUE_NAME}' 不存在。")
else:
print(f"发生其他通道错误: {e}")
except pika.exceptions.AMQPConnectionError as e:
print(f"连接 RabbitMQ 失败: {e}")
if __name__ == "__main__":
check_queue_status_with_pika()
脚本说明:
passive=True:这是关键,它告诉 RabbitMQ 我们只想查询队列的状态,而不是创建它,如果队列不存在,RabbitMQ 会返回一个ChannelClosedByBroker异常,错误码为404。- 信息有限:这种方法只能获取到队列的
message_count(待处理消息数)和consumer_count(消费者数量),它无法获取服务器概览、连接列表等更宏观的信息。 - 适用场景:非常适合在你自己的应用程序中,作为启动或运行时的一部分,检查依赖的队列是否就绪。
总结与对比
| 特性 | 方法一 (Management API) | 方法二 (pika) |
|---|---|---|
| 依赖 | 需要 RabbitMQ Management 插件 | 不需要任何插件 |
| 信息量 | 非常丰富:服务器、队列、交换机、连接、通道、节点、用户等 | 非常有限:主要是单个队列的消息数和消费者数 |
| 复杂度 | 需要额外配置用户权限,使用 HTTP 请求 | 直接集成到应用代码中,使用 AMQP 协议 |
| 适用场景 | 监控、运维、自动化脚本、全面的健康检查 | 应用内逻辑检查(如:确保队列存在)、轻量级验证 |
| 推荐度 | ★★★★★ (首选) | ★★★☆☆ (特定场景) |
- 如果你需要编写一个独立的监控脚本、健康检查服务或进行 RabbitMQ 的日常运维管理,强烈推荐使用方法一(Management API),因为它提供了最全面和最准确的信息。
- 如果你的 Python 应用程序本身就是一个 RabbitMQ 消费者或生产者,并且只需要在程序启动时确认目标队列是否存在,那么方法二(pika)是一个轻量级且高效的选择。
