Here is a guide to using the kafka-python library to create a Kafka consumer in Python. It covers everything from basic setup to advanced topics like consumer groups and error handling.

Table of Contents
- Prerequisites: What you need before you start.
- Installation: How to install the kafka-python library.
- Basic Consumer: A simple script to consume messages from a single topic.
- Consumer Groups: The core concept for scalable and fault-tolerant consumption.
- Advanced Configuration: Key settings you should know.
- Error Handling & Robustness: How to handle failures and restarts gracefully.
- Full Example: A complete, runnable script with comments.
- Alternative Libraries: A brief look at confluent-kafka.
Prerequisites
Before you write any Python code, you need:
- A running Kafka instance: You can run one locally using Docker or download it from the Apache Kafka website.
- A topic to consume from: Your Kafka broker should have a topic created. For example, a topic named test-topic.
- Some messages in the topic: A producer should be sending messages to test-topic so your consumer has something to read.
Installation
kafka-python is a widely used pure-Python client for interacting with Kafka. You can install it using pip.
pip install kafka-python
Basic Consumer: Polling for Messages
The simplest way to consume messages is to poll the Kafka broker in a loop. This consumer subscribes to a single topic and reads from all of its partitions.
This example assumes you have a Kafka broker running on localhost:9092.

from kafka import KafkaConsumer
import json

# --- Basic Consumer Example ---
# Define the topic and bootstrap server
KAFKA_BROKER = 'localhost:9092'
TOPIC_NAME = 'test-topic'

# Create a consumer instance
# The `auto_offset_reset='earliest'` ensures that if there's no committed offset,
# it will read from the beginning of the partition.
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='earliest',  # or 'latest'
    enable_auto_commit=True,  # Commit offsets periodically in the background (needs a group_id)
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize JSON messages
)
print(f"Consumer started. Listening for messages on topic '{TOPIC_NAME}'...")

# The consumer loop
try:
    while True:
        # The consumer.poll() method returns a dictionary of {TopicPartition: [messages]}
        # It blocks until a message is available or the timeout is reached.
        # A timeout of 1 second is a good practice to allow for graceful shutdown.
        batch = consumer.poll(timeout_ms=1000)
        if not batch:
            # No messages received in this poll
            continue
        for topic_partition, messages in batch.items():
            print(f"Received messages from partition {topic_partition.partition}:")
            for message in messages:
                # message.value contains the actual (deserialized) message payload
                print(f" -> Message: {message.value}")
                # Here you would add your business logic to process the message
                # e.g., save to a database, call an API, etc.
except KeyboardInterrupt:
    print("Consumer interrupted. Closing...")
finally:
    # Close the consumer to release its connections and resources
    consumer.close()
    print("Consumer closed.")
Explanation:
- KafkaConsumer(...): Initializes the consumer.
- bootstrap_servers: A 'host:port' string, or a list of such strings, used for the initial connection to the Kafka brokers.
- auto_offset_reset='earliest': If the consumer has no stored offset, it will start from the beginning of the log. Use 'latest' to only get new messages.
- enable_auto_commit=True: This tells the consumer to commit offsets automatically in the background at a fixed interval (auto_commit_interval_ms, 5 seconds by default). This is simple, but an offset can be committed before your processing of that message has finished, so a crash at that point loses the message; a crash before the commit causes the message to be processed again instead. Note that kafka-python only commits offsets when a group_id is set, so in this basic example the setting has no effect.
- value_deserializer: A function to decode the message value. Here, we decode from bytes to a UTF-8 string and then parse it as JSON.
- consumer.poll(): This is the heart of the consumer. It fetches messages from the broker and blocks for at most timeout_ms.
- KeyboardInterrupt: A try/except block is used to allow the user to stop the consumer gracefully with Ctrl+C.
- consumer.close(): This is crucial. It closes the consumer's connection to the broker and, when a group_id is set and auto-commit is enabled, sends any pending offset commits first.
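The value_deserializer shown above raises an exception if a payload is not valid UTF-8 JSON, or if the value is empty (for example, a record with no value), and that exception reaches the code calling the consumer. As a hedged sketch, a more tolerant deserializer could look like the following. The name safe_json_deserializer and the choice to return None for payloads that cannot be parsed are assumptions made for illustration; they are not part of kafka-python.

```python
import json


def safe_json_deserializer(raw):
    """Decode a message value as UTF-8 JSON, returning None if it cannot be parsed.

    Hypothetical helper: returning None for bad payloads is an illustrative
    policy, not something kafka-python prescribes.
    """
    if raw is None:
        # A record may carry no value at all
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed bytes or malformed JSON: signal "unparseable" to the caller
        return None
```

It would be passed as value_deserializer=safe_json_deserializer when creating the consumer, and the processing code would then skip or log messages whose value is None.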
Consumer Groups: Scalability and Fault Tolerance
For real-world applications, you should use consumer groups. A consumer group allows you to have multiple consumers reading from the same topic in parallel. Kafka will automatically balance the partitions among the consumers in the group.
- Partition Assignment: If a topic has 4 partitions and you have 2 consumers in the same group, each consumer will be assigned 2 partitions.
- Fault Tolerance: If a consumer crashes, Kafka will reassign its partitions to the remaining healthy consumers in the group.
- Offset Management: Kafka stores the last committed offset of each partition on behalf of the group. This ensures that even if a consumer restarts, or its partitions move to another member, consumption picks up where the group left off.
To use a consumer group, simply assign a group_id:
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    group_id='my-python-consumer-group',  # <-- This is the key for consumer groups
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
Now, if you run two scripts with this exact same group_id, they will automatically share the partitions of test-topic.
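To make the partition-sharing behavior in this section concrete, here is a toy model of how partitions could be spread across the members of a group, and how they are redistributed when a member leaves. This is only an illustration: split_partitions is a hypothetical function, and the real assignment is negotiated through Kafka's group coordinator and depends on the configured assignment strategy.

```python
def split_partitions(partitions, consumers):
    """Spread partitions over consumers in round-robin order (illustration only)."""
    if not consumers:
        # With no group members there is nobody to assign partitions to
        return {}
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two members share four partitions, two each
print(split_partitions([0, 1, 2, 3], ["consumer-a", "consumer-b"]))
# After consumer-a leaves, the remaining member takes over all four
print(split_partitions([0, 1, 2, 3], ["consumer-b"]))
```

The same model also shows why running more consumers than partitions does not help: the extra members would simply receive an empty list.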

Advanced Configuration
Controlling when offsets are committed is critical for data integrity.
Manual Offset Committing
For at-least-once delivery, you should disable auto-commit and commit the offset manually only after you have successfully processed the message. Manual commits alone do not provide exactly-once semantics; that additionally requires idempotent processing or transactions.
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    group_id='manual-commit-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # <-- Disable auto-commit
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for message in consumer:
    try:
        # --- Your message processing logic here ---
        # e.g., process(message.value)
        # If this step fails, an exception will be thrown.
        # ----------------------------------------
        # If processing is successful, manually commit the offset.
        # With no arguments, commit() commits the consumer's current position
        # for every assigned partition and blocks until the broker responds.
        consumer.commit()
        print(f"Processed and committed message: {message.value}")
    except Exception as e:
        print(f"Failed to process message: {message.value}. Error: {e}")
        # The offset is not committed here, but the consumer's in-memory position
        # has already moved on: the loop continues with the next message, and a
        # later successful commit() also covers this one. The failed message is
        # only delivered again if the consumer restarts before that commit.
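One way to keep a failed message from being skipped by a later commit is to stop consuming at the first failure. The following is a minimal sketch under that assumption. process_until_failure and handler are hypothetical names, and the function relies only on the consumer being iterable and having a commit() method, as KafkaConsumer does.

```python
def process_until_failure(consumer, handler):
    """Process messages one by one, committing after each success.

    Stops at the first handler failure without committing, and returns
    the number of messages that were processed and committed.
    """
    committed = 0
    for message in consumer:
        try:
            handler(message.value)
        except Exception as exc:
            # Leave the group's committed offset where it is and stop
            print(f"Stopping: failed to process {message.value!r}: {exc}")
            break
        consumer.commit()
        committed += 1
    return committed
```

When the consumer is started again with the same group_id, it resumes from the last committed offset, so the failed message is delivered again. Messages can therefore be processed more than once, which is the at-least-once trade-off, and a message that always fails will block progress until it is handled some other way.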
Fetching Specific Partitions
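Sometimes you need to consume from a specific partition. To do that, create the consumer without a topic and call assign(). Manual assignment bypasses the consumer group's automatic partition balancing and cannot be combined with subscribing to a topic. The partition_assignment_strategy setting does not select a partition; it only controls how a group distributes partitions among its members.
from kafka import KafkaConsumer, TopicPartition
# Create the consumer without subscribing to a topic
consumer = KafkaConsumer(
    bootstrap_servers=KAFKA_BROKER,
    # ... other configs
)
# Manually assign partition 0 of the topic (this overrides the consumer group logic)
consumer.assign([TopicPartition(TOPIC_NAME, 0)])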
Error Handling & Robustness
A robust consumer should handle various exceptions:
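- CommitFailedError: The offset commit failed. This can happen if the consumer group is rebalancing or the consumer is no longer a group member.
- NoBrokersAvailable: The consumer cannot connect to any Kafka broker. It is typically raised while the KafkaConsumer is being created.
- ConsumerStoppedError: The consumer has been closed and is no longer able to poll. This class is defined by aiokafka; kafka-python does not appear to export it, so check your installed version before catching it.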
from kafka.errors import KafkaError, CommitFailedError
try:
    for message in consumer:
        try:
            # Process message
            print(f"Processing: {message.value}")
            # Manual commit
            consumer.commit()
        except CommitFailedError:
            print("Offset commit failed. Will retry on next message.")
        except Exception as e:
            print(f"Error processing message: {e}")
except KafkaError as e:
    print(f"A Kafka error occurred: {e}")
finally:
    consumer.close()
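Connection failures at startup are often temporary, so a common pattern is to retry consumer creation with an increasing delay. Here is a hedged sketch of that idea. create_with_retry, factory, retry_on, and the default values are all hypothetical; the helper is generic and makes no kafka-python calls itself.

```python
import time


def create_with_retry(factory, retry_on, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call factory() until it succeeds, with exponential backoff between tries.

    factory:  zero-argument callable that builds and returns the consumer
    retry_on: exception class (or tuple of classes) that should trigger a retry
    sleep:    injectable for testing; defaults to time.sleep
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            if attempt == attempts:
                # Out of retries: let the caller see the last error
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc!r}); retrying in {delay:.1f}s")
            sleep(delay)
```

With kafka-python, factory could be lambda: KafkaConsumer(TOPIC_NAME, bootstrap_servers=KAFKA_BROKER, group_id='my-python-consumer-group') and retry_on could be NoBrokersAvailable, so that only connection failures are retried and configuration mistakes still fail immediately.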
Full Example (Robust Consumer with Group)
Here is a complete, well-commented example of a robust consumer that uses a consumer group and manual commits.
import time
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable, CommitFailedError, KafkaConnectionError
import json
# --- Configuration ---
KAFKA_BROKER = 'localhost:9092'
TOPIC_NAME = 'test-topic'
GROUP_ID = 'robust-python-group'
CONSUMER_TIMEOUT_MS = 1000 # How long to wait for messages in poll()
def process_message(message_value):
""" 