Define the domain class Employee as follows-
package com.javainuse.domain;
public class Employee {
public Employee(String empId, String name, String type) {
this.empId = empId;
this.name = name;
this.department = type;
}
private String empId;
private String name;
private String department;
private double salary;
public String getId() {
return empId;
}
public String getName() {
return name;
}
public double getSalary() {
return salary;
}
public void setSalary(double salary) {
this.salary = salary;
}
public String getDepartment() {
return department;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Employee [empId=");
builder.append(empId);
builder.append(", name=");
builder.append(name);
builder.append(", department=");
builder.append(department);
builder.append(", salary=");
builder.append(salary);
builder.append("]");
return builder.toString();
}
}
Define the domain class Department as follows-
package com.javainuse.domain;
import java.util.List;
public class Department {
private List<Employee> employees;
private double totalSalary;
public List<Employee> getEmployees() {
return employees;
}
public void setEmployees(List<Employee> employees) {
this.employees = employees;
}
public double getTotalSalary() {
return totalSalary;
}
public void setTotalSalary(double totalSalary) {
this.totalSalary = totalSalary;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Department");
builder.append(", Employees=");
builder.append(employees);
builder.append(", totalSalary=");
builder.append(totalSalary);
builder.append("]");
return builder.toString();
}
}
Next define the Route as follows
package com.javainuse.aggregatesplit;
import org.apache.camel.builder.RouteBuilder;
public class DepartmentRouter extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:processDept")
.split(body().method("getEmployees"),
new DepartmentEmployeeStrategy())
.to("direct:processEmployee").end();
from("direct:processEmployee").choice()
.when(body().method("getDepartment").isEqualTo("Finance"))
.to("bean:employeeService?method=processFinanceDept")
.when(body().method("getDepartment").isEqualTo("IT"))
.to("bean:employeeService?method=processITDept");
}
}
Define the ProcessEmployees class which processes the Employee salary based on their department.
package com.javainuse.aggregatesplit;
import com.javainuse.domain.Employee;
public class ProcessEmployees {
public Employee processITDept(Employee employee)
throws InterruptedException {
System.out.println("handling employee department:" + employee);
employee.setSalary(10000);
System.out.println("IT dept employee processed");
return employee;
}
public Employee processFinanceDept(Employee employee)
throws InterruptedException {
System.out.println("handling employee department:" + employee);
employee.setSalary(5000);
System.out.println("Finance dept employee processed");
return employee;
}
}
Next define the class implementing our AggregationStrategy.
package com.javainuse.aggregatesplit;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;
public class DepartmentEmployeeStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldEmployeeExchange, Exchange newEmployeeExchange) {
if (oldEmployeeExchange == null) {
Employee newEmployee= newEmployeeExchange.getIn().getBody(Employee.class);
System.out.println("Aggregate first employee: " + newEmployee);
Department department = new Department();
List<Employee> employees = new ArrayList<Employee>();
employees.add(newEmployee);
department.setEmployees(employees);
department.setTotalSalary(newEmployee.getSalary());
newEmployeeExchange.getIn().setBody(department);
return newEmployeeExchange;
}
Department department = oldEmployeeExchange.getIn().getBody(Department.class);
Employee newEmployee= newEmployeeExchange.getIn().getBody(Employee.class);
System.out.println("Aggregate old employees: " + department);
System.out.println("Aggregate new department: " + newEmployee);
department.getEmployees().add(newEmployee);
double totalSalary = department.getTotalSalary() + newEmployee.getSalary();
department.setTotalSalary(totalSalary);
return oldEmployeeExchange;
}
}
Next define the camel-context as follows-
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cxf="http://camel.apache.org/schema/cxf"
xmlns:jaxrs="http://cxf.apache.org/jaxrs"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/cxf
http://camel.apache.org/schema/cxf/camel-cxf.xsd
http://cxf.apache.org/jaxrs
http://cxf.apache.org/schemas/jaxrs.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<bean id="deptRouter" class="com.javainuse.aggregatesplit.DepartmentRouter" />
<bean id="employeeService" class="com.javainuse.aggregatesplit.ProcessEmployees" />
<camelContext id="departmentCtx" xmlns="http://camel.apache.org/schema/spring">
<routeBuilder ref="deptRouter" />
</camelContext>
</beans>
Finally load the camel context and call the routes with the data.
package com.javainuse.aggregatesplit;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.javainuse.domain.Department;
import com.javainuse.domain.Employee;
public class DepartmentApp {
public static void main(String[] args) {
try {
ApplicationContext springCtx = new ClassPathXmlApplicationContext(
"camel-context.xml");
CamelContext context = springCtx.getBean("departmentCtx",
CamelContext.class);
context.start();
ProducerTemplate producerTemplate = context.createProducerTemplate();
List<Employee> employees = new ArrayList<Employee>();
employees.add(new Employee("1", "emp1", "Finance"));
employees.add(new Employee("2", "emp2", "IT"));
employees.add(new Employee("2", "emp3", "IT"));
Department dept = new Department();
dept.setEmployees(employees);
Department deptDetails = producerTemplate.requestBody(
"direct:processDept", dept, Department.class);
System.out.println("Department Details - "+deptDetails);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
We can see that the individual employee salary is processed based on the department. We also get the total Salary of all employees.
Download Source Code
Download it -
Apache Camel EIP - Splitter and Aggregator pattern
See Also
Spring Boot Hello World Application- Create simple controller and jsp view using Maven
Spring Boot Tutorial-Spring Data JPA
Spring Boot + Simple Security Configuration
Pagination using Spring Boot Simple Example
Spring Boot + ActiveMQ Hello world Example
Spring Boot + Swagger Example Hello World Example
Spring Boot + Swagger- Understanding the various Swagger Annotations
Spring Boot Main Menu
Spring Boot Interview Questions