Search Tutorials


Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example using Dead Letter Queue(DQL) | JavaInUse



Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example | JavaInUse

In a previous tutorial we had implemented a Spring Boot + RabbitMQ example to send publish message to RabbitMQ Queue. In this tutorial we will be implementing a Spring Boot + RabbitMQ example to retry messages on exception and if exception still exists after maximum retries then put message in a dead letter queue where it can be analyzed and corrected later.

What is a Dead Letter Queue?

In English vocabulary Dead letter mail is an undeliverable mail that cannot be delivered to the addressee. A dead-letter queue (DLQ), sometimes which is also known as an undelivered-message queue, is a holding queue for messages that cannot be delivered to their destinations due to some reason or other.
According to Wikipedia page - In message queueing the dead letter queue is a service implementation to store messages that meet one or more of the following failure criteria:
  • Message that is sent to a queue that does not exist.
  • Queue length limit exceeded.
  • Message length limit exceeded.
  • Message is rejected by another queue exchange.
  • Message reaches a threshold read counter number, because it is not consumed. Sometimes this is called a "back out queue".
Later on we can analyze the messages in the DLQ to know the reason why the messages are failing.
Dead Letter Queue Tutorial

RabbitMQ - Table Of Contents

What is Messaging? Getting Started with RabbitMQ - Install and start RabbitMQ. Spring Boot + RabbitMQ Publish Message Example Spring Boot + RabbitMQ Tutorial - Configure Listeners to consume messages using MessageListenerContainer Spring Boot + RabbitMQ Consume Message Example using RabbitListener Spring Boot + RabbitMQ Tutorial - Implement Exchange Types Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example Spring Cloud Stream - RabbitMQ Publish Message Example Spring Cloud Stream - RabbitMQ Consume Message Example Pivotal Cloud Foundry Tutorial - Deploying Spring Boot + RabbitMQ Application to PCF

Video

This tutorial is explained in the below Youtube Video.

We will be implementing two modules -
  • Spring Boot Producer Module - It will produce a message and put it in RabbitMQ queue. It will also be responsible for creating the required queues including the dead letter queue.
  • Spring Boot Consumer Module - It will consume a message from RabbitMQ queue. We will be throwing an exception and then retrying the message. After maximum retries it will then be put in dead letter queue.

Spring Boot + RabbitMQ Error Handling Application

Spring Boot + RabbitMQ Producer Module

The Maven project will be as follows-
Spring Boot + RabbitMQ Producer Tutorial
The pom.xml will have the following dependencies-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javainuse</groupId>
	<artifactId>spring-boot-rabbitmq-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<packaging>jar</packaging>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
		<relativePath />
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>	 
Define the domain class Employee as follows-
package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

	private String empName;
	private String empId;
	private int salary;

	public String getEmpName() {
		return empName;
	}

	public void setEmpName(String empName) {
		this.empName = empName;
	}

	public String getEmpId() {
		return empId;
	}

	public void setEmpId(String empId) {
		this.empId = empId;
	}

	public int getSalary() {
		return salary;
	}

	public void setSalary(int salary) {
		this.salary = salary;
	}

	@Override
	public String toString() {
		return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
	}
}
Next define the configuration class where we-
  • Create Direct Exchanges named - deadLetterExchange and javainuseExchange.
  • Create Queue named javainuse and dlq. For the javainuse queue specify the x-dead-letter-exchange argument as the deadLetterExchange. This means that any message in javainuse queue that cannot be delivered will be sent to the deadLetterExchange.
  • Bind the javainuse queue with javainuseExchange and the dlq queue with deadLetterExchange.
	 
