RocketMQ 官方提供了两个主要的 Python 客户端库,它们适用于不同的场景和版本:

- rocketmq-python (官方推荐): 这是 RocketMQ 5.x 版本的官方 Python 客户端,它基于 gRPC 和 Protobuf 构建,是未来发展的方向,功能更现代化。
- python-rocketmq (社区维护): 这是 RocketMQ 4.x 版本的 Python 客户端,它基于 TCP 协议和自定义的二进制协议,是使用最广泛的 Python 客户端,但已不再由官方主推。
核心建议:
- 如果你正在使用或计划升级到 RocketMQ 5.x,请使用
rocketmq-python。 - 如果你仍在使用 RocketMQ 4.x,或者项目依赖比较老,请使用
python-rocketmq。
rocketmq-python (官方,适用于 RocketMQ 5.x)
这是官方推荐的、与 RocketMQ 5.x 消息队列配套的客户端。
安装
pip install rocketmq
核心概念
- Producer (生产者): 发送消息到 Broker。
- Consumer (消费者): 从 Broker 消费消息。
- Message (消息): 要发送的数据载体。
- Topic (主题): 消息的逻辑分类,生产者发送到特定主题,消费者订阅特定主题。
- Tag (标签): 主题的进一步细分,用于在消费端进行更精确的消息过滤。
示例代码
这个库提供了非常现代的异步 API,非常适合与 Python 的 asyncio 生态结合使用。
准备工作 确保你已经启动了 RocketMQ 的 NameServer 和 Broker。

发送消息 (Producer)
import asyncio
from rocketmq.client import Producer, Message
async def send_message():
# 1. 创建一个 Producer
# 参数是 GroupName,用于标识一组 Producer
producer = Producer("python_producer_group")
# 2. 设置 NameServer 地址
producer.set_name_server_address("127.0.0.1:9876")
# 3. 启动 Producer
await producer.start()
try:
# 4. 创建一条消息
# 参数分别是 Topic, Body (消息内容), Tag (可选)
msg = Message("python-topic", "Hello, RocketMQ 5 from python!".encode("utf-8"), tag="tagA")
# 5. 发送消息
# send_sync 是同步发送,会阻塞直到收到响应
result = await producer.send_sync(msg)
print(f"Send message result: {result.status}, {result.msg_id}")
finally:
# 6. 关闭 Producer
await producer.shutdown()
# 运行异步函数
asyncio.run(send_message())
消费消息 (Consumer)
import asyncio
from rocketmq.client import PushConsumer
async def consume_message():
# 1. 创建一个 PushConsumer
# 参数是 GroupName,用于标识一组 Consumer
consumer = PushConsumer("python_consumer_group")
# 2. 设置 NameServer 地址
consumer.set_name_server_address("127.0.0.1:9876")
# 3. 订阅 Topic 和 Tag
# "*" 表示订阅该主题下的所有 Tag
consumer.subscribe("python-topic", "*")
# 4. 注册消息监听器
# 这是一个异步回调函数,当有消息到达时会被调用
async def listener(msg):
print(f"Received message: {msg.body.decode('utf-8')}")
print(f"Message ID: {msg.msg_id}, Topic: {msg.topic}, Tag: {msg.tag}")
# 消息消费成功后,必须调用 `ack` 来确认
# 如果不调用,消息可能会被重新投递
await msg.ack()
return # 返回 None 表示消费成功
consumer.register_message_listener(listener)
# 5. 启动 Consumer
await consumer.start()
print("Consumer started, waiting for messages...")
# 为了保持消费者运行,我们让主程序休眠
# 在实际应用中,这通常是你的主事件循环
await asyncio.sleep(30) # 运行30秒
# 6. 关闭 Consumer
await consumer.shutdown()
# 运行异步函数
asyncio.run(consume_message())
python-rocketmq (社区,适用于 RocketMQ 4.x)
这是一个成熟的社区库,基于 TCP 协议,提供了同步和异步的 API。
安装
pip install python-rocketmq
核心概念
- Producer (生产者): 发送消息。
- PushConsumer / PullConsumer (消费者):
PushConsumer: 由客户端长连接 Broker,Broker 将消息主动推送给消费者,使用更简单。PullConsumer: 由消费者主动向 Broker 拉取消息,更灵活,但需要自己管理拉取逻辑。
示例代码
准备工作 确保你已经启动了 RocketMQ 的 NameServer 和 Broker。

