In this post I will share a Spring Boot Kafka producer Maven project and explain how to create a Kafka producer in Spring Boot with Maven.
Prerequisites
Kafka should be installed (refer to this post for step-by-step guidelines on installing Kafka on Windows and Mac).
It helps if you already know how to send and receive messages from the command prompt with the Kafka console producer and consumer.
In this post I use multiple Kafka brokers, so knowing how to set up multiple Kafka broker instances on one machine will help you follow along.
We have also posted a list of the most commonly used Kafka commands here to help you get familiar with Kafka on the command prompt.
Create your project with the help of Spring Initializr
Please note: Import may take some time, as it needs to download all the dependencies to your machine from the respective repositories.
About the project
- We are going to create a REST API endpoint that takes an order object (orderId, orderAmount, and orderAddress) as the request body.
- From the controller, we invoke our Kafka producer, which posts the message (the same order object as a string) to a Kafka topic, where a consumer can read it.
- We will keep a Kafka console consumer running in the command prompt and check whether the message posted from Postman is received there.
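The flow described in these bullets can be sketched in plain Java, without Spring or Kafka on the classpath. This is only an illustration under stated assumptions, not the project's actual code: `Order`, `MessageSender`, and `OrderPublisher` are hypothetical names, `MessageSender` stands in for whatever Kafka client the real producer uses, and using orderId as the message key is inferred from the Integer key serializer and the sample log line later in this post.

```java
/** Order payload; field names follow the request body used in this post. */
final class Order {
    final Integer orderId;      // may be null, as in the first sample request
    final int orderAmount;
    final String orderAddress;

    Order(Integer orderId, int orderAmount, String orderAddress) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
        this.orderAddress = orderAddress;
    }

    /** Hand-rolled JSON for the sketch only (no string escaping); a real project would use a JSON library. */
    String toJson() {
        return "{\"orderId\":" + orderId
                + ",\"orderAmount\":" + orderAmount
                + ",\"orderAddress\":\"" + orderAddress + "\"}";
    }
}

/** Hypothetical stand-in for the Kafka client: topic, Integer key, String value. */
interface MessageSender {
    void send(String topic, Integer key, String value);
}

/** Plays the role of the producer that the controller invokes. */
final class OrderPublisher {
    static final String TOPIC = "ngdev-topic"; // topic name used later in this post

    private final MessageSender sender;

    OrderPublisher(MessageSender sender) {
        this.sender = sender;
    }

    /** Sends the order as a JSON string, keyed by orderId (assumption, see lead-in). */
    void publish(Order order) {
        sender.send(TOPIC, order.orderId, order.toJson());
    }
}
```

To try it, pass a lambda as the sender, for example one that prints the topic, key, and value, and call `publish(new Order(1, 200, "Delhi"))`.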
Project Structure
Spring boot Kafka Producer Example – Github Project Link:
Starting as Spring Boot Application
Make sure ZooKeeper and three Kafka broker instances are running. If you are not sure how to run ZooKeeper and multiple Kafka brokers, refer to these links,
If you prefer, you can run just one Kafka broker and configure application.properties accordingly. Both variants are shown below.
application.properties with multiple kafka broker instances
spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093, localhost:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092, localhost:9093, localhost:9094
application.properties with single kafka broker
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.admin.properties.bootstrap.servers=localhost:9092
With this configuration you only need to start one Kafka broker. Start ZooKeeper first, because the Kafka broker depends on it.
Command to start your kafka broker
kafka-server-start.bat ..\..\config\server.properties
Make sure your logs look like the sample below.
You should see Kafka-related entries in the log. In this sample the ProducerConfig values are printed on a request thread (nio-8080-exec-2) a few seconds after the application has started.
I started the application with multiple Kafka instances, so bootstrap.servers is printed with three entries.
2020-07-21 11:22:43.767 INFO 128640 --- [ main] .n.n.NgdeveloperKafkaProducerApplication : Started NgdeveloperKafkaProducerApplication in 1.233 seconds (JVM running for 1.811)
2020-07-21 11:22:47.580 INFO 128640 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-07-21 11:22:47.580 INFO 128640 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-07-21 11:22:47.584 INFO 128640 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 4 ms
2020-07-21 11:22:47.702 INFO 128640 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-07-21 11:22:47.714 INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-07-21 11:22:47.715 INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-07-21 11:22:47.715 INFO 128640 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1595310767714
2020-07-21 11:22:47.722 INFO 128640 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: vzZVxCuZQkqmTb2V5hDP0A
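The ProducerConfig entries in the log use the plain Kafka client key names (bootstrap.servers, key.serializer, value.serializer) rather than the spring.kafka.producer.* names from application.properties. As a rough sketch of that correspondence, the same three settings can be written as a java.util.Properties object. `ProducerProps` and `build` are hypothetical names used only for illustration, and the class deliberately avoids any Kafka dependency.

```java
import java.util.Properties;

final class ProducerProps {
    /** Builds the three producer settings shown in the log, using the Kafka client key names. */
    static Properties build(String... brokers) {
        Properties props = new Properties();
        // Kafka accepts a comma-separated host:port list for bootstrap.servers.
        props.setProperty("bootstrap.servers", String.join(",", brokers));
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

With kafka-clients on the classpath, a Properties object like this could be handed to a `KafkaProducer<Integer, String>` constructor. In the Spring Boot project that wiring is done for you from application.properties.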
Testing Kafka producer in Spring Boot
You can use either curl or Postman to call your REST endpoint.
Request body
{"orderId":null, "orderAmount":200, "orderAddress":"Delhi" }
{"orderId":1, "orderAmount":200, "orderAddress":"Delhi" }
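If you would rather send the request from code, here is a minimal sketch using the JDK's built-in HTTP client (Java 11 or later). `OrderClient` is a hypothetical helper. The endpoint path is not given in this post, so the URL is a parameter that you need to fill in with your controller's mapping. Port 8080 is taken from the thread names in the log above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class OrderClient {
    /** Builds a JSON POST request for the given URL and body. */
    static HttpRequest buildRequest(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Sends the request and returns the HTTP status code; requires the application to be running. */
    static int post(String url, String json) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(url, json), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

A call would look like `OrderClient.post("http://localhost:8080/<your-endpoint-path>", body)`, where `body` is one of the request bodies above.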
After calling the API, you should see a log line like the following:
2020-07-21 11:22:47.793 INFO 128640 --- [ad | producer-1] c.n.n.producer.OrderKafkaProducer : Message Sent SuccessFully. key : 1 and the value is {"orderId":1,"orderAmount":400,"orderAddress":"Delhi"} , partition used is 0
Receiving the posted data in kafka consumer command prompt
Open a command prompt and run the Kafka console consumer command below.
Here ngdev-topic is the topic name. It is the same topic we created and used in the Spring Boot Kafka producer project.
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ngdev-topic --property "key.separator=:" --property "print.key=true"
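One thing to watch for with print.key=true: the producer in this post serializes keys with IntegerSerializer, which writes an int as 4 big-endian bytes, while the console consumer reads keys as strings unless told otherwise. The sketch below reproduces that byte layout with the JDK only (`KeyBytes` is a hypothetical helper) to show why key 1 can appear as unprintable characters. As far as I know, adding `--key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer` to the command makes the key print as a number, but verify this against your Kafka version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class KeyBytes {
    /** Encodes an int as 4 big-endian bytes, the layout IntegerSerializer uses. */
    static byte[] encode(int key) {
        return ByteBuffer.allocate(4).putInt(key).array(); // ByteBuffer is big-endian by default
    }

    /** Reads the 4 bytes back as an int, as an Integer deserializer would. */
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    /** Interprets the same bytes as text, as a String deserializer would. */
    static String asText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

For key 1 the encoded bytes are 0, 0, 0, 1. Read as text, that is four control characters rather than the digit "1".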