Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example | JavaInUse
What is a Dead Letter Queue?
In postal terminology, dead letter mail is mail that cannot be delivered to the addressee. A dead-letter queue (DLQ), also known as an undelivered-message queue, is a holding queue for messages that cannot be delivered to their destinations for one reason or another. According to Wikipedia, in message queueing the dead letter queue is a service implementation to store messages that meet one or more of the following failure criteria:
- Message that is sent to a queue that does not exist.
- Queue length limit exceeded.
- Message length limit exceeded.
- Message is rejected by another queue exchange.
- Message reaches a threshold read counter number, because it is not consumed. Sometimes this is called a "back out queue".
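To make the "queue length limit exceeded" criterion concrete, here is a minimal in-memory sketch. It is not RabbitMQ's API: the class BoundedQueueWithDlq and its methods are hypothetical names used only for illustration, and the policy of dead-lettering the newest message is an assumption (a real broker may evict a different message).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a length-limited queue with a dead letter list.
// This only illustrates the idea; it does not use or mimic the RabbitMQ client API.
class BoundedQueueWithDlq {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueWithDlq(int maxLength) {
        this.maxLength = maxLength;
    }

    // Accept the message if there is room; otherwise park it in the dead letter list.
    void publish(String message) {
        if (queue.size() < maxLength) {
            queue.addLast(message);
        } else {
            deadLetters.add(message);
        }
    }

    int size() {
        return queue.size();
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        BoundedQueueWithDlq q = new BoundedQueueWithDlq(2);
        q.publish("m1");
        q.publish("m2");
        q.publish("m3"); // the queue is full, so this message is dead-lettered
        System.out.println("queued=" + q.size() + ", deadLettered=" + q.deadLetters());
        // prints: queued=2, deadLettered=[m3]
    }
}
```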
In this tutorial we implement two modules:
- Spring Boot Producer Module - It produces a message and puts it in a RabbitMQ queue. It is also responsible for creating the required queues, including the dead letter queue.
- Spring Boot Consumer Module - It consumes a message from the RabbitMQ queue. We throw an exception and then retry the message. After the maximum number of retries, the message is put in the dead letter queue.
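The consumer behaviour described above (retry a failing message, then dead-letter it once the retries are exhausted) can be sketched in plain Java. This is only an in-memory illustration under assumed names: RetryThenDeadLetter, deliver and deadLetters are hypothetical and are not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of "retry, then dead-letter".
// It does not use Spring AMQP; it only illustrates the control flow.
class RetryThenDeadLetter {
    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>();

    RetryThenDeadLetter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Calls the handler until it succeeds or maxAttempts is reached.
    // Returns the number of attempts that were made.
    int deliver(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // processed successfully
            } catch (RuntimeException e) {
                // processing failed; fall through and try again
            }
        }
        // all attempts failed, so the message is moved to the dead letter list
        deadLetters.add(message);
        return maxAttempts;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter retry = new RetryThenDeadLetter(3);
        int attempts = retry.deliver("salary=-1", msg -> {
            throw new IllegalArgumentException("invalid salary");
        });
        System.out.println("attempts=" + attempts + ", deadLettered=" + retry.deadLetters());
        // prints: attempts=3, deadLettered=[salary=-1]
    }
}
```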
Spring Boot + RabbitMQ Producer Module
The Maven project will be as follows-

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javainuse</groupId>
    <artifactId>spring-boot-rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Define the Employee model class as follows-

package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

    private String empName;
    private String empId;
    private int salary;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
    }
}

Next define the configuration class where we-
- Create direct exchanges named deadLetterExchange and javainuseExchange.
- Create queues named javainuse.queue and deadLetter.queue. For javainuse.queue, set the x-dead-letter-exchange argument to deadLetterExchange and the x-dead-letter-routing-key argument to deadLetter. This means that any message in javainuse.queue that is rejected without requeue, expires, or overflows the queue is republished to deadLetterExchange with the routing key deadLetter.
- Bind javainuse.queue to javainuseExchange with the routing key javainuse, and deadLetter.queue to deadLetterExchange with the routing key deadLetter.
package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Exchange that receives messages dead-lettered from javainuse.queue
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    // Exchange the producer publishes to
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("javainuseExchange");
    }

    // Dead letter queue
    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    // Main queue; dead-lettered messages go to deadLetterExchange with routing key "deadLetter"
    @Bean
    Queue queue() {
        return QueueBuilder.durable("javainuse.queue")
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter")
                .build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("javainuse");
    }

    // Serialize message payloads as JSON
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Registered as a bean so that the template injected into the controller uses the JSON converter.
    // Declared as RabbitTemplate so that Spring Boot's auto-configured template backs off.
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Create the RabbitMQWebController class, which exposes an API to send a message to the RabbitMQ exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.javainuse.model.Employee;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/")
public class RabbitMQWebController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping(value = "/producer")
    public String producer(@RequestParam("empName") String empName,
            @RequestParam("empId") String empId,
            @RequestParam("salary") int salary) {
        // Build the payload from the request parameters
        Employee emp = new Employee();
        emp.setEmpId(empId);
        emp.setEmpName(empName);
        emp.setSalary(salary);

        // Publish to javainuseExchange with routing key "javainuse"
        amqpTemplate.convertAndSend("javainuseExchange", "javainuse", emp);
        return "Message sent to the RabbitMQ Successfully";
    }
}

Create the Spring Boot bootstrap class with the SpringBootApplication annotation.
package com.javainuse;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootHelloWorldApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootHelloWorldApplication.class, args);
    }
}
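Once the producer is running, a message can be published by calling the /javainuse-rabbitmq/producer endpoint with the empName, empId and salary request parameters. The sketch below only builds such a request URL. The base URL http://localhost:8080 assumes a local run on Spring Boot's default port, and ProducerUrlExample and producerUrl are hypothetical names used for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that builds the request URL for the producer endpoint.
class ProducerUrlExample {

    // baseUrl is an assumption, for example http://localhost:8080 for a local run
    static String producerUrl(String baseUrl, String empName, String empId, int salary) {
        try {
            return baseUrl + "/javainuse-rabbitmq/producer"
                    + "?empName=" + URLEncoder.encode(empName, "UTF-8")
                    + "&empId=" + URLEncoder.encode(empId, "UTF-8")
                    + "&salary=" + salary;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this is not expected to happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(producerUrl("http://localhost:8080", "emp1", "emp001", 50));
        // prints: http://localhost:8080/javainuse-rabbitmq/producer?empName=emp1&empId=emp001&salary=50
    }
}
```

Opening the printed URL in a browser, or requesting it with any HTTP client, should return the text "Message sent to the RabbitMQ Successfully" when the broker is reachable.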
Spring Boot Consumer Module
Define the pom.xml as follows, adding the spring-boot-starter-amqp dependency-

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javainuse</groupId>
    <artifactId>spring-boot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-rabbitmq-consumer</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <description>SpringBootRabbitMQConsumer</description>
</project>
Define the Employee model class as follows-

package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

    private String empName;
    private String empId;
    private int salary;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }
}

Define a custom checked exception named InvalidSalaryException as follows-
package com.javainuse.exception;

public class InvalidSalaryException extends Exception {

    private static final long serialVersionUID = -3154618962130084535L;
}