发送消息 (Producer)
import time
from rocketmq.client import Producer, Message
def send_message():
# 1. 创建一个 Producer
# 参数是 GroupName
producer = Producer("python_producer_group_v4")
# 2. 设置 NameServer 地址
producer.set_namesrv_addr("127.0.0.1:9876")
# 3. 启动 Producer
producer.start()
try:
# 4. 发送多条消息
for i in range(5):
msg = Message("python-topic-v4")
msg.set_body(f"Hello RocketMQ 4 from Python, message {i}".encode('utf-8'))
# 同步发送
result = producer.send_sync(msg)
print(f"Send message {i} result: {result.status}, {result.msg_id}")
time.sleep(1) # 间隔1秒
finally:
# 5. 关闭 Producer
producer.shutdown()
if __name__ == '__main__':
send_message()
消费消息 (PushConsumer)
from rocketmq.client import PushConsumer
def consume_message():
# 1. 创建一个 PushConsumer
# 参数是 GroupName
consumer = PushConsumer("python_consumer_group_v4")
# 2. 设置 NameServer 地址
consumer.set_namesrv_addr("127.0.0.1:9876")
# 3. 注册消息监听器
# 这是一个同步回调函数
def listener(msg):
print(f"Received message: {msg.body.decode('utf-8')}")
print(f"Message ID: {msg.msg_id}, Topic: {msg.topic}")
# 消息消费成功,返回 CONSUME_SUCCESS
# 如果消费失败,可以返回 RECONSUME_LATER,稍后会重试
return "CONSUME_SUCCESS"
# 4. 订阅 Topic 和 Tag
consumer.subscribe("python-topic-v4", "*", listener)
# 5. 启动 Consumer
consumer.start()
print("Consumer started, waiting for messages...")
# 为了保持消费者运行,我们让主程序休眠
# 在实际应用中,这通常是你的主程序循环
time.sleep(30) # 运行30秒
# 6. 关闭 Consumer
consumer.shutdown()
if __name__ == '__main__':
consume_message()
总结与对比
| 特性 | rocketmq-python (官方) |
python-rocketmq (社区) |
|---|---|---|
| 适用版本 | RocketMQ 5.x | RocketMQ 4.x |
| 协议 | gRPC / Protobuf | TCP / 自定义二进制协议 |
| API 风格 | 异步 (asyncio) |
同步 & 异步 (线程模型) |
| 官方支持 | 主推,官方维护 | 社区维护,已不再主推 |
| 依赖 | grpcio, protobuf |
enum34 (Python < 3.4) |
| 现代化程度 | 高,符合云原生趋势 | 较低,但稳定可靠 |
| 推荐度 | 强烈推荐用于新项目 | 仅用于维护旧项目或 4.x 环境 |
如何选择?
- 新项目、新系统:毫不犹豫地选择
rocketmq-python,它与 RocketMQ 5.x 的云原生特性(如消息轨迹、客户端重试等)结合更紧密,API 设计也更符合现代 Python 开发范式。 - 维护旧项目:如果你的项目已经基于
python-rocketmq并且运行在 RocketMQ 4.x 上,继续使用它是没有问题的,因为它非常稳定,但如果未来要升级到 RocketMQ 5.x,就必须迁移到rocketmq-python。 - 学习与测试:建议从
rocketmq-python开始学习,因为它代表了未来的方向,其异步 API 能让你更好地体验 Python 生态的优势。
