Search Tutorials


Understanding Apache Camel EIP - Splitter and Aggregator pattern using example | JavaInUse

Understanding Apache Camel EIP - Splitter and Aggregator pattern using example

In this tutorial we will create an apache camel application to understand the Enterprise Integartion Pattern - Splitter and Aggregator
The Splitter from the EIP patterns allows you split a message into a number of pieces and process them individually.The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.
Apache Camel EIP - Splitter and Aggregator pattern Example

Apache Camel - Table of Contents

File Transfer Using Java DSL Apache Camel Apache Camel Java DSL + Spring Integration Hello World Example Apache Camel Exception Handling Using Simple Example Apache Camel Redelivery policy using example Integrate Apache Camel and ActiveMQ EIP patterns using Apache Camel Apache Camel Tutorial- Integrate Spring Boot+ Apache Camel Apache Camel Tutorial- Integrate with MySQL DB using SQL query Apache Camel EIP - Splitter and Aggregator pattern Apache Camel Unit Testing Apache Camel + Spring + Quartz Hello World Example Camel application deployment on JBoss Fuse Apache Camel + Apache CXF SOAP Webservices Apache Camel + JAX-RS REST Webservice Apache Camel + CXFRS REST Webservice Apache Camel Routing Slip EIP Pattern Apache Camel Dynamic Router Pattern Apache Camel Load Balancer EIP Pattern Apache Camel Interceptors Apache Camel + Kafka Hello World Example Apache Camel - Marshalling/Unmarshalling XML/JSON Data Example Calling and Consuming Webservices using Apache Camel Apache Camel Tutorial - Send SMTP Email Using Gmail Apache Camel Tutorial - SEDA component Hello World Example Spring Boot + Apache Camel + RabbitMQ - Hello World Example Apache Camel Tutorial - Idempotent Consumer using MemoryIdempotentRepository and FileIdempotentRepository Spring Boot + Apache Camel JDBC component + MySQL - Hello World Example Spring Boot + Apache Camel SQL component + MySQL - Hello World Example Spring Boot + Apache Camel SQL component + Transaction Management Example

Lets Begin

We will create Eclipse maven project as follows-

Apache Camel EIP - Splitter and Aggregator pattern tutorial
Our pom file will be as follows-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javainuse</groupId>
  <artifactId>camel-splitter-aggregation</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <properties>
    <camelspring.version>2.16.0</camelspring.version>  
    <spring.version>4.1.6.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version></version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version></version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version></version>
</dependency>
</dependencies>
</project>



Define the domain class Employee as follows-
package com.javainuse.domain;

public class Employee {

    public Employee(String empId, String name, String type) {
        this.empId = empId;
        this.name = name;
        this.department = type;
    }

    private String empId;

    private String name;
    
    private String department;

    private double salary;

    public String getId() {
        return empId;
    }

    public String getName() {
        return name;
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }
    
    public String getDepartment() {
        return department;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Employee [empId=");
        builder.append(empId);
        builder.append(", name=");
        builder.append(name);
        builder.append(", department=");
        builder.append(department);
        builder.append(", salary=");
        builder.append(salary);
        builder.append("]");
        return builder.toString();
    }
    
}
  

Define the domain class Department as follows-
  package com.javainuse.domain;

import java.util.List;

public class Department {
    
    
    private List<Employee> employees;
    
    private double totalSalary;
        

    public List<Employee> getEmployees() {
        return employees;
    }

    public void setEmployees(List<Employee> employees) {
        this.employees = employees;
    }    

     public double getTotalSalary() {
        return totalSalary;
    }

    public void setTotalSalary(double totalSalary) {
        this.totalSalary = totalSalary;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Department");
        builder.append(", Employees=");
        builder.append(employees);
        builder.append(", totalSalary=");
        builder.append(totalSalary);
        builder.append("]");
        return builder.toString();
    }

}
  
Next define the Route as follows
 package com.javainuse.aggregatesplit;

import org.apache.camel.builder.RouteBuilder;

public class DepartmentRouter extends RouteBuilder {

	@Override
	public void configure() throws Exception {

		from("direct:processDept")
				.split(body().method("getEmployees"),
						new DepartmentEmployeeStrategy())
				.to("direct:processEmployee").end();

		from("direct:processEmployee").choice()
				.when(body().method("getDepartment").isEqualTo("Finance"))
				.to("bean:employeeService?method=processFinanceDept")
				.when(body().method("getDepartment").isEqualTo("IT"))
				.to("bean:employeeService?method=processITDept");
	}

}
  

Define the ProcessEmployees class which processes the Employee salary based on their department.
  package com.javainuse.aggregatesplit;

import com.javainuse.domain.Employee;

public class ProcessEmployees {

