Apache Kafka has become the backbone of modern event-driven applications, and at the heart of consuming data lies the Kafka Consumer. If you are just starting out and want to learn how to build a Kafka consumer in Java, this tutorial will guide you step-by-step. We’ll cover polling records, Kafka offset management (auto commit vs. manual commit), and consumer groups for scalability.

By the end of this post, you’ll have a working Java Kafka consumer and a clear understanding of how it works in real-world applications.
What is a Kafka Consumer?
A Kafka Consumer is a client application that subscribes to Kafka topics and reads messages from them. Unlike a producer that sends data, the consumer continuously polls records from Kafka brokers. Key responsibilities of a Kafka consumer include:
- Polling data from assigned partitions.
- Keeping track of consumed messages using offsets.
- Coordinating with other consumers in a consumer group for load balancing.
Setting Up Kafka Consumer in Java
To get started, you need to add the Kafka client dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>
Kafka Consumer Java Tutorial – Basic Example
Let’s create a simple Kafka Consumer Java example that reads string messages from a topic named demo-topic. This code creates a consumer group, polls messages, and automatically commits offsets.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // Kafka broker address
        props.put("group.id", "demo-consumer-group");      // consumer group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");           // auto commit enabled

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));
        System.out.println("Starting Kafka consumer...");

        try {
            while (true) {
                // Poll for new records, waiting up to 1 second
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            consumer.close(); // release resources and leave the group
        }
    }
}
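To try it out, start your Kafka broker, make sure demo-topic exists, and publish a few test messages, for example with the console producer that ships with Kafka (bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo-topic). Each message you type should show up in the consumer’s output within about a second.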
Kafka Offset Management (Auto Commit vs Manual Commit)
One of the most important topics in the Kafka consumer Java tutorial is offset management.
- Auto Commit (default)
  - Kafka automatically commits offsets periodically (controlled by auto.commit.interval.ms).
  - Simple, but risky: if your consumer crashes before processing is done, you may lose messages.
- Manual Commit
  - You control when to commit offsets.
  - Safer for applications that must guarantee processing before marking a message as “done.”
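With auto commit enabled, you can also control how often offsets are committed. The default interval is 5000 ms; the 1000 ms below is just an illustrative value:

props.put("auto.commit.interval.ms", "1000"); // commit offsets every second (default is 5000 ms)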
Example: Manual Offset Commit
Use manual commit when you want fine-grained control and stronger guarantees.
props.put("enable.auto.commit", "false"); // disable auto commit
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Processing record: value=%s, offset=%d%n", record.value(), record.offset());
}
consumer.commitSync(); // commit offsets manually
}
} finally {
consumer.close();
}
Java Kafka Consumer Groups for Scalability
A consumer group is a collection of consumers that work together to read data from topics.
- Each consumer in a group gets assigned a subset of partitions.
- Kafka ensures load balancing: no two consumers in the same group read the same partition.
- If a new consumer joins, partitions are rebalanced.
Example:
If a topic has 6 partitions and a consumer group has 3 consumers, each consumer will read from 2 partitions.
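To watch these rebalances happen in your own group, you can pass a ConsumerRebalanceListener when subscribing. A minimal sketch (topic name reused from the earlier example):

// requires: org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
//           org.apache.kafka.common.TopicPartition, java.util.Collection
consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away (e.g., when a new consumer joins)
        System.out.println("Partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the group coordinator hands this consumer its partitions
        System.out.println("Partitions assigned: " + partitions);
    }
});

If you start a second instance of the consumer with the same group.id, you should see both callbacks fire as Kafka redistributes the partitions.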
Kafka Consumer Example: Consuming Custom Serializer Object
In the previous blog, we created a producer that published Order objects using a custom serializer. Now, let’s build a Kafka Consumer that can correctly deserialize these objects and read them from the topic.
Order.java (Model)
package com.javacodepoint.kafka.consumer;

import java.io.Serializable;

public class Order implements Serializable {

    private String orderId;
    private String product;
    private double amount;

    public Order() {}

    public Order(String orderId, String product, double amount) {
        this.orderId = orderId;
        this.product = product;
        this.amount = amount;
    }

    // Getters and Setters
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getProduct() { return product; }
    public void setProduct(String product) { this.product = product; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
    }
}
CustomOrderDeserializer.java
package com.javacodepoint.kafka.consumer;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomOrderDeserializer implements Deserializer<Order> {

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) return null;
        try {
            // The payload is a CSV string: orderId,product,amount
            String value = new String(data, StandardCharsets.UTF_8);
            String[] parts = value.split(",");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Invalid data format for Order: " + value);
            }
            String orderId = parts[0];
            String product = parts[1];
            double amount = Double.parseDouble(parts[2]);
            return new Order(orderId, product, amount);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Order", e);
        }
    }
}
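For context, this deserializer assumes the producer’s custom serializer wrote each Order as a CSV string in the form orderId,product,amount. If you don’t have the previous post handy, a matching serializer could look like the following sketch (the class name CustomOrderSerializer is illustrative; your producer’s version is authoritative):

package com.javacodepoint.kafka.consumer;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class CustomOrderSerializer implements Serializer<Order> {

    @Override
    public byte[] serialize(String topic, Order order) {
        if (order == null) return null;
        // Mirror image of CustomOrderDeserializer: orderId,product,amount
        String csv = order.getOrderId() + "," + order.getProduct() + "," + order.getAmount();
        return csv.getBytes(StandardCharsets.UTF_8);
    }
}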
OrderConsumerExample.java
package com.javacodepoint.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsumerExample {
    public static void main(String[] args) {
        String topicName = "custom-topic";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomOrderDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from earliest messages

        try (KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            System.out.println("Consuming Order messages from topic: " + topicName);
            while (true) {
                ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, Order> record : records) {
                    System.out.println("Consumed -> Key: " + record.key() + ", Value: " + record.value());
                }
            }
        }
    }
}
OUTPUT:
Consuming Order messages from topic: custom-topic
Consumed -> Key: O123, Value: Order{orderId=O123, product='Laptop', amount=1200.5}
Kafka Consumer Best Practices
When writing a Java Kafka consumer group application, keep these best practices in mind:
- Use manual commit for critical data pipelines.
- Tune max.poll.records to control batch size.
- Set session.timeout.ms and heartbeat.interval.ms properly for stable group management.
- Gracefully shut down consumers to commit offsets before exiting (see the sketch after this list).
- Monitor consumer lag to ensure consumers keep up with producers.
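On the graceful shutdown point, the standard pattern is to call consumer.wakeup() from a JVM shutdown hook: it makes the blocked poll() throw a WakeupException, so the loop can exit and close() can commit offsets and leave the group cleanly. A minimal sketch, built on the basic consumer from earlier:

// requires: import org.apache.kafka.common.errors.WakeupException;
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();     // makes the blocked poll() throw WakeupException
    try {
        mainThread.join(); // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // process records as before ...
    }
} catch (WakeupException e) {
    // expected on shutdown; nothing to do
} finally {
    consumer.close(); // commits final offsets (with auto commit) and leaves the group
}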
Common Troubleshooting Tips
Beginners often run into a few common issues when running a Kafka consumer Java example:
- Confusing Zookeeper with Kafka bootstrap servers
  - Always point bootstrap.servers at a broker address such as localhost:9092, not at a Zookeeper connection string.
- Consumer is not receiving messages
  - Make sure the producer sent messages to the same topic.
  - Ensure ConsumerConfig.GROUP_ID_CONFIG (group.id) is correctly set.
- Rebalancing too frequently
  - Adjust the heartbeat and session timeout values (see the snippet below).
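A reasonable starting point looks like the sketch below; the numbers are illustrative, not recommendations. The usual rule of thumb is to keep heartbeat.interval.ms at no more than one third of session.timeout.ms. If processing a single batch can take a long time, also look at max.poll.interval.ms, since exceeding it triggers a rebalance as well.

props.put("session.timeout.ms", "30000");    // illustrative: how long without heartbeats before the consumer is considered dead
props.put("heartbeat.interval.ms", "10000"); // illustrative: keep this at most 1/3 of session.timeout.ms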
Conclusion
In this Kafka consumer Java tutorial, we built a working consumer that polls messages, manages offsets, and uses consumer groups for scalability. You now understand the difference between Kafka auto commit vs manual commit, how offsets ensure reliability, and how consumer groups balance workload.
From here, the next step is to add error handling, experiment with multi-threaded consumers, and explore Kafka Streams for advanced use cases.