package com.javainuse.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

	@Bean
	DirectExchange deadLetterExchange() {
		return new DirectExchange("deadLetterExchange");
	}
	
	@Bean
	DirectExchange exchange() {
		return new DirectExchange("javainuseExchange");
	}

	@Bean
	Queue dlq() {
		return QueueBuilder.durable("deadLetter.queue").build();
	}

	@Bean
	Queue queue() {
		return QueueBuilder.durable("javainuse.queue").withArgument("x-dead-letter-exchange", "deadLetterExchange")
				.withArgument("x-dead-letter-routing-key", "deadLetter").build();
	}

	@Bean
	Binding DLQbinding() {
		return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
	}

	@Bean
	Binding binding() {
		return BindingBuilder.bind(queue()).to(exchange()).with("javainuse");
	}

	@Bean
	public MessageConverter jsonMessageConverter() {
		return new Jackson2JsonMessageConverter();
	}

	public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setMessageConverter(jsonMessageConverter());
		return rabbitTemplate;
	}
}
Create the RabbitMQWebController class where we expose API to send message to RabbitMQ Exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/")
public class RabbitMQWebController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@GetMapping(value = "/producer")
	public String producer(@RequestParam("empName") String empName,@RequestParam("empId") String empId,@RequestParam("salary") int salary) {
		Employee emp=new Employee();
		emp.setEmpId(empId);
		emp.setEmpName(empName);
		emp.setSalary(salary);

		amqpTemplate.convertAndSend("javainuseExchange", "javainuse", emp);
		return "Message sent to the RabbitMQ Successfully";
	}
}
Create the Spring Boot Bootstrap class with SpringBootApplication annotation.
package com.javainuse;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootHelloWorldApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootHelloWorldApplication.class, args);
	}
}

Spring Boot Consumer Module

The project will be as follows-
Spring Boot RabbitMQ consume Eclipse Setup
Define the pom.xml as follows- Add the spring-boot-starter-amqp dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.javainuse</groupId>
	<artifactId>spring-boot-rabbitmq-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-rabbitmq-consumer</name>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


	<description>SpringBootRabbitMQConsumer</description>
</project>


Define the domain class Employee as follows-
package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

	private String empName;
	private String empId;
	private int salary;

	public String getEmpName() {
		return empName;
	}

	public void setEmpName(String empName) {
		this.empName = empName;
	}

	public String getEmpId() {
		return empId;
	}

	public void setEmpId(String empId) {
		this.empId = empId;
	}

	@Override
	public String toString() {
		return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
	}

	public int getSalary() {
		return salary;
	}

	public void setSalary(int salary) {
		this.salary = salary;
	}
}
Define a custom checked exception named InvalidSalaryException as follows-
package com.javainuse.exception;

public class InvalidSalaryException extends Exception {

	private static final long serialVersionUID = -3154618962130084535L;

}

Define the RabbitMQConsumer class which consumes the message from RabbitMQ using RabbitListener.The RabbitMQ Listener listens to RabbitMQ Queue for any incoming messages. For the basic configuration we specify the the Queue/Topic Name (the name of the queue/topic where the message should be consumed). Also here we will be checking the incoming message for salary field. If this field is negative we will be throwing an InvalidSalaryException.
package com.javainuse.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.javainuse.exception.InvalidSalaryException;
import com.javainuse.model.Employee;

@Component
public class RabbitMQConsumer {

	private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);

	@RabbitListener(queues = "javainuse.queue")
	public void recievedMessage(Employee employee) throws InvalidSalaryException {
		logger.info("Recieved Message From RabbitMQ: " + employee);
		if (employee.getSalary() < 0) {
			throw new InvalidSalaryException();
		}
	}
}

Next define th following properties in application.yml. Here we enable the Spring Boot RabbitMQ retry mechanism and specify some more additional parameters-
  • initial interval- The message should be retried after an interval of 3s.
  • max-attempts- The message should be retried maximum of 6 times. After which it will be sent to dead letter Queue.
  • max-interval- The maximum time interval between two retries should never exceed 10s.
  • multiplier- The interval between second retry gets multiplied by 2. But this interval can never exceed the max-interval. So the retry interval values will be 3s, 6s, 10s, 10s, 10s. As 10 sec is the max interval specified.
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-attempts: 6
          max-interval: 10s
          multiplier: 2
         
server:
  port: 8081
Finally Define the Spring Boot Class with @SpringBootApplication annotation
package com.javainuse;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringBootConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootConsumerApplication.class, args);
	}

	@Bean
	public Jackson2JsonMessageConverter converter() {
		return new Jackson2JsonMessageConverter();
	}
}

In a previous tutorial we have shown how to install RabbitMQ and get started.
Start the Producer and Consumer applications. And go to http://localhost:8080/javainuse-rabbitmq/producer?empName=emp1&empId=emp001&salary=-50 The message will be sent to rabbitMQ queue named javainuse.queue and consumed by the consumer application. As the salary is negative , InvalidSalaryException will be thrown. This message will be retried 6 times and then will be put in dead letter queue.
Spring Boot RabbitMQ Retry Example

Spring Boot DLQ Example

Download Source Code

Download it -
Spring Boot + RabbitMQ Error Handling Producer Application
Spring Boot + RabbitMQ Error Handling Consumer Application