KafkaTemplate is required to send messages as a producer.
KafkaTemplate must be autowired; here Integer and String are the key and value types we plan to send through the KafkaTemplate.
@Autowired
KafkaTemplate<Integer, String> kafkaTemplate;
Make sure you are autowiring the KafkaTemplate within Spring's scope, which means your class must be annotated with @Configuration/@Component/@Service (or another stereotype that makes it a Spring-managed bean).
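As a minimal sketch (the class and method names below are only for illustration, and constructor injection is used instead of field injection, which works just as well):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class GreetingProducer {

    // Injected by Spring because GreetingProducer is a Spring-managed @Service bean
    private final KafkaTemplate<Integer, String> kafkaTemplate;

    public GreetingProducer(KafkaTemplate<Integer, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendGreeting(Integer key, String message) {
        // Publishes to the default topic configured via spring.kafka.template.default-topic
        kafkaTemplate.sendDefault(key, message);
    }
}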
Sample Kafka producer class in a Java 8 Spring Boot application:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ngdeveloper.domain.Coupon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class CouponsProducer {

    @Autowired
    KafkaTemplate<Integer, String> kafkaTemplate;

    @Autowired
    ObjectMapper objectMapper;

    public void sendCoupons(Coupon coupon) throws JsonProcessingException {
        // Key: store id, Value: the coupon serialized as a JSON string
        Integer key = coupon.getStoreId();
        String value = objectMapper.writeValueAsString(coupon);

        // sendDefault() publishes to the topic configured as spring.kafka.template.default-topic
        ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.sendDefault(key, value);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(key, value, ex);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(key, value, result);
            }
        });
    }

    private void handleFailure(Integer key, String value, Throwable ex) {
        log.error("Error while Sending the Message and the exception is {}", ex.getMessage());
        try {
            throw ex;
        } catch (Throwable throwable) {
            log.error("Error in OnFailure: {}", throwable.getMessage());
        }
    }

    private void handleSuccess(Integer key, String value, SendResult<Integer, String> result) {
        log.info("Message Sent SuccessFully for the key : {} and the value is {} , partition is {}",
                key, value, result.getRecordMetadata().partition());
    }
}
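To trigger the producer, call sendCoupons() from anywhere in your application. A minimal hypothetical REST endpoint (the controller class and mapping below are purely illustrative, not part of the original example) could look like this:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.ngdeveloper.domain.Coupon;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class CouponsController {

    private final CouponsProducer couponsProducer;

    public CouponsController(CouponsProducer couponsProducer) {
        this.couponsProducer = couponsProducer;
    }

    @PostMapping("/v1/coupons")
    public ResponseEntity<Coupon> postCoupon(@RequestBody Coupon coupon) throws JsonProcessingException {
        // Publish the coupon to Kafka asynchronously and return immediately
        couponsProducer.sendCoupons(coupon);
        return ResponseEntity.status(HttpStatus.CREATED).body(coupon);
    }
}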
Kafka as a Producer in Spring Boot – Things to Note
1. KafkaTemplate provides multiple overloaded methods for sending messages, such as the following (see the sketch after this list):
- send(topic, key, data)
- send(topic, partition, key, data)
- sendDefault(key,value)
- sendOffsetsToTransaction(offsets) etc.
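A minimal sketch of these overloads in use (the topic name, partition, key, and payload below are illustrative values only):

// Send to an explicit topic with a key
kafkaTemplate.send("coupon-topic", 1, "{\"storeId\":1}");

// Send to an explicit topic and partition (partition 0 here is arbitrary)
kafkaTemplate.send("coupon-topic", 0, 1, "{\"storeId\":1}");

// Send to the default topic configured via spring.kafka.template.default-topic
kafkaTemplate.sendDefault(1, "{\"storeId\":1}");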
2. Before starting your Spring Boot project to test your Kafka producer configuration and send messages, make sure ZooKeeper and the Kafka broker(s) are already running and the topic has already been created. You can also auto-create the topic if it does not exist with the snippet below, but that is still not advisable outside local development:
@Configuration @Profile("local") public class AutoCreateConfig { @Bean public NewTopic libraryEvents(){ return TopicBuilder.name("coupon-topic") .partitions(3) .replicas(3) .build(); } }
3. Your application.yml configuration should be similar to the snippet shown after these notes:
- bootstrap-servers: your Kafka servers (the default Kafka port is 9092). You can create multiple Kafka brokers simply by replicating the server.properties file and changing the broker.id, log directory path, and port in each copy. Creating multiple instances/brokers in Kafka is covered in detail here.
- Add the producer configuration, mainly the list of Kafka brokers and the key and value serializer types.
- Add the admin configuration with the running Kafka hosts. Spring's KafkaAdmin uses this to create topics (for example, the NewTopic bean above); it is not involved in sending the messages themselves.
spring:
  profiles: local
  kafka:
    template:
      default-topic: coupon-topic
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    admin:
      properties:
        bootstrap.servers: localhost:9092,localhost:9093,localhost:9094
Prefer application.properties over application.yml? The equivalent of the above configuration looks like this:
spring.kafka.template.default-topic=coupon-topic
spring.kafka.producer.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
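If you prefer defining the producer in Java configuration instead of properties, a minimal sketch could look like this (assuming the same brokers, serializers, and default topic as above; the class name KafkaProducerConfig is just illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory());
        // Equivalent of spring.kafka.template.default-topic
        template.setDefaultTopic("coupon-topic");
        return template;
    }
}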
How do you send a custom object like Coupon when the KafkaTemplate value type is String? The answer is simple: use ObjectMapper to convert your coupon object/bean to a JSON string on the producer side, and on the consumer side use ObjectMapper in reverse (readValue) to convert the string back to the actual object.
@Autowired
ObjectMapper objectMapper;

String value = objectMapper.writeValueAsString(coupon);
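On the consumer side, a minimal sketch of the reverse step could look like the following (the listener class name and group id are assumptions for illustration, and a matching consumer configuration with Integer/String deserializers is assumed to be in place):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ngdeveloper.domain.Coupon;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class CouponsConsumer {

    @Autowired
    ObjectMapper objectMapper;

    @KafkaListener(topics = "coupon-topic", groupId = "coupons-group")
    public void onMessage(ConsumerRecord<Integer, String> record) throws JsonProcessingException {
        // Reverse of writeValueAsString(): turn the JSON string back into a Coupon
        Coupon coupon = objectMapper.readValue(record.value(), Coupon.class);
        log.info("Consumed coupon for store {}: {}", record.key(), coupon);
    }
}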