Kafka Consumer in Java: Polling, Offset Management, and Consumer Groups

Apache Kafka has become the backbone of modern event-driven applications, and at the heart of consuming data lies the Kafka Consumer. If you are just starting out and want to learn how to build a Kafka consumer in Java, this tutorial will guide you step-by-step. We’ll cover polling records, Kafka offset management (auto commit vs. manual commit), and consumer groups for scalability.


By the end of this post, you’ll have a working Java Kafka consumer and a clear understanding of how it works in real-world applications.

What is a Kafka Consumer?

A Kafka Consumer is a client application that subscribes to Kafka topics and reads messages from them. Unlike a producer that sends data, the consumer continuously polls records from Kafka brokers. Key responsibilities of a Kafka consumer include:

  • Polling data from assigned partitions.
  • Keeping track of consumed messages using offsets.
  • Coordinating with other consumers in a consumer group for load balancing.

Setting Up Kafka Consumer in Java

To get started, you need to add Kafka client dependencies to your Maven pom.xml:

Maven Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>

Kafka Consumer Java Tutorial – Basic Example

Let’s create a simple Kafka Consumer Java example that reads string messages from a topic named demo-topic. This code creates a consumer group, polls messages, and automatically commits offsets.

Java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true"); // auto commit enabled

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));

        System.out.println("Starting Kafka consumer...");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Kafka Offset Management (Auto Commit vs Manual Commit)

One of the most important topics in the Kafka consumer Java tutorial is offset management.

  • Auto Commit (default)
    • Kafka commits offsets automatically at a fixed interval (controlled by auto.commit.interval.ms; see the snippet after this list).
    • Simple, but risky: if your consumer crashes after offsets are committed but before processing finishes, those messages are effectively lost.
  • Manual Commit
    • You control when to commit offsets.
    • Safer for applications that must guarantee processing before marking a message as “done.”
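If you stick with auto commit, the commit interval is the main knob to tune. Here is a minimal configuration sketch (the values are illustrative; 5000 ms happens to be the default interval):

Java
props.put("enable.auto.commit", "true");       // auto commit enabled (also the default)
props.put("auto.commit.interval.ms", "5000");  // commit offsets roughly every 5 seconds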

Example: Manual Offset Commit

Use manual commit when you want fine-grained control and stronger guarantees.

Java
props.put("enable.auto.commit", "false"); // disable auto commit

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Processing record: value=%s, offset=%d%n", record.value(), record.offset());
        }
        consumer.commitSync(); // commit offsets manually
    }
} finally {
    consumer.close();
}

Java Kafka Consumer Groups for Scalability

A consumer group is a collection of consumers that work together to read data from topics.

  • Each consumer in a group gets assigned a subset of partitions.
  • Kafka ensures load balancing: no two consumers in the same group read the same partition.
  • If a new consumer joins, partitions are rebalanced.

Example:
If a topic has 6 partitions and a consumer group has 3 consumers, each consumer will read from 2 partitions.
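To see the load balancing yourself, run several copies of the same consumer with an identical group.id and watch the partition assignment change. The sketch below (the class name GroupScalingConsumer is my own) adds a ConsumerRebalanceListener so each instance prints the partitions it currently owns:

Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class GroupScalingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group"); // same group.id for every instance
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The rebalance listener reports which partitions this instance owns after each rebalance
            consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned: " + partitions);
                }
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(r -> System.out.printf("partition=%d, offset=%d, value=%s%n",
                        r.partition(), r.offset(), r.value()));
            }
        }
    }
}

Start two or three instances of this class against a topic with multiple partitions: after each rebalance, every instance prints a different, non-overlapping partition assignment.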

Kafka Consumer Example: Consuming Custom Objects with a Custom Deserializer

In the previous blog, we created a producer that published Order objects using a custom serializer. Now, let’s build a Kafka Consumer that can correctly deserialize these objects and read them from the topic.

Order.java (Model)

Java
package com.javacodepoint.kafka.consumer;

import java.io.Serializable;

public class Order implements Serializable {
    private String orderId;
    private String product;
    private double amount;

    public Order() {}

    public Order(String orderId, String product, double amount) {
        this.orderId = orderId;
        this.product = product;
        this.amount = amount;
    }

    // Getters and Setters
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getProduct() { return product; }
    public void setProduct(String product) { this.product = product; }

    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
    }
}

CustomOrderDeserializer.java

Java
package com.javacodepoint.kafka.consumer;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

public class CustomOrderDeserializer implements Deserializer<Order> {

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) return null;

        try {
            String value = new String(data, StandardCharsets.UTF_8);
            String[] parts = value.split(",");

            if (parts.length != 3) {
                throw new IllegalArgumentException("Invalid data format for Order: " + value);
            }

            String orderId = parts[0];
            String product = parts[1];
            double amount = Double.parseDouble(parts[2]);

            return new Order(orderId, product, amount);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Order", e);
        }
    }
}

OrderConsumerExample.java

Java
package com.javacodepoint.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsumerExample {
    public static void main(String[] args) {
        String topicName = "custom-topic";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomOrderDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read from the beginning when no committed offset exists

        try (KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));

            System.out.println("Consuming Order messages from topic: " + topicName);

            while (true) {
                ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, Order> record : records) {
                    System.out.println("Consumed -> Key: " + record.key() + ", Value: " + record.value());
                }
            }
        }
    }
}

OUTPUT:
Consuming Order messages from topic: custom-topic
Consumed -> Key: O123, Value: Order{orderId=O123, product='Laptop', amount=1200.5}

Kafka Consumer Best Practices

When writing a Java Kafka consumer group application, keep these best practices in mind:

  1. Use manual commit for critical data pipelines.
  2. Tune max.poll.records to control batch size.
  3. Set session.timeout.ms and heartbeat.interval.ms properly for stable group management.
  4. Gracefully shut down consumers so offsets are committed before exiting (see the sketch after this list).
  5. Monitor lag to ensure consumers keep up with producers.
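For points 2 and 4, one common approach is the wakeup() pattern: a JVM shutdown hook interrupts the blocked poll(), and the poll loop commits offsets and closes the consumer before the process exits. The sketch below (the class name GracefulShutdownConsumer is my own, and the property values are illustrative) shows one minimal way to do it:

Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulShutdownConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // manual commit for stronger guarantees
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");     // illustrative batch size (default is 500)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();

        // On Ctrl+C / SIGTERM, wakeup() makes the blocked poll() throw WakeupException
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // wait until the poll loop has committed and closed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(r -> System.out.println("Consumed: " + r.value()));
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected during shutdown; nothing to handle
        } finally {
            consumer.commitSync(); // commit offsets for the last processed batch
            consumer.close();      // leave the group so partitions are reassigned promptly
        }
    }
}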

Common Troubleshooting Tips

Beginners often run into a few common issues when running a Kafka consumer Java example.

  • Confusing Zookeeper with Kafka bootstrap servers
    • Always use bootstrap.servers=localhost:9092 instead of Zookeeper connection.
  • Consumer is not receiving messages
    • Make sure the producer sent messages to the same topic.
    • Ensure ConsumerConfig.GROUP_ID_CONFIG (group.id) is correctly set.
  • Rebalancing too frequently
    • Adjust the heartbeat, session timeout, and max poll interval values (see the snippet below).
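As a starting point, these are the settings usually involved when a group rebalances too often (the values below are only illustrative; tune them for your environment):

Java
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");    // how long the group coordinator waits for a heartbeat before declaring the consumer dead
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // heartbeat frequency; keep it well below session.timeout.ms (roughly one third)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // maximum gap between poll() calls before the consumer is kicked out of the group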

Conclusion

In this Kafka consumer Java tutorial, we built a working consumer that polls messages, manages offsets, and uses consumer groups for scalability. You now understand the difference between Kafka auto commit vs manual commit, how offsets ensure reliability, and how consumer groups balance workload.

From here, the next step is to integrate error handling, multi-threaded consumers, and explore Kafka Streams for advanced use cases.


Want to revisit the lessons or explore more?

Return to the Apache Kafka Tutorial Home Page

Whether you want to review a specific topic or go through the full tutorial again, everything is structured to help you master Apache Kafka step by step.
