KafkaTemplate in Spring Boot

Kafka-template-Spring-Boot-featured

KafkaTemplate is required to send the messages as a Producer.

KafkaTemplate must be Auto wired. here Integer, String is the key and value which we are planning to send using kafka template.

Kafka-template-Spring-Boot-blog
@Autowired
KafkaTemplate<Integer,String> kafkaTemplate;

Make sure you are using KafkaTemplate autowiring within spring scope, which means your class must be annotated with @configuration/@component/@service.

Sample Kafka Producer Configuration in Java 8 Spring Boot:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ngdeveloper.domain.Coupon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class CouponsProducer {

    @Autowired
    KafkaTemplate<Integer,String> kafkaTemplate;

    @Autowired
    ObjectMapper objectMapper;

    public void sendCoupons(Coupon coupon) throws JsonProcessingException {

        Integer key = coupon.getStoreId();
        String value = objectMapper.writeValueAsString(coupon);

        ListenableFuture<SendResult<Integer,String>> listenableFuture =  kafkaTemplate.sendDefault(key,value);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(key, value, ex);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(key, value, result);
            }
        });

    }

    private void handleFailure(Integer key, String value, Throwable ex) {
        log.error("Error while Sending the Message and the exception is {}", ex.getMessage());
        try {
            throw ex;
        } catch (Throwable throwable) {
            log.error("Error in OnFailure: {}", throwable.getMessage());
        }


    }

    private void handleSuccess(Integer key, String value, SendResult<Integer, String> result) {
        log.info("Message Sent SuccessFully for the key : {} and the value is {} , partition is {}", key, value, result.getRecordMetadata().partition());
    }
}

Kafka as producer in Spring Boot – Things to Note

1.kafkaTemplate provides multiple overloaded methods for sending the messages like,

  • send(topic, key, data)
  • send(topic, partition, key, data)
  • sendDefault(key,value)
  • sendOffsetsToTransaction(offsets) etc.

2. Before starting your spring boot project to test your kafka producer configurations and sending messages you need to make sure that you have already started the zookeeper, kafka broker(s) and the topic must be already created, you can also auto create it if does not exist with the below snippet, but still that’s not advicable.

@Configuration
@Profile("local")
public class AutoCreateConfig {

    @Bean
    public NewTopic libraryEvents(){
        return TopicBuilder.name("coupon-topic")
                .partitions(3)
                .replicas(3)
                .build();
    }
}

3. Your application.yml configuration should be similar to this,

  • bootstrap-servers: your kafka servers (default kafka port is 9092), but you can create multiple kafka brokers by simply replicating the server.properties files with broker-id, log file path and port name changes. Creating multiple instances/multiple brokers in the kafka is posted in detail here.
  • Added producer configuration mainly the list of kafka brokers and the key and value type.
  • Added admin configuration with kafka running hosts only. We need kafka admin to send the messages.
spring:
  profiles: local
  kafka:
    template:
      default-topic: coupon-topic
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    admin:
      properties:
        bootstrap.servers: localhost:9092,localhost:9093,localhost:9094

Using Maven project ? then equivalent to the above configuration looks like this in application.properties

spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093, localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092, localhost:9093, localhost:9094

You may get the question, that how to send my custom object as value ?

Answer is simple: Use ObjectMapper and convert your coupon object/bean to the string and in the consumer side they use objectMapper read(reverse) to convert from string to the actual object.

    @Autowired
    ObjectMapper objectMapper;
    String value = objectMapper.writeValueAsString(coupon);

One comment

Leave a Reply