Site icon NgDeveloper

KafkaTemplate in Spring Boot

Kafka-template-Spring-Boot-featured

KafkaTemplate is required to send the messages as a Producer.

KafkaTemplate must be Auto wired. here Integer, String is the key and value which we are planning to send using kafka template.

@Autowired
KafkaTemplate<Integer,String> kafkaTemplate;

Make sure you are using KafkaTemplate autowiring within spring scope, which means your class must be annotated with @configuration/@component/@service.

Sample Kafka Producer Configuration in Java 8 Spring Boot:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ngdeveloper.domain.Coupon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class CouponsProducer {

    @Autowired
    KafkaTemplate<Integer,String> kafkaTemplate;

    @Autowired
    ObjectMapper objectMapper;

    public void sendCoupons(Coupon coupon) throws JsonProcessingException {

        Integer key = coupon.getStoreId();
        String value = objectMapper.writeValueAsString(coupon);

        ListenableFuture<SendResult<Integer,String>> listenableFuture =  kafkaTemplate.sendDefault(key,value);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(key, value, ex);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(key, value, result);
            }
        });

    }

    private void handleFailure(Integer key, String value, Throwable ex) {
        log.error("Error while Sending the Message and the exception is {}", ex.getMessage());
        try {
            throw ex;
        } catch (Throwable throwable) {
            log.error("Error in OnFailure: {}", throwable.getMessage());
        }


    }

    private void handleSuccess(Integer key, String value, SendResult<Integer, String> result) {
        log.info("Message Sent SuccessFully for the key : {} and the value is {} , partition is {}", key, value, result.getRecordMetadata().partition());
    }
}

Kafka as producer in Spring Boot – Things to Note

1.kafkaTemplate provides multiple overloaded methods for sending the messages like,

2. Before starting your spring boot project to test your kafka producer configurations and sending messages you need to make sure that you have already started the zookeeper, kafka broker(s) and the topic must be already created, you can also auto create it if does not exist with the below snippet, but still that’s not advicable.

@Configuration
@Profile("local")
public class AutoCreateConfig {

    @Bean
    public NewTopic libraryEvents(){
        return TopicBuilder.name("coupon-topic")
                .partitions(3)
                .replicas(3)
                .build();
    }
}

3. Your application.yml configuration should be similar to this,

spring:
  profiles: local
  kafka:
    template:
      default-topic: coupon-topic
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    admin:
      properties:
        bootstrap.servers: localhost:9092,localhost:9093,localhost:9094

Using Maven project ? then equivalent to the above configuration looks like this in application.properties

spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093, localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092, localhost:9093, localhost:9094

You may get the question, that how to send my custom object as value ?

Answer is simple: Use ObjectMapper and convert your coupon object/bean to the string and in the consumer side they use objectMapper read(reverse) to convert from string to the actual object.

    @Autowired
    ObjectMapper objectMapper;
    String value = objectMapper.writeValueAsString(coupon);

Exit mobile version