杰瑞科技汇

RocketMQ Python包怎么用?

RocketMQ 官方提供了两个主要的 Python 客户端库,它们适用于不同的场景和版本:

RocketMQ Python包怎么用?-图1
(图片来源网络,侵删)
  1. rocketmq-python (官方推荐): 这是 RocketMQ 5.x 版本的官方 Python 客户端,它基于 gRPCProtobuf 构建,是未来发展的方向,功能更现代化。
  2. python-rocketmq (社区维护): 这是 RocketMQ 4.x 版本的 Python 客户端,它基于 TCP 协议和自定义的二进制协议,是使用最广泛的 Python 客户端,但已不再由官方主推。

核心建议:

  • 如果你正在使用或计划升级到 RocketMQ 5.x,请使用 rocketmq-python
  • 如果你仍在使用 RocketMQ 4.x,或者项目依赖比较老,请使用 python-rocketmq

rocketmq-python (官方,适用于 RocketMQ 5.x)

这是官方推荐的、与 RocketMQ 5.x 消息队列配套的客户端。

安装

pip install rocketmq

核心概念

  • Producer (生产者): 发送消息到 Broker。
  • Consumer (消费者): 从 Broker 消费消息。
  • Message (消息): 要发送的数据载体。
  • Topic (主题): 消息的逻辑分类,生产者发送到特定主题,消费者订阅特定主题。
  • Tag (标签): 主题的进一步细分,用于在消费端进行更精确的消息过滤。

示例代码

这个库提供了非常现代的异步 API,非常适合与 Python 的 asyncio 生态结合使用。

准备工作 确保你已经启动了 RocketMQ 的 NameServer 和 Broker。

RocketMQ Python包怎么用?-图2
(图片来源网络,侵删)

发送消息 (Producer)

import asyncio
from rocketmq.client import Producer, Message
async def send_message():
    # 1. 创建一个 Producer
    # 参数是 GroupName,用于标识一组 Producer
    producer = Producer("python_producer_group")
    # 2. 设置 NameServer 地址
    producer.set_name_server_address("127.0.0.1:9876")
    # 3. 启动 Producer
    await producer.start()
    try:
        # 4. 创建一条消息
        # 参数分别是 Topic, Body (消息内容), Tag (可选)
        msg = Message("python-topic", "Hello, RocketMQ 5 from python!".encode("utf-8"), tag="tagA")
        # 5. 发送消息
        # send_sync 是同步发送,会阻塞直到收到响应
        result = await producer.send_sync(msg)
        print(f"Send message result: {result.status}, {result.msg_id}")
    finally:
        # 6. 关闭 Producer
        await producer.shutdown()
# 运行异步函数
asyncio.run(send_message())

消费消息 (Consumer)

import asyncio
from rocketmq.client import PushConsumer
async def consume_message():
    # 1. 创建一个 PushConsumer
    # 参数是 GroupName,用于标识一组 Consumer
    consumer = PushConsumer("python_consumer_group")
    # 2. 设置 NameServer 地址
    consumer.set_name_server_address("127.0.0.1:9876")
    # 3. 订阅 Topic 和 Tag
    # "*" 表示订阅该主题下的所有 Tag
    consumer.subscribe("python-topic", "*")
    # 4. 注册消息监听器
    # 这是一个异步回调函数,当有消息到达时会被调用
    async def listener(msg):
        print(f"Received message: {msg.body.decode('utf-8')}")
        print(f"Message ID: {msg.msg_id}, Topic: {msg.topic}, Tag: {msg.tag}")
        # 消息消费成功后,必须调用 `ack` 来确认
        # 如果不调用,消息可能会被重新投递
        await msg.ack()
        return # 返回 None 表示消费成功
    consumer.register_message_listener(listener)
    # 5. 启动 Consumer
    await consumer.start()
    print("Consumer started, waiting for messages...")
    # 为了保持消费者运行,我们让主程序休眠
    # 在实际应用中,这通常是你的主事件循环
    await asyncio.sleep(30) # 运行30秒
    # 6. 关闭 Consumer
    await consumer.shutdown()
# 运行异步函数
asyncio.run(consume_message())

