By default, KafkaTemplate behaves asynchronously: send() does not wait for the response and returns a ListenableFuture<SendResult<K, V>>.
But sometimes we need to respond to our API caller based on the actual response from the Kafka producer.
When to use an asynchronous Kafka producer?
An asynchronous Kafka producer makes sense in the use cases below:
- Sending notifications such as email/SMS
- Writing log files, etc.
Kafka Asynchronous Producer Example code
- I have included only the important snippet for the asynchronous call. If you are looking for the complete class and methods, refer to this post and this GitHub link.
- Here ListenableFuture is the return type of the asynchronous Kafka call.
- kafkaTemplate.send("ngdev-topic", key, value);
- Once this line is executed, the Kafka producer's background thread takes over the actual send, so the calling thread continues to run without waiting for the result.
- So in our case, irrespective of the actual response from the Kafka send method, execution simply proceeds to the next lines.
@Autowired
KafkaTemplate<Integer, String> kafkaTemplate;

@Autowired
ObjectMapper objectMapper;

public void sendOrderToConsumers(Order order) throws JsonProcessingException {
    Integer key = order.getOrderId();
    String value = objectMapper.writeValueAsString(order);

    // send() returns immediately; the result arrives later through the future
    ListenableFuture<SendResult<Integer, String>> listenableFuture =
            kafkaTemplate.send("ngdev-topic", key, value);

    // The callback runs once the send has succeeded or failed
    listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
        @Override
        public void onFailure(Throwable ex) {
            handleFailure(key, value, ex);
        }

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(key, value, result);
        }
    });
}
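To make the "caller does not wait" behaviour easier to see, here is a minimal sketch that runs without a Kafka broker. It is not the Spring Kafka API: a plain java.util.concurrent.CompletableFuture stands in for the future returned by kafkaTemplate.send(...), and fakeSend, run and the "ack" string are hypothetical names made up for this illustration. (Newer Spring Kafka versions return a CompletableFuture from send() instead of a ListenableFuture, so the callback style is similar.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib-only sketch: CompletableFuture is an assumed stand-in for the
// future returned by kafkaTemplate.send(...). No broker is involved.
class AsyncSendSketch {

    // Hypothetical stand-in for kafkaTemplate.send(topic, key, value):
    // returns immediately with a future that is completed later.
    static CompletableFuture<String> fakeSend(String topic, Integer key, String value) {
        return new CompletableFuture<>();
    }

    // Returns the events in the order they happened.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future = fakeSend("ngdev-topic", 1, "{\"orderId\":1}");

        // Register the callback; nothing runs yet because the future is not complete.
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                events.add("onFailure: " + ex.getMessage());
            } else {
                events.add("onSuccess: " + result);
            }
        });

        // The caller moves on before any result is known.
        events.add("caller continues");

        // Simulate the broker response arriving later.
        if (fail) {
            future.completeExceptionally(new RuntimeException("broker unavailable"));
        } else {
            future.complete("ack");
        }
        return events;
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println);
        run(true).forEach(System.out::println);
    }
}
```

In both runs "caller continues" is recorded before the callback fires, which is the same ordering you get with the addCallback version above.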
When to use a synchronous Kafka producer?
A synchronous Kafka producer is required in the use cases below:
- Whenever we need to get the response from the third-party system and proceed based on it
Kafka Synchronous Producer Example code
- kafkaTemplate.send("ngdev-topic", key, value).get(): calling .get() on the returned future blocks the calling thread until the send completes, which turns the asynchronous send into a synchronous one.
- The Kafka producer still sends the record on its own background thread; the calling thread simply waits for that result and then continues with it.
- We can call .get() with a timeout, for example .get(10, TimeUnit.SECONDS), to avoid hanging indefinitely while waiting for the response.
@Autowired
KafkaTemplate<Integer, String> kafkaTemplate;

@Autowired
ObjectMapper objectMapper;

public SendResult<Integer, String> sendOrderToConsumers(Order order)
        throws JsonProcessingException, ExecutionException, InterruptedException {
    Integer key = order.getOrderId();
    String value = objectMapper.writeValueAsString(order);
    SendResult<Integer, String> sendResult = null;
    try {
        // get() blocks the calling thread until the send succeeds or fails
        sendResult = kafkaTemplate.send("ngdev-topic", key, value).get();
    } catch (ExecutionException | InterruptedException e) {
        log.error("Execution/Interrupted exception and the details are {}", e.getMessage());
        throw e;
    } catch (Exception e) {
        log.error("Exception sending the message and the details are {}", e.getMessage());
        throw e;
    }
    return sendResult;
}
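The snippet above waits without a limit. As a rough sketch of the timeout variant mentioned earlier, the example below uses Future.get(long, TimeUnit) from the JDK. To keep it runnable without a broker, a CompletableFuture<String> stands in for the future returned by kafkaTemplate.send(...); the class name, the waitForResult helper and the status strings are assumptions made for this illustration, not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stdlib-only sketch: the Future<String> is an assumed stand-in for the
// future returned by kafkaTemplate.send(...).
class SyncSendWithTimeoutSketch {

    // Blocks the calling thread for at most timeoutMs while waiting for the result.
    static String waitForResult(Future<String> sendFuture, long timeoutMs) {
        try {
            return "SENT: " + sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No response within the limit; the caller can now decide what to do.
            return "TIMEOUT";
        } catch (ExecutionException e) {
            // The send itself failed; the real cause is wrapped inside.
            return "FAILED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // A send that has already been acknowledged.
        System.out.println(waitForResult(CompletableFuture.completedFuture("ack"), 100));

        // A send that never completes within the timeout.
        System.out.println(waitForResult(new CompletableFuture<>(), 100));

        // A send that failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(waitForResult(failed, 100));
    }
}
```

With the real template, the same pattern would be a call like kafkaTemplate.send("ngdev-topic", key, value).get(10, TimeUnit.SECONDS), with the timeout value chosen to suit how long your API can afford to wait.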