Kafka Producer Java Tutorial: Send Messages with Custom Serializers and Retry Logic

If you are a Java developer exploring Apache Kafka, one of the first things you will build is a Kafka Producer. The producer is responsible for publishing data (messages) to Kafka topics. In this Kafka Producer Java tutorial, we’ll walk you through the step-by-step process of building a producer, sending JSON and custom objects, handling retries, and implementing best practices.

By the end of this guide, you’ll be able to:

  • Write a Kafka Producer Java example to send string, JSON, and custom messages.
  • Implement a Kafka custom serializer in Java.
  • Configure retry logic and error handling in a Kafka producer.
  • Follow Kafka producer best practices for reliability and performance.

Setup Instructions for Java Kafka Producer

Before we start coding, make sure you have:

  • Apache Kafka installed and running locally.
  • Java 8 or above installed.
  • A Maven project set up.

Maven Dependency

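Add the kafka-clients dependency (org.apache.kafka:kafka-clients) to your pom.xml. The JSON examples later in this tutorial also need jackson-databind (com.fasterxml.jackson.core:jackson-databind).

Simple String Producer

The following program sends five string messages to demo-topic.

Java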
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleStringProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 1; i <= 5; i++) {
            String message = "Hello Kafka " + i;
            producer.send(new ProducerRecord<>("demo-topic", message));
            System.out.println("Sent: " + message);
        }

        producer.close();
    }
}
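The ProducerConfig constants used above are ordinary string keys, so the same three settings can also be kept in a properties file. Here is a minimal sketch of that idea using only the JDK, so it runs without a broker. The class name ProducerPropsSketch and its members are hypothetical and not part of the Kafka API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ProducerPropsSketch {

    // The same settings as SimpleStringProducer, written the way they would
    // appear in a .properties file. The keys are the string values behind
    // BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and
    // VALUE_SERIALIZER_CLASS_CONFIG.
    public static final String CONFIG_TEXT = String.join("\n",
            "bootstrap.servers=localhost:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    // Parses properties-file text into a Properties object.
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG_TEXT);
        // In a real application these Properties would be passed to
        // new KafkaProducer<>(props).
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Keeping the settings in a file like this makes it possible to change the broker address per environment without recompiling.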

Sending JSON Messages with Kafka Producer

For real-world applications, JSON is a common data format. Let’s send a User object as JSON.

User Model

Java
public class User {
    private String id;
    private String name;
    private int age;

    public User(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // getters and setters
}

JSON Serializer

JsonSerializer.java
package com.javacodepoint.kafka.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Error serializing JSON", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public void close() {
        // nothing to close
    }
}

JSON Producer Example

This program sends a JSON message to Kafka.

Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class JsonProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

        KafkaProducer<String, User> producer = new KafkaProducer<>(props);

        // The User constructor takes id, name, and age
        User user = new User("1", "Alice", 25);
        producer.send(new ProducerRecord<>("user-topic", user));
        System.out.println("Sent JSON User: " + user.getName());

        producer.close();
    }
}

Kafka Producer with JSON Serializer (complete program)

This example shows how to send structured data to Kafka by converting Java objects into JSON messages using a custom JSON serializer. A User object is serialized with Jackson and sent to a Kafka topic, making it easy to integrate Kafka with applications or microservices that rely on JSON-based communication.

JsonSerializer.java (use the same code as above)
JsonProducerExample.java

JsonProducerExample.java
package com.javacodepoint.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Define a simple User class for JSON serialization
class User {
    private String id;
    private String name;
    private int age;

    public User(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // Getters required for serialization
    public String getId() { return id; }
    public String getName() { return name; }
    public int getAge() { return age; }
}

public class JsonProducerExample {
    public static void main(String[] args) {
        // Kafka configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

        KafkaProducer<String, User> producer = new KafkaProducer<>(props);

        try {
            User user = new User("U1", "Alice", 25);

            ProducerRecord<String, User> record =
                    new ProducerRecord<>("json-topic", user.getId(), user);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Sent JSON message to topic: " + metadata.topic()
                            + " partition: " + metadata.partition()
                            + " offset: " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });

        } finally {
            producer.flush();
            producer.close();
        }
    }
}

OUTPUT:
Sent JSON message to topic: json-topic partition: 0 offset: 0

Kafka Producer with Custom Serializer

Sometimes JSON is not the right fit, for example when consumers expect a compact delimited or binary format. In that case you can write a Kafka custom serializer in Java.

Order Model

Java
// Custom object
class Order {
    private String orderId;
    private String product;
    private double amount;

    public Order(String orderId, String product, double amount) {
        this.orderId = orderId;
        this.product = product;
        this.amount = amount;
    }

    // getters and setters
}

Example: Custom Object Serializer

CustomOrderSerializer.java
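package com.javacodepoint.kafka.producer;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

// Serializes an Order as a comma-separated line: orderId,product,amount
public class CustomOrderSerializer implements Serializer<Order> {
    @Override
    public byte[] serialize(String topic, Order order) {
        if (order == null) {
            return null;
        }
        String line = order.getOrderId() + "," + order.getProduct() + "," + order.getAmount();
        // Use an explicit charset so the bytes do not depend on the platform default
        return line.getBytes(StandardCharsets.UTF_8);
    }
}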

Using Custom Serializer

This stores each message in orderId,product,amount format.

Java
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomOrderSerializer.class.getName());
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = new Order("O123", "Laptop", 1200.50);
ProducerRecord<String, Order> record = new ProducerRecord<>("custom-topic", order.getOrderId(), order);
producer.send(record);
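To see what the orderId,product,amount format looks like on the wire, here is a minimal sketch that reproduces the serializer's string building without the Kafka interface, plus a parser for the consumer side. It assumes UTF-8 encoding; the class OrderWireFormatSketch and its encode and decode methods are hypothetical helpers for illustration only.

```java
import java.nio.charset.StandardCharsets;

public class OrderWireFormatSketch {

    // Builds the same comma-separated payload as CustomOrderSerializer,
    // encoded as UTF-8 (an assumption of this sketch).
    public static byte[] encode(String orderId, String product, double amount) {
        return (orderId + "," + product + "," + amount).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical consumer-side counterpart: splits the payload back
    // into its three fields.
    public static String[] decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8).split(",", 3);
    }

    public static void main(String[] args) {
        byte[] payload = encode("O123", "Laptop", 1200.50);
        // Java prints the double 1200.50 as 1200.5
        System.out.println(new String(payload, StandardCharsets.UTF_8)); // O123,Laptop,1200.5

        String[] fields = decode(payload);
        System.out.println(fields[0]);                     // O123
        System.out.println(fields[1]);                     // Laptop
        System.out.println(Double.parseDouble(fields[2])); // 1200.5
    }
}
```

The format has no escaping, so a product name that contains a comma would shift the fields when parsed. If your data can contain commas, a format such as JSON is a safer choice.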

Kafka Producer Retry Logic and Error Handling

Kafka is a distributed system, so transient failures such as broker restarts or leader elections can occur. Let’s configure retries and error handling.

Retry Configuration

Java
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
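props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);

  • ProducerConfig.RETRIES_CONFIG (retries): Maximum number of times the producer resends a record after a transient failure.
  • ProducerConfig.ACKS_CONFIG (acks=all): Ensures strong durability; the leader waits for acknowledgement from all in-sync replicas.
  • ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG (delivery.timeout.ms): Upper bound on the total time to report success or failure for a send, including retries.
  • ProducerConfig.RETRY_BACKOFF_MS_CONFIG (retry.backoff.ms): Time to wait between retries, in milliseconds.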
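These settings interact, so it helps to sanity-check the numbers before using them. The sketch below does the arithmetic for the values above using only the JDK. It assumes a fixed backoff between attempts and the documented defaults request.timeout.ms=30000 and linger.ms=0, neither of which is set in this tutorial. The class and method names are hypothetical.

```java
public class RetryBudgetSketch {

    // The producer expects delivery.timeout.ms to be at least
    // linger.ms + request.timeout.ms.
    public static boolean isValidTimeout(long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    // Total time spent waiting between attempts if every retry is used,
    // assuming the same backoff before each retry.
    public static long totalBackoffMs(int retries, long retryBackoffMs) {
        return retries * retryBackoffMs;
    }

    public static void main(String[] args) {
        int retries = 5;
        long retryBackoffMs = 200;
        long deliveryTimeoutMs = 120_000;
        long requestTimeoutMs = 30_000; // assumed default, not set in the tutorial
        long lingerMs = 0;              // assumed default, not set in the tutorial

        System.out.println(totalBackoffMs(retries, retryBackoffMs));                       // 1000
        System.out.println(isValidTimeout(deliveryTimeoutMs, lingerMs, requestTimeoutMs)); // true
    }
}
```

With these values the backoff pauses add up to about one second, far below the two-minute delivery timeout, so in practice the retry count is what limits the number of attempts here.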

Error Handling Example

Passing a callback to send() lets you log and handle errors that the producer could not recover from by retrying.

Java
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Sent Order message to topic: " + metadata.topic()
                + " partition: " + metadata.partition()
                + " offset: " + metadata.offset());
    } else {
        System.err.println("Error sending order: " + exception.getMessage());
        exception.printStackTrace();
    }
});
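Logging is a start, but you will often want to remember which records failed so they can be re-sent or reported later. Here is a minimal sketch of that idea. To keep it runnable without a broker, it uses java.util.function.BiConsumer as a stand-in with the same (metadata, exception) shape as the callback above; SendResultTrackerSketch and its methods are hypothetical names, not part of the Kafka API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

public class SendResultTrackerSketch {

    // Keys of records whose send failed. A thread-safe list is used because
    // Kafka normally runs callbacks on the producer's I/O thread, not on
    // the thread that called send().
    private final List<String> failedKeys = new CopyOnWriteArrayList<>();

    // Returns a handler that records the key when the send reports an exception.
    public BiConsumer<Object, Exception> callbackFor(String key) {
        return (metadata, exception) -> {
            if (exception != null) {
                failedKeys.add(key);
            }
        };
    }

    public List<String> failedKeys() {
        return failedKeys;
    }

    public static void main(String[] args) {
        SendResultTrackerSketch tracker = new SendResultTrackerSketch();

        // Simulate one successful and one failed send.
        tracker.callbackFor("O123").accept("metadata placeholder", null);
        tracker.callbackFor("O124").accept(null, new RuntimeException("simulated broker timeout"));

        System.out.println("Failed keys: " + tracker.failedKeys()); // Failed keys: [O124]
    }
}
```

In a real producer, the same logic would live inside the lambda passed to producer.send(record, ...), with the record key captured from the surrounding code.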

Custom Object Producer with Retry & Error Handling (complete program)

This example demonstrates how to implement a custom serializer for an Order object and enhance the producer with retry logic, acknowledgments, and error handling. It ensures reliable message delivery and is suitable for building robust Kafka producers that handle transient failures gracefully.

CustomOrderSerializer.java (use the same code as above)
CustomObjectProducerWithRetry.java

CustomObjectProducerWithRetry.java
package com.javacodepoint.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Custom object
class Order {
    private String orderId;
    private String product;
    private double amount;

    public Order(String orderId, String product, double amount) {
        this.orderId = orderId;
        this.product = product;
        this.amount = amount;
    }

    public String getOrderId() { return orderId; }
    public String getProduct() { return product; }
    public double getAmount() { return amount; }
}

public class CustomObjectProducerWithRetry {
    public static void main(String[] args) {
        // Kafka configuration with retry logic
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomOrderSerializer.class.getName());
        
        // Retry and error handling configs
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5); // retry 5 times
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200); // wait between retries
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 mins timeout

        KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

        try {
            Order order = new Order("O123", "Laptop", 1200.50);

            ProducerRecord<String, Order> record =
                    new ProducerRecord<>("custom-topic", order.getOrderId(), order);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Sent Order message to topic: " + metadata.topic()
                            + " partition: " + metadata.partition()
                            + " offset: " + metadata.offset());
                } else {
                    System.err.println("Error sending order: " + exception.getMessage());
                    exception.printStackTrace();
                }
            });

        } finally {
            producer.flush();
            producer.close();
        }
    }
}

OUTPUT:
Sent Order message to topic: custom-topic partition: 0 offset: 1

Kafka Producer Best Practices

To make your producer production-ready, follow these Kafka producer best practices:

  1. Use an Idempotent Producer
    Enable idempotence so that retries do not write duplicate messages: props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  2. Batch Messages
    Configure the batch size and linger time to improve throughput: props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
  3. Monitor Producer Errors
    Always use callbacks for error handling.
  4. Choose a Proper Serializer
    • Use StringSerializer for plain text.
    • Use JsonSerializer for structured data.
    • Write a custom serializer for domain-specific objects.
  5. Handle Retries Gracefully
    Don’t retry indefinitely; bound retries with delivery.timeout.ms and space them out with retry.backoff.ms.
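The settings from this list can be collected in one place and checked for consistency. An idempotent producer needs acks=all, retries greater than zero, and at most 5 in-flight requests per connection. The sketch below expresses that check using plain string keys and only the JDK. The class BestPracticeConfigSketch and its methods are hypothetical, and the fallback values used when a key is missing are assumptions based on the defaults of recent Kafka clients.

```java
import java.util.Properties;

public class BestPracticeConfigSketch {

    // Collects the reliability and batching settings from the list above.
    public static Properties reliableProducerProps() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("retries", "5");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "5");
        return props;
    }

    // Checks the conditions an idempotent producer depends on.
    // This sketch requires acks to be set explicitly.
    public static boolean idempotenceCompatible(Properties props) {
        String acks = props.getProperty("acks", "");
        boolean acksAll = acks.equals("all") || acks.equals("-1");
        // Fallbacks below are assumed client defaults, used only when a key is absent.
        int retries = Integer.parseInt(
                props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        int inFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        return acksAll && retries > 0 && inFlight <= 5;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps();
        System.out.println(idempotenceCompatible(props)); // true

        // Weakening acks breaks the idempotence requirements.
        props.setProperty("acks", "1");
        System.out.println(idempotenceCompatible(props)); // false
    }
}
```

A check like this can run at startup or in a unit test, so a conflicting setting is caught before the producer is created.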

Conclusion

In this Kafka Producer Java tutorial, we explored:

  • A basic Apache Kafka Java example for sending strings.
  • Sending JSON messages with a custom JsonSerializer.
  • Writing a Kafka custom serializer in Java for object serialization.
  • Implementing the Kafka retry mechanism in Java with proper configurations.
  • Following Kafka producer best practices for reliable applications.

Now that you’ve mastered the producer side, the next step is to explore the Kafka Consumer API in Java to read these messages.

