Spring Boot Kafka Producer Maven Project


In this post I will share a Spring Boot Kafka producer Maven project and walk you through how to create a Kafka producer in Spring Boot with Maven.


Prerequisites

Kafka should be installed (refer to this post for step-by-step guidelines on how to install Kafka on Windows and Mac).

It helps if you already know how to send and receive messages from the command prompt with the Kafka console producer and consumer.

In this post I am going to run multiple Kafka brokers, so knowing how to set up multiple Kafka broker instances on one machine will help you follow along.

We have also posted a list of the most commonly used Kafka commands here, to help you get familiar with Kafka on the command prompt.

Create your project with the help of Spring Initializr

Now click GENERATE to download the base project with the selected dependencies.

Then extract the downloaded ZIP and import it as a Maven project in Eclipse/STS/IntelliJ.

Please note: the import may take some time, as it needs to download all the dependencies to your machine from the respective repositories.

About the project

  • We are going to create a REST API endpoint that takes an order object (orderId, orderAmount and orderAddress) as input (request body).
  • From the controller, we invoke our Kafka producer, which posts the message (the same order object as a string) to a Kafka topic, where the consumer reads it.
  • We will keep a Kafka consumer running in the command prompt and check whether the message posted from Postman is received there.
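
To make the "order object as string" step concrete, here is a minimal, dependency-free sketch of what the key and value of the Kafka message could look like. The class names Order and OrderMessageSketch are hypothetical, the field names are taken from the request body used later in this post, and the hand-written toJson() is only a stand-in for whatever JSON library the real project uses.

```java
// Hypothetical stand-in for the project's order object.
final class Order {
    final Integer orderId;      // intended to become the Kafka message key
    final int orderAmount;
    final String orderAddress;

    Order(Integer orderId, int orderAmount, String orderAddress) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
        this.orderAddress = orderAddress;
    }

    // Hand-rolled JSON keeps the sketch free of dependencies; a real project
    // would more likely delegate this to a JSON library.
    String toJson() {
        String address = orderAddress == null
                ? "null"
                : "\"" + escape(orderAddress) + "\"";
        return "{\"orderId\":" + orderId
                + ",\"orderAmount\":" + orderAmount
                + ",\"orderAddress\":" + address + "}";
    }

    // Escapes backslashes and double quotes so the string stays valid JSON.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}

final class OrderMessageSketch {
    public static void main(String[] args) {
        Order order = new Order(1, 200, "Delhi");
        Integer key = order.orderId;   // integer key, matching IntegerSerializer
        String value = order.toJson(); // string value, matching StringSerializer
        // prints: key=1 value={"orderId":1,"orderAmount":200,"orderAddress":"Delhi"}
        System.out.println("key=" + key + " value=" + value);
    }
}
```

The key/value split mirrors the serializers configured in application.properties: an Integer key and a String value.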

Project Structure

Spring boot Kafka Producer Example – Github Project Link:

Download the Spring Boot Kafka producer example project from GitHub.

Starting as Spring Boot Application

Make sure ZooKeeper and the three Kafka broker instances are running. If you are not sure how to run ZooKeeper and multiple Kafka brokers, refer to these links.

If you prefer, you can run just one Kafka broker; in that case use the single-broker variant of application.properties shown further down.

application.properties with multiple kafka broker instances

spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093, localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092, localhost:9093, localhost:9094
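
If you are unsure whether all three brokers are actually up, a quick TCP check can help before starting the application. The following is only a rough sketch using plain Java sockets: the class name BrokerCheckSketch and its helper methods are made up for this post, and a successful connection only tells you that something is listening on the port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

final class BrokerCheckSketch {

    // Splits a comma-separated bootstrap list, ignoring blanks around commas.
    static List<String> parseServers(String bootstrapServers) {
        List<String> servers = new ArrayList<>();
        for (String part : bootstrapServers.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String hostAndPort, int timeoutMs) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon < 0) {
            return false; // no port given
        }
        String host = hostAndPort.substring(0, colon);
        try (Socket socket = new Socket()) {
            int port = Integer.parseInt(hostAndPort.substring(colon + 1));
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Connection refused, timeout, or an invalid port value.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as spring.kafka.producer.bootstrap-servers above.
        String value = "localhost:9092, localhost:9093, localhost:9094";
        for (String server : parseServers(value)) {
            String state = isListening(server, 1000) ? "listening" : "not reachable";
            System.out.println(server + " -> " + state);
        }
    }
}
```

If any of the three entries is reported as not reachable, start that broker (or remove it from the list) before running the Spring Boot application.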

application.properties with single kafka broker

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092

With this configuration you only need to run the command below to bring your Kafka broker up. Start ZooKeeper first, because the Kafka broker depends on it.

Command to start your kafka broker

kafka-server-start.bat ..\..\config\server.properties

Make sure you see logs like the ones below.

You should be able to see Kafka-related entries in the log. In the sample below, the ProducerConfig values appear on a request thread just after startup, which suggests they are printed when the first message is sent.

If you cannot see any Kafka-related entries in the logs, Kafka is probably not configured properly in your project.

Check that your Kafka configuration is correct. If it still does not work, run a Maven build and then start the server again.

I started the application with multiple Kafka instances, so bootstrap.servers is printed with three entries.

2020-07-21 11:22:43.767  INFO 128640 --- [           main] .n.n.NgdeveloperKafkaProducerApplication : Started NgdeveloperKafkaProducerApplication in 1.233 seconds (JVM running for 1.811)
2020-07-21 11:22:47.580  INFO 128640 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-07-21 11:22:47.580  INFO 128640 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-07-21 11:22:47.584  INFO 128640 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 4 ms
2020-07-21 11:22:47.702  INFO 128640 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-07-21 11:22:47.714  INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-07-21 11:22:47.715  INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-07-21 11:22:47.715  INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1595310767714
2020-07-21 11:22:47.722  INFO 128640 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: vzZVxCuZQkqmTb2V5hDP0A

Testing Kafka producer in Spring Boot

You can use either curl or Postman to hit your REST endpoint.

Request body

{"orderId":null,
"orderAmount":200,
"orderAddress":"Delhi"
}
{"orderId":1,
"orderAmount":200,
"orderAddress":"Delhi"
}
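
If you would rather send the request from code than from curl or Postman, the sketch below uses the java.net.http client (Java 11 or later). Please treat the URL as an assumption: port 8080 matches the Tomcat threads in the log above, but the /orders path is a placeholder I made up, so replace it with the mapping defined in your controller. The class and method names are also hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class OrderRequestSketch {

    // Placeholder endpoint: the "/orders" path is NOT taken from the project.
    static final String ENDPOINT = "http://localhost:8080/orders";

    // Builds a JSON POST request without sending it.
    static HttpRequest buildRequest(String url, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String body = "{\"orderId\":1,\"orderAmount\":200,\"orderAddress\":\"Delhi\"}";
        // Requires the Spring Boot application to be running locally.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(ENDPOINT, body), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Keeping buildRequest separate from the actual send makes it easy to inspect the method, headers and URI before anything goes over the wire.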

After hitting the API, you should see a log entry like the one below (this sample was captured from a request with an orderAmount of 400),

2020-07-21 11:22:47.793  INFO 128640 --- [ad | producer-1] c.n.n.producer.OrderKafkaProducer        : Message Sent SuccessFully. key : 1 and the value is {"orderId":1,"orderAmount":400,"orderAddress":"Delhi"} , partition used is 0

Receiving the posted data in kafka consumer command prompt

Here ngdev-topic is the topic name. It is the same topic we created and used in the Kafka producer Spring Boot project.

Open your command prompt and run this Kafka console consumer command:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ngdev-topic --property "key.separator=:" --property "print.key=true"
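
One thing to watch for with print.key=true: the producer in this post uses IntegerSerializer for the key, which writes the key as a 4-byte big-endian integer. If the consumer treats those bytes as text, the key may show up as unprintable characters instead of 1. The sketch below reproduces that effect with plain Java; KeyBytesSketch and its methods are hypothetical helpers, not part of Kafka or of the project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyBytesSketch {

    // Same layout IntegerSerializer produces: a 4-byte big-endian int.
    static byte[] intKeyToBytes(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Reverses the encoding above, like an integer deserializer would.
    static int bytesToIntKey(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] keyBytes = intKeyToBytes(1);
        // prints: raw bytes: [0, 0, 0, 1]
        System.out.println("raw bytes: " + Arrays.toString(keyBytes));

        // Read as text, the key is four control characters, not the digit 1.
        String asText = new String(keyBytes, StandardCharsets.UTF_8);
        // prints: characters when read as text: 4
        System.out.println("characters when read as text: " + asText.length());

        // prints: decoded as int: 1
        System.out.println("decoded as int: " + bytesToIntKey(keyBytes));
    }
}
```

If the key looks garbled in your console consumer, it may help to pass --key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer to the command above. As far as I know this option is available in the Kafka 2.x console consumer, but check the help output of your version.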