	public Employee processITDept(Employee employee)
			throws InterruptedException {

		System.out.println("handling employee department:" + employee);
		employee.setSalary(10000);

		System.out.println("IT dept employee processed");

		return employee;
	}

	public Employee processFinanceDept(Employee employee)
			throws InterruptedException {

		System.out.println("handling employee department:" + employee);
		employee.setSalary(5000);

		System.out.println("Finance dept employee processed");

		return employee;
	}
}
  
Next define the class implementing our AggregationStrategy.
  package com.javainuse.aggregatesplit;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;

public class DepartmentEmployeeStrategy implements AggregationStrategy {
    
    @Override
    public Exchange aggregate(Exchange oldEmployeeExchange, Exchange newEmployeeExchange) {
        
           if (oldEmployeeExchange == null) {
               
               Employee newEmployee= newEmployeeExchange.getIn().getBody(Employee.class);
               System.out.println("Aggregate first employee: " + newEmployee);
               
               Department department = new Department();
               List<Employee> employees = new ArrayList<Employee>();
   
               employees.add(newEmployee);
               department.setEmployees(employees);
               department.setTotalSalary(newEmployee.getSalary());
               
               newEmployeeExchange.getIn().setBody(department);
               
                return newEmployeeExchange;
            }
           
            Department department = oldEmployeeExchange.getIn().getBody(Department.class);
            Employee newEmployee= newEmployeeExchange.getIn().getBody(Employee.class);
     
            System.out.println("Aggregate old employees: " + department);
            System.out.println("Aggregate new department: " + newEmployee);
            
            department.getEmployees().add(newEmployee);
           
            double totalSalary = department.getTotalSalary() + newEmployee.getSalary();
            department.setTotalSalary(totalSalary);

            return oldEmployeeExchange;
    }

}
  

Next define the camel-context as follows-
 <?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs" 
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/cxf
        http://camel.apache.org/schema/cxf/camel-cxf.xsd
        http://cxf.apache.org/jaxrs
        http://cxf.apache.org/schemas/jaxrs.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context.xsd">


    <bean id="deptRouter" class="com.javainuse.aggregatesplit.DepartmentRouter" />
    <bean id="employeeService" class="com.javainuse.aggregatesplit.ProcessEmployees" />
    <camelContext id="departmentCtx" xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="deptRouter" />
    </camelContext>
    
</beans>
  

Finally load the camel context and call the routes with the data.
  package com.javainuse.aggregatesplit;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;

public class DepartmentApp {
    
    public static void main(String[] args) {
        
    
        try {
            ApplicationContext springCtx = new ClassPathXmlApplicationContext(
                    "camel-context.xml");

            CamelContext context = springCtx.getBean("departmentCtx",
                    CamelContext.class);
            
            context.start();
                
            ProducerTemplate producerTemplate = context.createProducerTemplate();
            
            List<Employee> employees = new ArrayList<Employee>();
            employees.add(new Employee("1", "emp1", "Finance"));
            employees.add(new Employee("2", "emp2", "IT"));
            employees.add(new Employee("2", "emp3", "IT"));
            
            Department dept = new Department();
            dept.setEmployees(employees);
                        
            Department deptDetails = producerTemplate.requestBody(
                    "direct:processDept", dept, Department.class);
            
            System.out.println("Department Details - "+deptDetails);

            context.stop();
            
        } catch (Exception e) {
            
            e.printStackTrace();
        
        } 

    }

}
We can see that the individual employee salary is processed based on the department. We also get the total Salary of all employees.

Apache Camel EIP - Splitter and Aggregator pattern

Download Source Code

Download it - Apache Camel EIP - Splitter and Aggregator pattern

See Also

Spring Boot Hello World Application- Create simple controller and jsp view using Maven Spring Boot Tutorial-Spring Data JPA Spring Boot + Simple Security Configuration Pagination using Spring Boot Simple Example Spring Boot + ActiveMQ Hello world Example Spring Boot + Swagger Example Hello World Example Spring Boot + Swagger- Understanding the various Swagger Annotations Spring Boot Main Menu Spring Boot Interview Questions