python-rocketmq (社区,适用于 RocketMQ 4.x)

这是一个成熟的社区库,基于 TCP 协议,提供了同步和异步的 API。

安装

pip install python-rocketmq

核心概念

  • Producer (生产者): 发送消息。
  • PushConsumer / PullConsumer (消费者):
    • PushConsumer: 由客户端长连接 Broker,Broker 将消息主动推送给消费者,使用更简单。
    • PullConsumer: 由消费者主动向 Broker 拉取消息,更灵活,但需要自己管理拉取逻辑。

示例代码

准备工作 确保你已经启动了 RocketMQ 的 NameServer 和 Broker。

RocketMQ Python包怎么用?-图3
(图片来源网络,侵删)

发送消息 (Producer)

import time
from rocketmq.client import Producer, Message
def send_message():
    # 1. 创建一个 Producer
    # 参数是 GroupName
    producer = Producer("python_producer_group_v4")
    # 2. 设置 NameServer 地址
    producer.set_namesrv_addr("127.0.0.1:9876")
    # 3. 启动 Producer
    producer.start()
    try:
        # 4. 发送多条消息
        for i in range(5):
            msg = Message("python-topic-v4")
            msg.set_body(f"Hello RocketMQ 4 from Python, message {i}".encode('utf-8'))
            # 同步发送
            result = producer.send_sync(msg)
            print(f"Send message {i} result: {result.status}, {result.msg_id}")
            time.sleep(1) # 间隔1秒
    finally:
        # 5. 关闭 Producer
        producer.shutdown()
if __name__ == '__main__':
    send_message()

消费消息 (PushConsumer)

from rocketmq.client import PushConsumer
def consume_message():
    # 1. 创建一个 PushConsumer
    # 参数是 GroupName
    consumer = PushConsumer("python_consumer_group_v4")
    # 2. 设置 NameServer 地址
    consumer.set_namesrv_addr("127.0.0.1:9876")
    # 3. 注册消息监听器
    # 这是一个同步回调函数
    def listener(msg):
        print(f"Received message: {msg.body.decode('utf-8')}")
        print(f"Message ID: {msg.msg_id}, Topic: {msg.topic}")
        # 消息消费成功,返回 CONSUME_SUCCESS
        # 如果消费失败,可以返回 RECONSUME_LATER,稍后会重试
        return "CONSUME_SUCCESS"
    # 4. 订阅 Topic 和 Tag
    consumer.subscribe("python-topic-v4", "*", listener)
    # 5. 启动 Consumer
    consumer.start()
    print("Consumer started, waiting for messages...")
    # 为了保持消费者运行,我们让主程序休眠
    # 在实际应用中,这通常是你的主程序循环
    time.sleep(30) # 运行30秒
    # 6. 关闭 Consumer
    consumer.shutdown()
if __name__ == '__main__':
    consume_message()

总结与对比

特性 rocketmq-python (官方) python-rocketmq (社区)
适用版本 RocketMQ 5.x RocketMQ 4.x
协议 gRPC / Protobuf TCP / 自定义二进制协议
API 风格 异步 (asyncio) 同步 & 异步 (线程模型)
官方支持 主推,官方维护 社区维护,已不再主推
依赖 grpcio, protobuf enum34 (Python < 3.4)
现代化程度 高,符合云原生趋势 较低,但稳定可靠
推荐度 强烈推荐用于新项目 仅用于维护旧项目或 4.x 环境

如何选择?

  • 新项目、新系统:毫不犹豫地选择 rocketmq-python,它与 RocketMQ 5.x 的云原生特性(如消息轨迹、客户端重试等)结合更紧密,API 设计也更符合现代 Python 开发范式。
  • 维护旧项目:如果你的项目已经基于 python-rocketmq 并且运行在 RocketMQ 4.x 上,继续使用它是没有问题的,因为它非常稳定,但如果未来要升级到 RocketMQ 5.x,就必须迁移到 rocketmq-python
  • 学习与测试:建议从 rocketmq-python 开始学习,因为它代表了未来的方向,其异步 API 能让你更好地体验 Python 生态的优势。
分享:
扫描分享到社交APP
上一篇
下一篇