Spring Boot + RabbitMQ Tutorial - Implement Exchange Types | JavaInUse




In a previous tutorial we implemented a simple Spring Boot + RabbitMQ project that publishes messages to RabbitMQ through a direct exchange. In this tutorial we implement each of the RabbitMQ exchange types and look at how they are used.
In the next tutorial we will implement a Spring Boot + RabbitMQ example that retries messages on exception.

RabbitMQ - Table Of Contents

  • What is Messaging?
  • Getting Started with RabbitMQ - Install and start RabbitMQ.
  • Spring Boot + RabbitMQ Publish Message Example
  • Spring Boot + RabbitMQ Tutorial - Configure Listeners to consume messages using MessageListenerContainer
  • Spring Boot + RabbitMQ Consume Message Example using RabbitListener
  • Spring Boot + RabbitMQ Tutorial - Implement Exchange Types
  • Spring Boot + RabbitMQ Tutorial - Retry and Error Handling Example
  • Spring Cloud Stream - RabbitMQ Publish Message Example
  • Spring Cloud Stream - RabbitMQ Consume Message Example
  • Pivotal Cloud Foundry Tutorial - Deploying Spring Boot + RabbitMQ Application to PCF


RabbitMQ Messaging Flow

When using RabbitMQ, the publisher never sends a message directly to a queue. Instead, the publisher sends the message to an exchange, and the exchange routes it to the appropriate queue or queues based on routing keys, bindings and header attributes. Exchanges are message routing agents that we define, and bindings are the links that connect exchanges to queues. So in all our examples we first create the queues and the exchange, then bind them together.
RabbitMQ Message Flow
RabbitMQ provides the following exchange types-
  • Direct Exchange
  • Fanout Exchange
  • Topic Exchange
  • Header Exchange

Direct Exchange

A message is routed to the queues whose binding key exactly matches the routing key of the message. If several queues are bound with the same key, each of them receives a copy of the message. In this tutorial every queue has its own binding key, so each message reaches exactly one queue.
Direct Exchange type
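To make the matching rule concrete, here is a minimal plain-Java sketch of how a direct exchange picks queues. It is a toy model for illustration only, not how RabbitMQ or Spring AMQP implement routing. The class DirectRoutingSketch, its Binding helper and the auditQueue queue are hypothetical names that are not part of this tutorial's project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of direct-exchange routing (illustration only, not the broker's code).
public class DirectRoutingSketch {

	// A binding links a queue to the exchange under a binding key.
	public static final class Binding {
		final String queue;
		final String bindingKey;

		public Binding(String queue, String bindingKey) {
			this.queue = queue;
			this.bindingKey = bindingKey;
		}
	}

	// Returns every queue whose binding key equals the routing key exactly.
	public static List<String> route(List<Binding> bindings, String routingKey) {
		List<String> matched = new ArrayList<>();
		for (Binding binding : bindings) {
			if (binding.bindingKey.equals(routingKey)) {
				matched.add(binding.queue);
			}
		}
		return matched;
	}

	public static void main(String[] args) {
		List<Binding> bindings = Arrays.asList(
				new Binding("marketingQueue", "marketing"),
				new Binding("financeQueue", "finance"),
				new Binding("adminQueue", "admin"),
				// Hypothetical extra queue sharing the "admin" binding key
				new Binding("auditQueue", "admin"));

		// Both queues bound with "admin" receive the message
		System.out.println(route(bindings, "admin")); // [adminQueue, auditQueue]
		// No binding matches, so no queue receives the message
		System.out.println(route(bindings, "sales")); // []
	}
}
```

The second call shows why the exchange name and routing key in the request must match the configuration exactly: a key without a matching binding reaches no queue.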
The maven project will be as follows-
RabbitMQ Maven Project
The pom.xml will be as follows-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javainuse</groupId>
	<artifactId>spring-boot-exchanges</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>

</project>
Create the RabbitMQDirectConfig as follows-
  • Create Queues named - marketingQueue, adminQueue, financeQueue
  • Create a DirectExchange named - direct-exchange
  • Create a Binding between each queue and the DirectExchange, specifying the binding key
package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQDirectConfig {

	@Bean
	Queue marketingQueue() {
		// Second argument is "durable"; false means the queue is not kept across a broker restart
		return new Queue("marketingQueue", false);
	}

	@Bean
	Queue financeQueue() {
		return new Queue("financeQueue", false);
	}

	@Bean
	Queue adminQueue() {
		return new Queue("adminQueue", false);
	}

	@Bean
	DirectExchange exchange() {
		return new DirectExchange("direct-exchange");
	}

	@Bean
	Binding marketingBinding(Queue marketingQueue, DirectExchange exchange) {
		return BindingBuilder.bind(marketingQueue).to(exchange).with("marketing");
	}

	@Bean
	Binding financeBinding(Queue financeQueue, DirectExchange exchange) {
		return BindingBuilder.bind(financeQueue).to(exchange).with("finance");
	}

	@Bean
	Binding adminBinding(Queue adminQueue, DirectExchange exchange) {
		return BindingBuilder.bind(adminQueue).to(exchange).with("admin");
	}

}
Create the RabbitMQDirectWebController class, which exposes an API to send a message to the RabbitMQ exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/direct/")
public class RabbitMQDirectWebController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@GetMapping(value = "/producer")
	public String producer(@RequestParam("exchangeName") String exchange, @RequestParam("routingKey") String routingKey,
			@RequestParam("messageData") String messageData) {

		amqpTemplate.convertAndSend(exchange, routingKey, messageData);

		return "Message sent to the RabbitMQ Successfully";
	}

}
Create the bootstrap class with the @SpringBootApplication annotation-
package com.javainuse;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRabbitMQApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootRabbitMQApplication.class, args);
	}
}
In a previous tutorial we showed how to install RabbitMQ and get started. Once you have followed that tutorial, go to localhost:15672
RabbitMQ Console
Log in with guest as both the username and the password. If we now go to the Queues section, there are currently no queues.
RabbitMQ Console queues
Run the Spring Boot Application
  • We send the message using the URL - http://localhost:8080/javainuse-rabbitmq/direct/producer?exchangeName=direct-exchange&routingKey=admin&messageData=HelloWorldJavaInUse
    We specify the following
    • exchange name = "direct-exchange"
    • routing key = "admin"
    • message to be sent to the queue = "HelloWorldJavaInUse"
  • The message is sent to the admin queue. We get the web output as-
    RabbitMQ Direct Exchange Output
  • We can see that queues named marketingQueue, adminQueue and financeQueue are created. Also, a message has been sent to the adminQueue.
    RabbitMQ Direct Exchange Queues
  • An exchange named direct-exchange is created with the following bindings.
    RabbitMQ Direct Exchange Binding
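The producer URL used in this section can also be assembled in code. The following is a small sketch, assuming the application runs on localhost:8080 as in this tutorial; the class ProducerUrlSketch and its method names are hypothetical. It URL-encodes each query parameter, which starts to matter once the message text contains spaces or characters such as &.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Builds the request URL for the /direct/producer endpoint shown in this tutorial.
public class ProducerUrlSketch {

	public static String producerUrl(String exchangeName, String routingKey, String messageData) {
		return "http://localhost:8080/javainuse-rabbitmq/direct/producer"
				+ "?exchangeName=" + encode(exchangeName)
				+ "&routingKey=" + encode(routingKey)
				+ "&messageData=" + encode(messageData);
	}

	// Encodes a single query parameter value as UTF-8 form data.
	private static String encode(String value) {
		try {
			return URLEncoder.encode(value, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			// UTF-8 is always available on a Java runtime, so this is not expected to happen
			throw new IllegalStateException(e);
		}
	}

	public static void main(String[] args) {
		// Same request as in the tutorial
		System.out.println(producerUrl("direct-exchange", "admin", "HelloWorldJavaInUse"));
		// Spaces become "+" and "&" becomes "%26" so the message is not split into extra parameters
		System.out.println(producerUrl("direct-exchange", "admin", "Hello World & JavaInUse"));
	}
}
```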

Fanout Exchange

The message is routed to every queue bound to the exchange. The routing key, if provided, is ignored. This makes the fanout exchange a natural fit for the publish-subscribe pattern.
Fanout Exchange type
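The fanout rule is simple enough to state in a few lines of plain Java. This is only an illustrative sketch, not RabbitMQ's implementation, and the class name FanoutRoutingSketch is hypothetical. The point it shows is that the routing key takes no part in the decision.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of fanout-exchange routing (illustration only).
public class FanoutRoutingSketch {

	// Returns all bound queues; the routingKey parameter is deliberately unused.
	public static List<String> route(List<String> boundQueues, String routingKey) {
		return new ArrayList<>(boundQueues);
	}

	public static void main(String[] args) {
		List<String> bound = Arrays.asList("marketingQueue", "financeQueue", "adminQueue");

		// An empty key and a non-empty key give the same result
		System.out.println(route(bound, ""));      // [marketingQueue, financeQueue, adminQueue]
		System.out.println(route(bound, "admin")); // [marketingQueue, financeQueue, adminQueue]
	}
}
```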
Modify the RabbitMQFanoutConfig as follows-
  • Create Queues named - marketingQueue, adminQueue, financeQueue
  • Create a FanoutExchange named - fanout-exchange
  • Create a Binding between each queue and the FanoutExchange. As this is a fanout exchange, we do not need to specify a binding key.
package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQFanoutConfig {

	@Bean
	Queue marketingQueue() {
		return new Queue("marketingQueue", false);
	}

	@Bean
	Queue financeQueue() {
		return new Queue("financeQueue", false);
	}

	@Bean
	Queue adminQueue() {
		return new Queue("adminQueue", false);
	}

	@Bean
	FanoutExchange exchange() {
		return new FanoutExchange("fanout-exchange");
	}

	@Bean
	Binding marketingBinding(Queue marketingQueue, FanoutExchange exchange) {
		return BindingBuilder.bind(marketingQueue).to(exchange);
	}

	@Bean
	Binding financeBinding(Queue financeQueue, FanoutExchange exchange) {
		return BindingBuilder.bind(financeQueue).to(exchange);
	}

	@Bean
	Binding adminBinding(Queue adminQueue, FanoutExchange exchange) {
		return BindingBuilder.bind(adminQueue).to(exchange);
	}

}
Create the RabbitMQFanoutWebController class, which exposes an API to send a message to the RabbitMQ exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/fanout/")
public class RabbitMQFanoutWebController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@GetMapping(value = "/producer")
	public String producer(@RequestParam("exchangeName") String exchange,
			@RequestParam("messageData") String messageData) {

		amqpTemplate.convertAndSend(exchange, "", messageData);

		return "Message sent to the RabbitMQ Fanout Exchange Successfully";
	}
}
Run the Spring Boot Application
  • We send the message using the URL - http://localhost:8080/javainuse-rabbitmq/fanout/producer?exchangeName=fanout-exchange&messageData=HelloWorldJavaInUse
    • exchange name = "fanout-exchange"
    • message to be sent to the queues = "HelloWorldJavaInUse"
  • We do not need to specify a routing key here, as the message is published to all the bound queues. We get the web output as-
    RabbitMQ Fanout Exchange Output
  • We can see that queues named marketingQueue, adminQueue and financeQueue are created. Also, a copy of the message has been sent to each of them.
    RabbitMQ Fanout Exchange Queues
  • An exchange named fanout-exchange is created with the following bindings.
    RabbitMQ Fanout Exchange Binding

Topic Exchange

Here the routing key is used again. But unlike the direct exchange type, the routing key of the message and the binding key of the queue do not have to match exactly. Binding keys can contain wildcards (* matches exactly one dot-separated word, # matches zero or more words), so a single message can be routed to multiple bound queues.
Topic Exchange type
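The wildcard rules can be sketched in plain Java as follows. This is an illustrative matcher written for this explanation, not the algorithm RabbitMQ uses internally, and the class name TopicMatchSketch is hypothetical. Keys are split into dot-separated words, * stands for exactly one word and # for zero or more words.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy matcher for topic-exchange binding keys (illustration only).
public class TopicMatchSketch {

	// True if the routing key satisfies the binding pattern.
	public static boolean matches(String bindingPattern, String routingKey) {
		return match(words(bindingPattern), 0, words(routingKey), 0);
	}

	private static String[] words(String key) {
		return key.isEmpty() ? new String[0] : key.split("\\.");
	}

	private static boolean match(String[] pattern, int i, String[] key, int j) {
		if (i == pattern.length) {
			return j == key.length; // pattern used up: the key must be used up too
		}
		if (pattern[i].equals("#")) {
			// "#" either absorbs nothing more (move on) or absorbs one more word (stay on "#")
			return match(pattern, i + 1, key, j) || (j < key.length && match(pattern, i, key, j + 1));
		}
		if (j == key.length) {
			return false; // pattern still expects a word but the key has none left
		}
		if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
			return match(pattern, i + 1, key, j + 1);
		}
		return false;
	}

	// Returns the queues whose binding key matches, in insertion order.
	public static List<String> route(Map<String, String> bindingKeyByQueue, String routingKey) {
		List<String> matched = new ArrayList<>();
		for (Map.Entry<String, String> entry : bindingKeyByQueue.entrySet()) {
			if (matches(entry.getValue(), routingKey)) {
				matched.add(entry.getKey());
			}
		}
		return matched;
	}

	public static void main(String[] args) {
		// Same binding keys as the topic configuration in this tutorial
		Map<String, String> bindings = new LinkedHashMap<>();
		bindings.put("marketingQueue", "queue.marketing");
		bindings.put("financeQueue", "queue.finance");
		bindings.put("adminQueue", "queue.admin");
		bindings.put("allQueue", "queue.*");

		System.out.println(route(bindings, "queue.admin"));       // [adminQueue, allQueue]
		System.out.println(matches("queue.*", "queue.admin.eu")); // false: "*" covers one word only
		System.out.println(matches("queue.#", "queue.admin.eu")); // true: "#" covers any number of words
	}
}
```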
Modify the RabbitMQTopicConfig as follows-
  • Create Queues named - marketingQueue, adminQueue, financeQueue and allQueue
  • Create a TopicExchange named - topic-exchange
  • Create a Binding between each queue and the TopicExchange. We specify a binding key for each binding; for the allQueue binding we use a binding key with a wildcard.
package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQTopicConfig {

	@Bean
	Queue marketingQueue() {
		return new Queue("marketingQueue", false);
	}

	@Bean
	Queue financeQueue() {
		return new Queue("financeQueue", false);
	}

	@Bean
	Queue adminQueue() {
		return new Queue("adminQueue", false);
	}

	@Bean
	Queue allQueue() {
		return new Queue("allQueue", false);
	}

	@Bean
	TopicExchange topicExchange() {
		return new TopicExchange("topic-exchange");
	}
	
	@Bean
	Binding marketingBinding(Queue marketingQueue, TopicExchange topicExchange) {
		return BindingBuilder.bind(marketingQueue).to(topicExchange).with("queue.marketing");
	}
	
	@Bean
	Binding financeBinding(Queue financeQueue, TopicExchange topicExchange) {
		return BindingBuilder.bind(financeQueue).to(topicExchange).with("queue.finance");
	}
	
	@Bean
	Binding adminBinding(Queue adminQueue, TopicExchange topicExchange) {
		return BindingBuilder.bind(adminQueue).to(topicExchange).with("queue.admin");
	}
	
	@Bean
	Binding allBinding(Queue allQueue, TopicExchange topicExchange) {
		return BindingBuilder.bind(allQueue).to(topicExchange).with("queue.*");
	}

}
Create the RabbitMQTopicWebController class, which exposes an API to send a message to the RabbitMQ exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/topic/")
public class RabbitMQTopicWebController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@GetMapping(value = "/producer")
	public String producer(@RequestParam("exchangeName") String exchange, @RequestParam("routingKey") String routingKey,
			@RequestParam("messageData") String messageData) {

		amqpTemplate.convertAndSend(exchange, routingKey, messageData);

		return "Message sent to the RabbitMQ Topic Exchange Successfully";
	}

}

Run the Spring Boot Application
  • We send the message using the URL - http://localhost:8080/javainuse-rabbitmq/topic/producer?exchangeName=topic-exchange&routingKey=queue.admin&messageData=HelloWorldJavaInUse
    • exchange name = "topic-exchange"
    • routing key = "queue.admin"
    • message to be sent to the queue = "HelloWorldJavaInUse"
  • The routing key queue.admin matches the binding keys of both adminQueue (queue.admin) and allQueue (queue.*), so the message is sent to both queues. We get the web output as-
    RabbitMQ Topic Exchange Output
  • We can see that queues named marketingQueue, adminQueue, financeQueue and allQueue are created.
    RabbitMQ Topic Exchange Queues
  • An exchange named topic-exchange is created with the following bindings.
    RabbitMQ Topic Exchange Binding

Header Exchange

In this type of exchange, the target queue is selected based on criteria matched against the message headers instead of the routing key. It is similar to the topic exchange type, but lets us specify more complex criteria for selecting the target queues, for example requiring all of several headers to match, or any one of them.
Header Exchange type
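Header matching can be sketched in plain Java as below. RabbitMQ header bindings carry an x-match argument: all (the default) requires every listed header to match, while any requires at least one. The code is a toy model of that rule, not the broker's implementation; the class name HeadersMatchSketch and the region header are hypothetical and do not appear in this tutorial's project.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of headers-exchange matching (illustration only).
public class HeadersMatchSketch {

	// matchAll = true models x-match=all, matchAll = false models x-match=any.
	// Assumes the binding lists at least one header and that its values are non-null.
	public static boolean matches(Map<String, Object> bindingHeaders, boolean matchAll,
			Map<String, Object> messageHeaders) {
		for (Map.Entry<String, Object> required : bindingHeaders.entrySet()) {
			boolean hit = required.getValue().equals(messageHeaders.get(required.getKey()));
			if (matchAll && !hit) {
				return false; // "all": one mismatch is enough to reject
			}
			if (!matchAll && hit) {
				return true; // "any": one match is enough to accept
			}
		}
		return matchAll;
	}

	public static void main(String[] args) {
		// Binding criteria: department=admin plus a hypothetical region=eu
		Map<String, Object> binding = new HashMap<>();
		binding.put("department", "admin");
		binding.put("region", "eu");

		// Message headers: the department matches, the region does not
		Map<String, Object> message = new HashMap<>();
		message.put("department", "admin");
		message.put("region", "us");

		System.out.println(matches(binding, true, message));  // false: region differs
		System.out.println(matches(binding, false, message)); // true: department matches
	}
}
```

The configuration in this tutorial binds each queue on a single department header, so the all and any modes behave the same there.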
Modify the RabbitMQHeaderConfig as follows-
  • Create Queues named - marketingQueue, adminQueue, financeQueue
  • Create a HeadersExchange named - header-exchange
  • Create a Binding between each queue and the HeadersExchange. As this is a header exchange, instead of a binding key we specify the criteria that must be present in the message headers.
package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQHeaderConfig {

	@Bean
	Queue marketingQueue() {
		return new Queue("marketingQueue", false);
	}

	@Bean
	Queue financeQueue() {
		return new Queue("financeQueue", false);
	}

	@Bean
	Queue adminQueue() {
		return new Queue("adminQueue", false);
	}

	@Bean
	HeadersExchange headerExchange() {
		return new HeadersExchange("header-exchange");
	}

	@Bean
	Binding marketingBinding(Queue marketingQueue, HeadersExchange headerExchange) {
		return BindingBuilder.bind(marketingQueue).to(headerExchange).where("department").matches("marketing");
	}

	@Bean
	Binding financeBinding(Queue financeQueue, HeadersExchange headerExchange) {
		return BindingBuilder.bind(financeQueue).to(headerExchange).where("department").matches("finance");
	}

	@Bean
	Binding adminBinding(Queue adminQueue, HeadersExchange headerExchange) {
		return BindingBuilder.bind(adminQueue).to(headerExchange).where("department").matches("admin");
	}

}

Create the RabbitMQHeaderWebController class, which exposes an API to send a message to the RabbitMQ exchange.
package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/header/")
public class RabbitMQHeaderWebController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@GetMapping(value = "/producer")
	public String producer(@RequestParam("exchangeName") String exchange, @RequestParam("department") String department,
			@RequestParam("messageData") String messageData) {

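		// A headers exchange routes on message headers, so set "department" on the message properties
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setHeader("department", department);
		MessageConverter messageConverter = new SimpleMessageConverter();
		Message message = messageConverter.toMessage(messageData, messageProperties);
		// The routing key is left empty because a headers exchange ignores it
		amqpTemplate.send(exchange, "", message);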

		return "Message sent to the RabbitMQ Header Exchange Successfully";
	}
}
Run the Spring Boot Application
  • We send the message using the URL - http://localhost:8080/javainuse-rabbitmq/header/producer?exchangeName=header-exchange&department=admin&messageData=HelloWorldJavaInUse
    • exchange name = "header-exchange"
    • header department = "admin"
    • message to be sent to the queue = "HelloWorldJavaInUse"
  • The message is sent to the admin queue. We get the web output as-
    RabbitMQ Header Exchange Output
  • We can see that queues named marketingQueue, adminQueue and financeQueue are created. Also, adminQueue has a message.
    RabbitMQ Header Exchange Queues
  • An exchange named header-exchange is created with the following bindings.
    RabbitMQ Header Exchange Binding

Download Source Code

Download it -
Spring Boot + RabbitMQ Exchanges Example