Spring Boot + RabbitMQ Tutorial - Configure Listeners to consume messages | JavaInUse
RabbitMQ - Table Of Contents
- What is Messaging? Getting Started with RabbitMQ - Install and start RabbitMQ.
- Spring Boot + RabbitMQ Publish Message Example
- Spring Boot + RabbitMQ Tutorial - Configure Listeners to consume messages using MessageListenerContainer
- Spring Boot + RabbitMQ Consume Message Example using RabbitListener
- Spring Boot + RabbitMQ Tutorial - Implement Exchange Types
- Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example
- Spring Cloud Stream - RabbitMQ Publish Message Example
- Spring Cloud Stream - RabbitMQ Consume Message Example
- Pivotal Cloud Foundry Tutorial - Deploying Spring Boot + RabbitMQ Application to PCF
Let's Begin
The pom.xml will have the following dependencies-
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.javainuse</groupId>
    <artifactId>spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
</project>
Define the RabbitMQListner class, which implements the Spring AMQP MessageListener interface and prints each message it receives.

package com.javainuse.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQListner implements MessageListener {

    // Called by the listener container for every message delivered from the queue
    @Override
    public void onMessage(Message message) {
        System.out.println("Consuming Message - " + new String(message.getBody()));
    }
}

Next we will create the Spring Boot configuration class for RabbitMQ. We will
- Create a Queue bean for a RabbitMQ queue named javainuse.input-queue. This will be a non-durable queue. Do not confuse a non-durable queue with a temporary queue. Durability determines whether the queue survives a broker restart: when RabbitMQ restarts, a non-durable queue and the messages in it are lost, while a durable queue is restored along with any persistent messages it holds.
- Create a MessageListenerContainer, where we configure the RabbitMQ connection. A Spring MessageListenerContainer is a replacement for a message-driven EJB: it sets up a connection to the RabbitMQ queue, receives messages from that queue and feeds them to your MessageListener. We will use the default ConnectionFactory that Spring Boot auto-configures. If we do not wish to use the default ConnectionFactory, we can create our own CachingConnectionFactory and use it instead.
package com.javainuse.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.javainuse.service.RabbitMQListner;

@Configuration
public class RabbitMQConfig {

    @Value("${javainuse.rabbitmq.queue}")
    String queueName;

    @Value("${spring.rabbitmq.username}")
    String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    // non-durable queue: the second constructor argument is the durable flag
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    // create MessageListenerContainer using default connection factory
    @Bean
    MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
        simpleMessageListenerContainer.setQueues(queue());
        simpleMessageListenerContainer.setMessageListener(new RabbitMQListner());
        return simpleMessageListenerContainer;
    }

    // create custom connection factory
    /*@Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        return cachingConnectionFactory;
    }*/

    // create MessageListenerContainer using custom connection factory
    /*@Bean
    MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
        simpleMessageListenerContainer.setQueues(queue());
        simpleMessageListenerContainer.setMessageListener(new RabbitMQListner());
        return simpleMessageListenerContainer;
    }*/
}
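To make the role of the listener container more concrete, here is a minimal plain-Java sketch of the idea described above: something pulls messages off a queue and hands each one to a listener callback. It is only an illustration and not how Spring AMQP is implemented. The names ListenerContainerSketch, SimpleListener, SimpleContainer and consumeAll are hypothetical, and an in-memory BlockingQueue stands in for the RabbitMQ queue, so no broker is needed to run it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ListenerContainerSketch {

    // Hypothetical stand-in for the MessageListener callback
    interface SimpleListener {
        void onMessage(byte[] body);
    }

    // Hypothetical stand-in for a listener container: one consumer thread per queue
    static class SimpleContainer {
        private final BlockingQueue<byte[]> queue;
        private final SimpleListener listener;
        private volatile boolean running;
        private Thread consumer;

        SimpleContainer(BlockingQueue<byte[]> queue, SimpleListener listener) {
            this.queue = queue;
            this.listener = listener;
        }

        void start() {
            running = true;
            consumer = new Thread(() -> {
                while (running) {
                    try {
                        // Wait briefly for a message, then pass it to the listener
                        byte[] body = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (body != null) {
                            listener.onMessage(body);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            consumer.start();
        }

        void stop() {
            running = false;
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Publishes the payloads to the in-memory queue and returns what the listener received
    public static List<String> consumeAll(List<String> payloads) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        List<String> received = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch done = new CountDownLatch(payloads.size());
        SimpleContainer container = new SimpleContainer(queue, body -> {
            // Decode with an explicit charset instead of the JVM default
            received.add(new String(body, StandardCharsets.UTF_8));
            done.countDown();
        });
        container.start();
        try {
            for (String payload : payloads) {
                queue.put(payload.getBytes(StandardCharsets.UTF_8));
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
        return new ArrayList<String>(received);
    }

    public static void main(String[] args) {
        for (String message : consumeAll(Arrays.asList("order-1", "order-2"))) {
            System.out.println("Consuming Message - " + message);
        }
    }
}
```

In the real application, SimpleMessageListenerContainer does this work against the broker and also manages connections and acknowledgements, which the sketch leaves out. One detail worth carrying over: the sketch decodes the body as UTF-8 explicitly, whereas new String(message.getBody()) in the listener above uses the JVM's default charset.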