If you are a Java developer exploring Apache Kafka, one of the first things you will build is a Kafka Producer. The producer is responsible for publishing data (messages) to Kafka topics. In this Kafka Producer Java tutorial, we’ll walk you through the step-by-step process of building a producer, sending JSON and custom objects, handling retries, and implementing best practices.

By the end of this guide, you’ll be able to:
- Write a Kafka Producer Java example to send string, JSON, and custom messages.
- Implement a Kafka custom serializer in Java.
- Configure retry logic and error handling in a Kafka producer.
- Follow Kafka producer best practices for reliability and performance.
Table of Contents
Setup Instructions for Java Kafka Producer
Before we start coding, make sure you have:
- Apache Kafka is installed locally (if not, see here).
- Java 8 or above is installed (if not, see here).
- Maven project setup (if not, see here).
Maven Dependency
Add the following dependencies to your pom.xml
:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleStringProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 1; i <= 5; i++) {
String message = "Hello Kafka " + i;
producer.send(new ProducerRecord<>("demo-topic", message));
System.out.println("Sent: " + message);
}
producer.close();
}
}
Sending JSON Messages with Kafka Producer
For real-world applications, JSON is a common data format. Let’s send a User object as JSON.
User Model
public class User {
private String id;
private String name;
private int age;
public User(String id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
// getters and setters
}
JSON Serializer
package com.javacodepoint.kafka.producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, T data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("Error serializing JSON", e);
}
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to configure
}
@Override
public void close() {
// nothing to close
}
}
JSON Producer Example
This program sends a JSON message to Kafka.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class JsonProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
User user = new User("1", "Alice");
producer.send(new ProducerRecord<>("user-topic", user));
System.out.println("Sent JSON User: " + user.getName());
producer.close();
}
}
Kafka Producer with JSON Serializer (complete program)
This example shows how to send structured data to Kafka by converting Java objects into JSON messages using a custom JSON serializer. An User
object is serialized with Jackson and sent to a Kafka topic, making it easy to integrate Kafka with applications or microservices that rely on JSON-based communication.
JsonSerializer.java (use the same above code)
JsonProducerExample.java
package com.javacodepoint.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
// Define a simple User class for JSON serialization
class User {
private String id;
private String name;
private int age;
public User(String id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
// Getters required for serialization
public String getId() { return id; }
public String getName() { return name; }
public int getAge() { return age; }
}
public class JsonProducerExample {
public static void main(String[] args) {
// Kafka configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
try {
User user = new User("U1", "Alice", 25);
ProducerRecord<String, User> record =
new ProducerRecord<>("json-topic", user.getId(), user);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent JSON message to topic: " + metadata.topic()
+ " partition: " + metadata.partition()
+ " offset: " + metadata.offset());
} else {
exception.printStackTrace();
}
});
} finally {
producer.flush();
producer.close();
}
}
}
OUTPUT:
Sent JSON message to topic: json-topic partition: 0 offset: 0
Kafka Producer with Custom Serializer
Sometimes JSON is not enough; you might need a Kafka custom serializer in Java.
Order Model
// Custom object
class Order {
private String orderId;
private String product;
private double amount;
public Order(String orderId, String product, double amount) {
this.orderId = orderId;
this.product = product;
this.amount = amount;
}
// getters and setters
}
Example: Custom Object Serializer
package com.javacodepoint.kafka.producer;
import org.apache.kafka.common.serialization.Serializer;
//Custom order serializer
public class CustomOrderSerializer implements Serializer<Order> {
@Override
public byte[] serialize(String topic, Order order) {
if (order == null) return null;
return (order.getOrderId() + "," + order.getProduct() + "," + order.getAmount()).getBytes();
}
}
Using Custom Serializer
This will store messages as orderId,product,amount
format.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomOrderSerializer.class.getName());
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = new Order("O123", "Laptop", 1200.50);
ProducerRecord<String, Order> record = new ProducerRecord<>("custom-topic", order.getOrderId(), order);
producer.send(record);
Kafka Producer Retry Logic and Error Handling
Kafka is distributed, so failures can occur. Let’s configure retries and error handling.
Retry Configuration
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);
- ProducerConfig.RETRIES_CONFIG (
retries
): Number of retry attempts. - ProducerConfig.ACKS_CONFIG (
acks=all
): Ensures strong durability (acknowledgement from all replicas). - ProducerConfig.RETRY_BACKOFF_MS_CONFIG (
retry.backoff.ms
): Time between retries.
Error Handling Example
Using callbacks ensures you log and handle producer errors.
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent Order message to topic: " + metadata.topic()
+ " partition: " + metadata.partition()
+ " offset: " + metadata.offset());
} else {
System.err.println("Error sending order: " + exception.getMessage());
exception.printStackTrace();
}
});
Custom Object Producer with Retry & Error Handling (complete program)
This example demonstrates how to implement a custom serializer for a Order
object and enhance the producer with retry logic, acknowledgments, and error handling. It ensures reliable message delivery and is suitable for building robust Kafka producers that handle transient failures gracefully.
CustomOrderSerializer.java (use the same above code)
CustomObjectProducerWithRetry.java
package com.javacodepoint.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
// Custom object
class Order {
private String orderId;
private String product;
private double amount;
public Order(String orderId, String product, double amount) {
this.orderId = orderId;
this.product = product;
this.amount = amount;
}
public String getOrderId() { return orderId; }
public String getProduct() { return product; }
public double getAmount() { return amount; }
}
public class CustomObjectProducerWithRetry {
public static void main(String[] args) {
// Kafka configuration with retry logic
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomOrderSerializer.class.getName());
// Retry and error handling configs
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 5); // retry 5 times
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200); // wait between retries
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 mins timeout
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
try {
Order order = new Order("O123", "Laptop", 1200.50);
ProducerRecord<String, Order> record =
new ProducerRecord<>("custom-topic", order.getOrderId(), order);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent Order message to topic: " + metadata.topic()
+ " partition: " + metadata.partition()
+ " offset: " + metadata.offset());
} else {
System.err.println("Error sending order: " + exception.getMessage());
exception.printStackTrace();
}
});
} finally {
producer.flush();
producer.close();
}
}
}
OUTPUT:
Sent Order message to topic: custom-topic partition: 0 offset: 1
Kafka Producer Best Practices
To make your producer production-ready, follow these Kafka producer best practices:
- Use Idempotent Producer
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
This avoids duplicate messages. - Batch Messages
Configure batch size to improve throughput:props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
- Monitor Producer Errors
Always use callbacks for error handling. - Choose a Proper Serializer
- Use StringSerializer for plain text.
- Use JsonSerializer for structured data.
- Write a custom serializer for domain-specific objects.
- Handle Retries Gracefully
Don’t retry indefinitely; implement backoff strategies.
Conclusion
In this Kafka Producer Java tutorial, we explored:
- A basic Apache Kafka Java example for sending strings.
- Sending JSON messages with a custom JsonSerializer.
- Writing a Kafka custom serializer in Java for object serialization.
- Implementing the Kafka retry mechanism in Java with proper configurations.
- Following Kafka producer best practices for reliable applications.
Now that you’ve mastered the producer side, the next step is to explore the Kafka Consumer API in Java to read these messages.
Want to revisit the lessons or explore more?
Return to the Apache Kafka Tutorial Home PageWhether you want to review a specific topic or go through the full tutorial again, everything is structured to help you master Apache Kafka step by step.