杰瑞科技汇

Python消息队列如何选型与高效使用?

Of course! Let's dive into Python message queues. This is a fundamental concept for building scalable, decoupled, and robust applications.

Python消息队列如何选型与高效使用?-图1
(图片来源网络,侵删)

What is a Message Queue?

Imagine a post office. Instead of sending a letter directly to a friend, you drop it in a mailbox. A postal worker then picks up the mail and delivers it to your friend's address.

In software:

  • Producer (Sender): The application that creates and sends messages (like writing and dropping a letter).
  • Consumer (Receiver): The application that receives and processes messages (like your friend getting the mail).
  • Message Queue (The "Mailbox"): A broker or service that stores messages from producers and delivers them to consumers.

This decoupling is the primary benefit. The producer doesn't need to know if the consumer is online, busy, or even exists. It just sends the message to the queue, confident that it will be processed eventually.


Why Use a Message Queue?

  1. Decoupling: The producer and consumer are independent. You can update one without affecting the other.
  2. Asynchronous Processing: A web app can respond instantly to a user by sending a long-running task (like processing a video or sending an email) to a queue to be handled later.
  3. Load Leveling / Buffering: If a sudden flood of requests arrives, the queue acts as a buffer. Consumers can process messages at their own pace, preventing the system from being overwhelmed.
  4. Reliability & Durability: Messages can be stored persistently. If a consumer crashes, the message remains in the queue and can be processed by another consumer when it comes back online.
  5. Scalability: If message processing becomes a bottleneck, you can simply add more consumers to share the workload.

Popular Python Message Queue Libraries

The choice of library often depends on the underlying message broker technology you choose.

Python消息队列如何选型与高效使用?-图2
(图片来源网络,侵删)
Library Broker Technology Key Characteristics Best For
pika RabbitMQ The official, pure-Python client. Very flexible but can be verbose. Direct, low-level interaction with RabbitMQ. Full control over RabbitMQ, custom topologies.
kombu RabbitMQ, Redis, Amazon SQS A high-level messaging library for Python. It's the foundation of Celery and provides a unified API for different brokers. Building systems that might need to switch brokers, or using Celery.
celery RabbitMQ, Redis, Amazon SQS A task queue built on top of kombu. It's not just a message queue; it's a complete distributed task execution framework. Background task processing, periodic jobs (cron-like), distributed computing.
redis-py Redis The official Python client for Redis. While Redis is a key-value store, its Lists and Streams data structures are excellent for simple queuing. Simple queuing, caching, session storage, and pub/sub in one application.
aiormq RabbitMQ An asynchronous version of pika, built for asyncio. Offers much higher performance for I/O-bound tasks. High-throughput, asynchronous applications using async/await.
hiredis Redis A performance-focused, C-based parser for the redis-py client. Applications where maximum Redis performance is critical.

Practical Examples

Let's look at simple examples with two of the most common combinations: RabbitMQ with pika and Redis with redis-py.

Example 1: RabbitMQ with pika

First, you need to install RabbitMQ server on your machine or use a Docker container.

Install the library:

pip install pika

The Producer (Sender): This script will connect to RabbitMQ and send a message to a queue named task_queue.

Python消息队列如何选型与高效使用?-图3
(图片来源网络,侵删)
# producer.py
import pika
import time
# --- Connection Setup ---
# The 'connection' object manages the connection to the RabbitMQ server.
# 'localhost' means the server is on your own machine.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# --- Declare a Queue ---
# It's good practice to declare the queue in both the producer and consumer.
# This ensures the queue exists before we try to send messages to it.
# 'durable=True' means the queue will survive a broker restart.
channel.queue_declare(queue='task_queue', durable=True)
# --- Send Messages ---
message = "Hello World! Message number %s"
for i in range(10):
    # 'body' is the actual message content.
    # 'properties=pika.BasicProperties(...)' makes the message persistent.
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message % i,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(f" [x] Sent '{message % i}'")
    time.sleep(0.5) # Simulate some work
# --- Clean Up ---
connection.close()

The Consumer (Receiver): This script will connect to RabbitMQ, listen for messages on the task_queue, and process them.

# consumer.py
import pika
import time
# --- Connection Setup ---
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# --- Declare the Queue (same as producer) ---
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# --- Define the Callback Function ---
# This function will be executed whenever a message is received.
def callback(ch, method, properties, body):
    """Called on pika when a message is received from RabbitMQ."""
    print(f" [x] Received {body.decode()}")
    # Simulate a time-consuming task
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge the message. This tells RabbitMQ that the message has been
    # successfully processed and can be removed from the queue.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# --- Start Consuming ---
# 'no_ack=False' means we need to manually acknowledge messages.
channel.basic_consume(queue='task_queue', on_message_callback=callback, no_ack=False)
# --- Start Listening ---
channel.start_consuming()

How to Run:

  1. Start the RabbitMQ server.
  2. Open two terminal windows.
  3. In the first terminal, run the consumer: python consumer.py
  4. In the second terminal, run the producer: python producer.py
  5. You will see the consumer printing messages as they arrive.

Example 2: Redis with redis-py

This is much simpler and great for basic queuing needs.

Install the library:

pip install redis

You also need a Redis server running.

The Producer (Sender):

# redis_producer.py
import redis
import time
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)
queue_name = 'my_simple_queue'
# Send 10 messages to the queue
for i in range(10):
    message = f"Task {i}"
    # LPUSH adds an item to the head of the list.
    # Using RPUSH would add it to the tail.
    r.lpush(queue_name, message)
    print(f"Sent: {message}")
    time.sleep(0.2)

The Consumer (Receiver):

# redis_consumer.py
import redis
import time
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)
queue_name = 'my_simple_queue'
print(' [*] Waiting for messages...')
while True:
    # BRPOP (Blocking Right Pop) waits for an item on the right (tail) of the list.
    # It blocks until an item is available or the timeout is reached.
    # The timeout of 0 means it will block indefinitely.
    message_bytes = r.brpop(queue_name, timeout=0)
    if message_bytes:
        # message_bytes is a tuple: (b'queue_name', b'message_content')
        message = message_bytes[1].decode('utf-8')
        print(f"Received: {message}")
        # Simulate processing
        time.sleep(1)
        print("Processing complete.")

How to Run:

  1. Start the Redis server.
  2. Open two terminal windows.
  3. In the first terminal, run the consumer: python redis_consumer.py
  4. In the second terminal, run the producer: python redis_producer.py
  5. You will see the consumer printing messages as they arrive.

Which One Should I Choose?

  • For simple background tasks in a single application: Redis is fantastic. It's fast, easy to set up, and you might already be using it for caching.
  • For complex, distributed systems with guaranteed delivery and advanced routing: **R
分享:
扫描分享到社交APP
上一篇
下一篇