Of course! Here is a comprehensive guide on how to use a Kafka producer in Python, covering everything from basic setup to more advanced configurations.

## Prerequisites

Before you start, make sure you have the following:

- **Python 3**: A recent version of Python 3 is recommended.
- **Kafka Broker**: A running Kafka instance. You can run one locally using Docker or download it from the official Apache Kafka website.
- **A Topic**: A Kafka topic to send messages to. You can create one using the `kafka-topics.sh` script:

```bash
# From your Kafka bin directory
./kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
## Installation

The most popular and recommended library for interacting with Kafka in Python is `confluent-kafka`. It's a high-performance, C-based client library (librdkafka) with a Python wrapper.

Install it using pip:

```bash
pip install confluent-kafka
```
## Basic Producer Example

This is the simplest way to send a single message to a Kafka topic.

```python
from confluent_kafka import Producer

# 1. Configure the producer
# 'bootstrap.servers' is a comma-separated list of Kafka broker addresses.
conf = {
    'bootstrap.servers': 'localhost:9092'
}

# 2. Create a Producer instance
producer = Producer(conf)

# 3. Define the topic and the message value
topic = 'test-topic'
message_value = b'Hello, Kafka! This is my first message.'

# 4. Produce the message
# The produce() method sends the message to the broker asynchronously.
# It returns immediately without waiting for an acknowledgment.
producer.produce(topic, value=message_value)

# 5. Wait for any outstanding messages to be delivered and delivery
# reports to be received. This is crucial to ensure messages are not lost.
producer.flush()
print("Message sent successfully!")
```
**Explanation:**

- `conf`: A dictionary with configuration properties. The most important one is `bootstrap.servers`, which tells the producer where to find the Kafka cluster.
- `Producer(conf)`: Initializes the producer.
- `producer.produce(topic, value=message)`: This is the core method.
  - `topic`: The name of the topic to send the message to.
  - `value`: The actual message payload. It must be a byte string (`bytes`). We use `b'...'` to create a byte string. If you have a string, you need to encode it: `message.encode('utf-8')`.
  - `key` (optional): You can also provide a key for the message. Messages with the same key are guaranteed to go to the same partition (useful for ordering).
- `producer.flush()`: This is a blocking call that waits for all messages in the producer's internal buffer to be sent to the broker and for acknowledgments to be received. Always call `flush()` before your program exits to avoid losing messages.
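Since `produce()` expects `bytes` for the value (and key), it can help to normalize payloads before sending. The helper below (`to_kafka_bytes`) is an illustrative sketch, not part of confluent-kafka:

```python
def to_kafka_bytes(payload):
    """Normalize a payload to bytes for produce().
    Illustrative helper, not part of confluent-kafka."""
    if isinstance(payload, bytes):
        return payload                    # already bytes: pass through unchanged
    if isinstance(payload, str):
        return payload.encode('utf-8')    # str -> UTF-8 bytes
    raise TypeError(f"unsupported payload type: {type(payload).__name__}")

print(to_kafka_bytes('Hello, Kafka!'))  # b'Hello, Kafka!'
```

A consumer would reverse the string case with `value.decode('utf-8')`.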
## Handling Delivery Reports (Acknowledgments)

By default, `produce()` is asynchronous. To know whether your message was successfully delivered or failed, pass a delivery callback function via the `callback` argument of `produce()`.

This is the recommended way to produce messages reliably.
```python
from confluent_kafka import Producer

# 1. Configure the producer
conf = {
    'bootstrap.servers': 'localhost:9092',
    # 'acks': 'all'  # Wait for all in-sync replicas (ISR) to acknowledge. Highest durability.
}

# 2. Create a Producer instance
producer = Producer(conf)

# 3. Define the delivery callback function
def delivery_callback(err, msg):
    """
    Called once for each message to indicate delivery result.
    Triggered by poll() or flush().
    """
    if err is not None:
        print(f'ERROR: Message delivery failed: {err}')
    else:
        # The message was successfully delivered
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

# 4. Produce messages in a loop
topic = 'test-topic'
for i in range(10):
    # Messages with the same key always go to the same partition; here each
    # message gets its own key, so they may be spread across partitions.
    message_key = f'key-{i}'.encode('utf-8')
    message_value = f'Message number {i}'.encode('utf-8')

    # Produce the message and assign the callback
    producer.produce(topic, key=message_key, value=message_value, callback=delivery_callback)

    # poll() handles delivery reports and other events. It should be called
    # regularly; it processes any callbacks that have been triggered.
    producer.poll(0)  # A timeout of 0 means "return immediately"

# 5. Wait for all messages to be delivered
producer.flush()
print("All messages sent and processed.")
```
**Explanation of `delivery_callback`:**

- It's a function that takes two arguments: `err` (an error object) and `msg` (the `Message` object).
- If `err` is `None`, the message was delivered successfully.
- If `err` is not `None`, an error occurred (e.g., network issue, broker not available).
- The `msg` object contains useful metadata like the topic, partition, and offset of the successfully delivered message.
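Because the callback contract is just `(err, msg)`, you can see it in action without a broker by invoking a callback with a stub message object. `StubMessage` below is a hypothetical stand-in for `confluent_kafka.Message`, used only for illustration:

```python
class StubMessage:
    """Minimal stand-in for confluent_kafka.Message (illustration only)."""
    def __init__(self, topic, partition, offset):
        self._topic, self._partition, self._offset = topic, partition, offset
    def topic(self): return self._topic
    def partition(self): return self._partition
    def offset(self): return self._offset

results = []

def delivery_callback(err, msg):
    # Same contract the real client uses: err is None on success.
    if err is not None:
        results.append(('error', str(err)))
    else:
        results.append(('ok', msg.topic(), msg.partition(), msg.offset()))

# Simulate what poll()/flush() does when delivery reports arrive:
delivery_callback(None, StubMessage('test-topic', 0, 42))   # success
delivery_callback('broker unavailable', None)               # failure
print(results)
```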
**The Role of `poll(0)`:**

- When you produce a message with a callback, the callback isn't executed immediately.
- `producer.poll(timeout)` waits up to `timeout` seconds for events (like delivery reports) and executes the corresponding callbacks.
- Calling `poll(0)` tells the producer to check for events and return immediately, without blocking. This is a common pattern to ensure callbacks are processed without blocking your application.
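Another reason to call `poll()` regularly: the producer's local queue is bounded (`queue.buffering.max.messages`), and `produce()` raises `BufferError` when it fills up. A common pattern is to catch that error, call `poll()` to drain delivery reports, and retry. The sketch below uses a hypothetical `StubProducer` with a tiny bounded queue so it runs without a broker; with the real client you would pass a `confluent_kafka.Producer` instead:

```python
def produce_with_backpressure(producer, topic, value, max_retries=3):
    """Retry produce() when the local queue is full, draining events via poll()."""
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic, value=value)
            return True
        except BufferError:
            # Queue full: serve delivery reports to free up space, then retry.
            producer.poll(1)
    return False

class StubProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (illustration only)."""
    def __init__(self, capacity=2):
        self.queue, self.capacity = [], capacity
    def produce(self, topic, value):
        if len(self.queue) >= self.capacity:
            raise BufferError('local queue is full')
        self.queue.append((topic, value))
    def poll(self, timeout):
        if self.queue:
            self.queue.pop(0)  # pretend one delivery report arrived

p = StubProducer(capacity=1)
print(produce_with_backpressure(p, 'test-topic', b'a'))  # True
print(produce_with_backpressure(p, 'test-topic', b'b'))  # True, after one retry
```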
## Producer Configuration and Best Practices

The `conf` dictionary is where you control the producer's behavior. Here are some important configurations:

| Configuration Key | Description | Common Values |
|---|---|---|
| `acks` | The number of acknowledgments the producer requires the leader to have received before considering a request complete. | `0` (no acknowledgment, fastest, least reliable), `1` (leader only), `all` (all in-sync replicas, safest, slowest). |
| `retries` | How many times the producer will retry sending a message if it fails. | Integer, e.g., `2147483647` (retry indefinitely). |
| `linger.ms` | The time the producer will wait to allow messages to be batched together before sending. | Integer (milliseconds). A higher value can improve throughput by creating larger batches. |
| `batch.size` | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. | Integer (bytes). Larger batches improve throughput but use more memory. |
| `compression.type` | The compression algorithm used for the messages sent to the broker. | `none`, `gzip`, `snappy`, `lz4`, `zstd`. Compression saves network bandwidth. |
Example with more configurations:

```python
conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                    # Wait for all in-sync replicas to acknowledge
    'retries': 2147483647,            # Retry indefinitely
    'linger.ms': 5,                   # Wait up to 5ms to batch messages
    'compression.type': 'lz4',        # Compress messages
    'batch.size': 16384,              # 16KB batch size
    'partitioner': 'murmur2_random'   # Murmur2 hash partitioning (matches the Java client's default)
}
```
## Producing JSON Messages

It's very common to send structured data like JSON. The process is simple: convert your Python dictionary to a JSON string and then encode it to bytes.

```python
import json
from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

user_data = {
    "user_id": 123,
    "username": "johndoe",
    "email": "john.doe@example.com",
    "is_active": True
}

# Convert dictionary to JSON string, then encode to bytes
json_message = json.dumps(user_data).encode('utf-8')

def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered JSON message: {msg.value()}")

producer.produce('user-events', value=json_message, callback=on_delivery)
producer.poll(1)  # Wait up to 1 second for the delivery callback
producer.flush()
```
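Before wiring this into a real producer, it's worth sanity-checking the serialization step on its own; the same bytes decode back into the original dictionary on the consumer side:

```python
import json

user_data = {"user_id": 123, "username": "johndoe", "is_active": True}

# What produce() receives as the value:
payload = json.dumps(user_data).encode('utf-8')

# What a consumer would do to recover the dictionary:
decoded = json.loads(payload.decode('utf-8'))
print(decoded == user_data)  # True
```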
## Summary

- **Install**: `pip install confluent-kafka`
- **Configure**: Create a `conf` dictionary with `bootstrap.servers`.
- **Create Producer**: `producer = Producer(conf)`.
- **Produce Message**: `producer.produce(topic, value=b'my message', key=b'my-key', callback=delivery_callback)`.
- **Serve Callbacks**: Call `producer.poll(0)` regularly to process delivery reports.
- **Flush**: Call `producer.flush()` before your program exits so no messages are lost.