Spring Boot + RabbitMQ with retry + Gradle integration
When integrating RabbitMQ with Spring Boot, we often need to retry messages on the receiver side. For example, the receiver gets a message and something goes wrong while processing it. What do you do? It is often a good idea to try the message again after some time. Of course, retries cannot go on indefinitely, so we also need a limit on the number of attempts. With Spring Boot, this is just a matter of setting a few properties.
Setup RabbitMQ
docker run -d -p 5672:5672 -p 8081:15671 -p 8082:15672 -p 5671:5671 --hostname my-rabbit --name my-rabbit rabbitmq:3-management
Sender
The sender application is responsible for sending messages to RabbitMQ.
build.gradle
plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
application.properties
Here you can set the connection-related properties. If you need an SSL connection, mainly for production, don't forget to set spring.rabbitmq.ssl.enabled=true. Since we are only connecting to a local broker here, we skip the SSL properties.
server.port=8085
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue
sample.rabbitmq.exchange=message-queue-exchange
sample.rabbitmq.routingkey=message-queue-routingkey
RabbitMQConfig.java
Here we define the Exchange, the Queue, and the routingKey that binds the Exchange to the Queue. The other piece of configuration is the MessageConverter; for this we use the Jackson implementation, Jackson2JsonMessageConverter, to exchange JSON data.
package com.poc.sender.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${sample.rabbitmq.queue}")
    String queueName;
    @Value("${sample.rabbitmq.exchange}")
    String exchange;
    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
RabbitMQSender.java
A service that sends messages to RabbitMQ.
package com.poc.sender.service;

import com.poc.sender.model.Message;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicLong;

@Service
public class RabbitMQSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Value("${sample.rabbitmq.exchange}")
    private String exchange;

    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    // Counts how many messages this sender has published.
    private final AtomicLong count = new AtomicLong(0L);

    // Converts the message to JSON and publishes it to the exchange with the routing key.
    public void send(Message message) {
        rabbitTemplate.convertAndSend(exchange, routingkey, message);
        System.out.println("( " + count.incrementAndGet() + " ) Send =: " + message);
    }
}
Message.java
Message Model to exchange data.
package com.poc.sender.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}
Controller.java
Exposes an endpoint for sending messages via the API.
package com.poc.sender.controller;

import com.poc.sender.model.Message;
import com.poc.sender.service.RabbitMQSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;

@RestController
@RequestMapping(value = "/rabbitmq/")
public class Controller {
    @Autowired
    private RabbitMQSender rabbitMQSender;

    @PostMapping(value = "/producer")
    public String producer(@RequestBody Message message) throws IOException {
        rabbitMQSender.send(message);
        return "Message sent to the RabbitMQ Successfully";
    }
}
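To try the endpoint without a separate HTTP tool, a small client along these lines could be used. This is only a sketch: the class name ProducerClient and its helper methods are hypothetical, the port 8085 and the path /rabbitmq/producer are taken from the application.properties and Controller above, and the hand-rolled JSON escaping is deliberately naive. It uses the java.net.http client that ships with Java 11.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class, not part of the article's project.
final class ProducerClient {

    // Builds the JSON body expected by the Message model: {"message":"..."}.
    // Escaping is naive (backslash and double quote only) and meant for a demo.
    static String toJson(String text) {
        String escaped = text.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"message\":\"" + escaped + "\"}";
    }

    // Builds a POST request for the sender's /rabbitmq/producer endpoint.
    static HttpRequest buildRequest(String baseUrl, String text) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/rabbitmq/producer"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(text)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes the sender application is running locally on port 8085.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest("http://localhost:8085", "Hello"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Sending Hello as the text exercises the failure path in the receiver, since that is the value its listener rejects; any other text should be consumed on the first attempt.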
RabbitmqSenderApplication.java
Main Spring Boot class that starts the application.
package com.poc.sender;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqSenderApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqSenderApplication.class, args);
    }

}
Receivers
A Spring Boot application that reads messages from RabbitMQ.
build.gradle
plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.poc'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
application.properties
Here you can set the connection-related properties. If you need an SSL connection, mainly for production, don't forget to set spring.rabbitmq.ssl.enabled=true. Since we are only connecting to a local broker here, we skip the SSL properties.
server.port=8086
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#logging.level.root=DEBUG
spring.main.allow-bean-definition-overriding=true
sample.rabbitmq.queue=message-queue
sample.rabbitmq.exchange=message-queue-exchange
sample.rabbitmq.routingkey=message-queue-routingkey
spring.rabbitmq.listener.simple.missing-queues-fatal=false
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-interval=10000
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.missing-queues-fatal=false
Do not fail the Spring Boot application if the queue does not exist. In this example that case never occurs, because the queue is declared in the config.
spring.rabbitmq.listener.simple.acknowledge-mode=auto
The listener container acknowledges messages for us. If we read a message and process it without any error, the message is acknowledged and removed from the queue. If the listener throws an exception (here we throw MessageException when the message is Hello), the retry mechanism kicks in. With the retry properties below, Spring Boot's default retry is stateless: the attempts happen inside the listener container, and the message is not sent back to the broker between attempts. Once all attempts are used up, the message is by default rejected without requeue, so it is dropped unless the queue has a dead-letter exchange configured.
spring.rabbitmq.listener.simple.retry.enabled=true
Retry is enabled.
spring.rabbitmq.listener.simple.retry.max-attempts=5
Each failing message is attempted at most 5 times, counting the first delivery.
spring.rabbitmq.listener.simple.retry.initial-interval=5000
If processing a message fails, it is tried again after this time (in milliseconds).
spring.rabbitmq.listener.simple.retry.multiplier=2
If the message fails again, the next wait is the previous wait multiplied by this value, up to the maximum set by
spring.rabbitmq.listener.simple.retry.max-interval=10000
E.g. if a message fails all five times here, the wait before each attempt is as follows (including the first read):
- 0 seconds
- 5 seconds
- 10 seconds
- 10 seconds (because spring.rabbitmq.listener.simple.retry.max-interval=10000 is set; otherwise it would have been 20 seconds)
- 10 seconds (because spring.rabbitmq.listener.simple.retry.max-interval=10000 is set; otherwise it would have been 40 seconds)
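The arithmetic behind that schedule can be checked with a few lines of plain Java. This is a sketch of the calculation only, not Spring's own code: the class RetrySchedule and its method are hypothetical names, and the parameters simply mirror the four retry properties above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: reproduces the arithmetic of an
// exponential back-off with an upper bound on the wait time.
final class RetrySchedule {

    // Returns the wait in milliseconds before each retry.
    // The first attempt happens immediately, so it has no entry in the list.
    static List<Long> waitsBeforeRetries(int maxAttempts, long initialInterval,
                                         double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 retries.
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add((long) Math.min(next, maxInterval)); // cap at max-interval
            next = next * multiplier;                      // grow for the next retry
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from application.properties: 5 attempts, 5000 ms, x2, capped at 10000 ms.
        System.out.println(waitsBeforeRetries(5, 5000, 2.0, 10000));
        // prints [5000, 10000, 10000, 10000]
    }
}
```

Raising max-interval to 40000 in the same call would give 5000, 10000, 20000 and 40000 ms, which matches the uncapped values mentioned in the list.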
RabbitMQConfig.java
RabbitMQ configuration: declares the queue, exchange and binding, and registers the JSON message converter.
package com.poc.receiver.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${sample.rabbitmq.queue}")
    String queueName;
    @Value("${sample.rabbitmq.exchange}")
    String exchange;
    @Value("${sample.rabbitmq.routingkey}")
    private String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
RabbitMQ.java Listener
package com.poc.receiver.listener;

import com.poc.receiver.MessageException;
import com.poc.receiver.model.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class RabbitMQ {
    private AtomicLong count = new AtomicLong(0L);

    @RabbitListener(queues = "${sample.rabbitmq.queue}")
    public void receivedMessage(Message message) {
        System.out.println(Instant.now() + " ( " + count.incrementAndGet() + " ) Received = : " + message);
        if (message.getMessage().equalsIgnoreCase("Hello"))
            throw new MessageException("Throwing Message exception");
    }
}
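The listener imports com.poc.receiver.MessageException, which is not listed in this post. A minimal version could look like the sketch below. Its exact shape is an assumption: all the listener needs is an unchecked exception with a constructor that takes a message.

```java
// Assumed shape of MessageException; the real class is not shown in the post.
// In the project it would be declared public and placed in the
// com.poc.receiver package (both omitted here so the snippet compiles alone).
class MessageException extends RuntimeException {

    // Carries a description of why the message could not be processed.
    MessageException(String message) {
        super(message);
    }
}
```

Making it unchecked lets the listener method throw it without a throws clause; any exception that escapes the listener is treated as a failed attempt by the container.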
Message.java
Message Model to exchange data.
package com.poc.receiver.model;

public class Message {
    private String message;

    public Message() {
    }

    public Message(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                '}';
    }
}
RabbitmqReceiverApplication.java
package com.poc.receiver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqReceiverApplication.class, args);
    }

}
You might have noticed the implementation 'org.springframework.boot:spring-boot-starter-actuator' dependency in build.gradle. Of course, it provides production-ready endpoints for monitoring the application. It also does one more important thing here: it automatically adds RabbitMQ to the application's health status. Checking the broker's health needs a connection to it, and opening a connection is what triggers the declaration of the beans defined in the config file, so the queue, exchange and binding are created automatically on application startup.
If we don't include this dependency, the queue, exchange and binding are only created once a connection is opened some other way, e.g. if we trigger it manually from a CommandLineRunner, or on first use, when we call the API to send a message.
Check complete code on github
You can check the complete code at GitHub.