
Kafka Asynchronous & Synchronous Spring Boot Producer Example


By default, KafkaTemplate sends messages asynchronously: send() does not wait for the broker's response and immediately returns a ListenableFuture<SendResult<K, V>>.

Sometimes, however, our API response has to depend on the actual result of the Kafka send.

When to use an asynchronous Kafka producer?

An asynchronous Kafka producer is meaningful in the use cases below:

Kafka Asynchronous Producer Example code

	@Autowired
	KafkaTemplate<Integer, String> kafkaTemplate;

	@Autowired
	ObjectMapper objectMapper;

	public void sendOrderToConsumers(Order order) throws JsonProcessingException {

		Integer key = order.getOrderId();
		String value = objectMapper.writeValueAsString(order);

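		// send() returns immediately; the callback runs once the broker acknowledges or the send fails
		ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send("ngdev-topic", key, value);
		listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
			@Override
			public void onFailure(Throwable ex) {
				// handleFailure is an application-specific helper (e.g. log or retry), not shown here
				handleFailure(key, value, ex);
			}

			@Override
			public void onSuccess(SendResult<Integer, String> result) {
				// handleSuccess is an application-specific helper (e.g. log partition and offset), not shown here
				handleSuccess(key, value, result);
			}
		});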

	}
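
Note that in Spring for Apache Kafka 3.x, KafkaTemplate.send() returns a CompletableFuture instead of a ListenableFuture, so the callback is usually attached with whenComplete. The sketch below shows that pattern using only the JDK so it can run on its own. fakeSend is a hypothetical stand-in for kafkaTemplate.send(...), and a plain String stands in for SendResult; neither is part of the original example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncCallbackSketch {

    // Hypothetical stand-in for kafkaTemplate.send(topic, key, value):
    // fails for an empty payload, otherwise completes on another thread.
    static CompletableFuture<String> fakeSend(String topic, Integer key, String value) {
        if (value == null || value.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty payload"));
        }
        return CompletableFuture.supplyAsync(() -> topic + "-0@" + key);
    }

    // Attaches the callback and returns without waiting for the send to finish.
    // The outcome is recorded in the given reference once the future completes.
    static CompletableFuture<String> sendAsync(Integer key, String value, AtomicReference<String> outcome) {
        return fakeSend("ngdev-topic", key, value).whenComplete((result, ex) -> {
            if (ex != null) {
                outcome.set("FAILED: " + ex.getMessage()); // plays the role of onFailure
            } else {
                outcome.set("SENT: " + result);            // plays the role of onSuccess
            }
        });
    }
}
```

The caller gets the future back right away and can ignore it (fire and forget) or chain further stages onto it. The success and failure branches map onto onSuccess and onFailure in the callback above.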

When to use a synchronous Kafka producer?

A synchronous Kafka producer is required in the use cases below:

Kafka Synchronous Producer Example code

	@Autowired
	KafkaTemplate<Integer, String> kafkaTemplate;

	@Autowired
	ObjectMapper objectMapper;

	public SendResult<Integer, String> sendOrderToConsumers(Order order) throws JsonProcessingException, ExecutionException, InterruptedException {
		Integer key = order.getOrderId();
		String value = objectMapper.writeValueAsString(order);
		// log is assumed to be an SLF4J logger declared on the class (e.g. via Lombok's @Slf4j)
		try {
			// get() blocks the calling thread until the send completes or fails
			return kafkaTemplate.send("ngdev-topic", key, value).get();
		} catch (InterruptedException e) {
			// restore the interrupt flag before propagating
			Thread.currentThread().interrupt();
			log.error("Interrupted while sending the message, details: {}", e.getMessage());
			throw e;
		} catch (ExecutionException e) {
			log.error("Sending the message failed, details: {}", e.getMessage());
			throw e;
		} catch (Exception e) {
			log.error("Exception sending the message, details: {}", e.getMessage());
			throw e;
		}
	}
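
Calling get() with no arguments waits for as long as the send takes to complete or fail, which can tie up the thread serving the API request. A bounded wait with get(timeout, unit) is one common alternative. The sketch below is a minimal JDK-only illustration: awaitResult is a hypothetical helper, and any Future (such as the one returned by kafkaTemplate.send(...)) could be passed to it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedSendSketch {

    // Waits at most timeoutMillis for the send future to complete.
    // Returns the result, or throws if the send failed, timed out, or the thread was interrupted.
    static <T> T awaitResult(Future<T> sendFuture, long timeoutMillis)
            throws ExecutionException, TimeoutException, InterruptedException {
        try {
            return sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            throw e;
        }
    }
}
```

In the synchronous producer this would look like awaitResult(kafkaTemplate.send("ngdev-topic", key, value), 1000), with the method additionally declaring TimeoutException. The 1000 ms value is only illustrative and should be chosen to fit the API's latency budget.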