Search Tutorials


Spring Boot 3 + gRPC Server Streaming Example | JavaInUse

Spring Boot 3 + gRPC Server Streaming Example

In previous tutorial we looked at Different types of gRPC calls. Also in another tutorial we had implemented Spring Boot 3 + gRPC Unary Example. In this tutorial we will be implementing spring boot + gRPC Server Streaming call. Server streaming gRPC is a communication protocol that allows the server to send a stream of responses to the client. This pattern is useful in scenarios where the server needs to push a large amount of data or a continuous stream of updates to the client.
For example, it can be used for real-time updates like stock market prices, weather updates, news feeds, or even sending a large file in chunks.
We will be implementing the following scenario.

Use Case

Consider a banking service where the the client needs to get transaction information of 30 days. This transaction information is processed by the server. But this is very large information which cannot be sent to the client as a single response. So it is sent as stream to the client.
Bank Server Streaming gRPC

Video

This tutorial is explained in the below Youtube Video.

gRPC - Table of Contents

Spring Boot+ gRPC Hello World Example Spring Boot gRPC Server + C# gRPC Client Example Spring Boot 3 + gRPC - Types of gRPC Spring Boot 3 + gRPC Unary Example Spring Boot 3 + gRPC Server Streaming Example Spring Boot 3 + gRPC Client Streaming Example Spring Boot 3 + gRPC Bidirectional Streaming Example Spring Boot + gRPC Deadline Example Spring Boot + gRPC Error Handling Example Spring Boot + gRPC Error Handling - Using Trailer Metadata Spring Boot + gRPC Error Handling - Global Exception Handler Using GrpcAdvice

Implementation

Spring Boot Server Streaming gRPC Server

We will be creating a maven project as follows -
Bank Server Streaming gRPC maven
The pom.xml will be as follows. We have the following dependencies
  • spring-boot-starter-parent- We have used version - 3.2.0. It provides default configurations, dependencies, and plugins that are commonly used in Spring Boot applications.
  • grpc-server-spring-boot-starter- We have used version - 2.15.0. Which is the latest dependency provided by maven repository. It provides the necessary components to integrate and run a gRPC server within a Spring Boot application.
  • grpc-stub-- We have used version - 1.59.0. This dependency provides classes and utilities for creating and using gRPC stubs, which are used to make remote procedure calls.
  • grpc-protobuf-- We have used version - 1.59.0. This library provides support for the Protocol Buffers serialization format used in gRPC, allowing you to define and exchange structured data between your gRPC client and server.
  • annotations-api- We have used version - 6.0.53. This artifact provides the necessary annotations for Java code generated by the protobuf-maven-plugin.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javainuse</groupId>
	<artifactId>boot-server-streaming-grpc-server</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.0</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<protobuf.version>3.17.3</protobuf.version>
		<protobuf-plugin.version>0.6.1</protobuf-plugin.version>
		<grpc.version>1.59.0</grpc.version>
	</properties>
	<dependencies>

		<dependency>
			<groupId>net.devh</groupId>
			<artifactId>grpc-server-spring-boot-starter</artifactId>
			<version>2.15.0.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>io.grpc</groupId>
			<artifactId>grpc-stub</artifactId>
			<version>${grpc.version}</version>
		</dependency>
		<dependency>
			<groupId>io.grpc</groupId>
			<artifactId>grpc-protobuf</artifactId>
			<version>${grpc.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.tomcat</groupId>
			<artifactId>annotations-api</artifactId>
			<version>6.0.53</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

	<build>
		<extensions>
			<extension>
				<groupId>kr.motd.maven</groupId>
				<artifactId>os-maven-plugin</artifactId>
				<version>1.7.1</version>
			</extension>
		</extensions>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
			<plugin>
				<groupId>org.xolstice.maven.plugins</groupId>
				<artifactId>protobuf-maven-plugin</artifactId>
				<version>0.6.1</version>
				<configuration>
					<protocArtifact>
						com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}
					</protocArtifact>
					<pluginId>grpc-java</pluginId>
					<pluginArtifact>
						io.grpc:protoc-gen-grpc-java:1.59.0:exe:${os.detected.classifier}
					</pluginArtifact>
					<protoSourceRoot>
						${basedir}/src/main/proto/
					</protoSourceRoot>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>compile</goal>
							<goal>compile-custom</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
Next create the proto file named bank-service.proto in src/main/proto folder.
A proto file in gRPC is a special type of file that helps define the structure and communication between different software components. It acts like a blueprint for creating and interacting with these components.
This protocol specifies the structure of messages for a banking application in version 3. It includes messages like AccountRequest and TransactionDetail, with fields like account_number and transaction_amount. It also defines a service called TransactionService for streaming transaction details based on a specified duration.
syntax = "proto3";

package banking;

option java_multiple_files = true;
option java_package = "com.javainuse.banking";

// Message representing a client's account transaction request
message AccountRequest {
  string account_number = 1;
  int32 duration_in_days = 2;
}

// Message representing a transaction detail
message TransactionDetail {
  string transaction_id = 1;
  string transaction_type = 2;
  float transaction_amount = 3;
}

message TransactionDetailList {
  repeated TransactionDetail transaction_details = 1;
}

// Service for streaming transaction details
service TransactionService {
  // Method to stream transaction details for a given duration in days
  rpc streamTransactions(AccountRequest) returns (stream TransactionDetailList);
}




Next create the spring bootstrap class as follows-
package com.javainuse.bank;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootGrpcServerExampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootGrpcServerExampleApplication.class, args);
	}
}
Run the maven install command which will generate the classes using the proto file.
Create the model class named Transaction. This will be the response we will be sending as stream data.
package com.javainuse.bank.model;

public class Transaction {

	private String id;
	private String type;
	private float amount;

	public Transaction() {
	}

	public Transaction(String id, String type, float amount) {
		super();
		this.id = id;
		this.type = type;
		this.amount = amount;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public float getAmount() {
		return amount;
	}

	public void setAmount(float amount) {
		this.amount = amount;
	}

}
Create the service named TransactionHistoryService. We use the @GrpcService to expose this class as a grpc service and expose it over the gRPC protocol. The TransactionHistoryService class is a gRPC service that extends TransactionServiceGrpc.TransactionServiceImplBase. TransactionServiceGrpc.TransactionServiceImplBase is the abstract base class generated by the gRPC framework based on the protocol buffer definition file bank-service.proto for the service. By extending the generated base class, you get a template with the necessary methods and can focus on implementing the business logic without worrying about handling low-level details of the gRPC communication.
It fetches transaction details based on a duration and sends them in batches. The fetchTransactions fetches mock transaction data and createTransactionDetailFromTransaction method creates a transaction detail creates from this mock transaction data. The class handles sending batches with a delay.
package com.javainuse.bank.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.javainuse.bank.model.Transaction;
import com.javainuse.banking.AccountRequest;
import com.javainuse.banking.TransactionDetail;
import com.javainuse.banking.TransactionDetailList;
import com.javainuse.banking.TransactionServiceGrpc;

import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
public class TransactionHistoryService extends TransactionServiceGrpc.TransactionServiceImplBase {

	@Override
	public void streamTransactions(AccountRequest request, StreamObserver<TransactionDetailList> responseObserver) {
		// Assuming you have a method to fetch transaction details based on the duration
		// in days
		List<Transaction> transactions = fetchTransactions(request.getDurationInDays());
		int batchSize = 3; // How many transactions to send at once

		for (int i = 0; i < transactions.size(); i += batchSize) {
			int endIndex = Math.min(i + batchSize, transactions.size());
			List<Transaction> batchTransactions = transactions.subList(i, endIndex);

			TransactionDetailList.Builder transactionDetailListBuilder = TransactionDetailList.newBuilder();

			for (Transaction transaction : batchTransactions) {
				TransactionDetail transactionDetail = createTransactionDetailFromTransaction(transaction);
				transactionDetailListBuilder.addTransactionDetails(transactionDetail);
			}
			TransactionDetailList transactionDetailList = transactionDetailListBuilder.build();

			responseObserver.onNext(transactionDetailList);

			// Delay between sending batches (if necessary)
			// You can adjust this based on your requirements
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
				responseObserver.onError(e);
				return;
			}
		}

		responseObserver.onCompleted();
	}

	private TransactionDetail createTransactionDetailFromTransaction(Transaction transaction) {
		return TransactionDetail.newBuilder().setTransactionId(transaction.getId())
				.setTransactionType(transaction.getType()).setTransactionAmount(transaction.getAmount()).build();
	}

	private List<Transaction> fetchTransactions(int durationInDays) {
		List<Transaction> transactions = new ArrayList<>();

		// Mock data for transaction details
		Transaction transaction1 = new Transaction("1", "Deposit", 100.0f);
		Transaction transaction2 = new Transaction("2", "Withdrawal", 50.0f);
		Transaction transaction3 = new Transaction("3", "Transfer", 75.0f);
		Transaction transaction4 = new Transaction("4", "Deposit", 200.0f);
		Transaction transaction5 = new Transaction("5", "Withdrawal", 30.0f);

		transactions.addAll(Arrays.asList(transaction1, transaction2, transaction3, transaction4, transaction5));

		return transactions;

	}
}
Next create a file named application.properties where we specify the grpc port configuration-
grpc.server.port=8090

Test using BloomRPC

Start the spring boot project created. Start BloomRPC and load the proto file. This will create the gRPC client.
Bank Server Streaming gRPC BloomRPC

Spring Boot Server Streaming gRPC Client

We will be creating a maven project as follows -
Bank Server Streaming gRPC client
The pom.xml will be as follows-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javainuse</groupId>
	<artifactId>boot-server-streaming-grpc-client</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.0</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<protobuf.version>3.17.3</protobuf.version>
		<protobuf-plugin.version>0.6.1</protobuf-plugin.version>
		<grpc.version>1.59.0</grpc.version>
	</properties>
	<dependencies>

		<dependency>
			<groupId>net.devh</groupId>
			<artifactId>grpc-server-spring-boot-starter</artifactId>
			<version>2.15.0.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>io.grpc</groupId>
			<artifactId>grpc-stub</artifactId>
			<version>${grpc.version}</version>
		</dependency>
		<dependency>
			<groupId>io.grpc</groupId>
			<artifactId>grpc-protobuf</artifactId>
			<version>${grpc.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.tomcat</groupId>
			<artifactId>annotations-api</artifactId>
			<version>6.0.53</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

	<build>
		<extensions>
			<extension>
				<groupId>kr.motd.maven</groupId>
				<artifactId>os-maven-plugin</artifactId>
				<version>1.7.1</version>
			</extension>
		</extensions>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
			<plugin>
				<groupId>org.xolstice.maven.plugins</groupId>
				<artifactId>protobuf-maven-plugin</artifactId>
				<version>0.6.1</version>
				<configuration>
					<protocArtifact>
						com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}
					</protocArtifact>
					<pluginId>grpc-java</pluginId>
					<pluginArtifact>
						io.grpc:protoc-gen-grpc-java:1.59.0:exe:${os.detected.classifier}
					</pluginArtifact>
					<protoSourceRoot>
						${basedir}/src/main/proto/
					</protoSourceRoot>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>compile</goal>
							<goal>compile-custom</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
Next create the proto file named bank-service.proto in src/main/proto folder
syntax = "proto3";

package banking;

option java_multiple_files = true;
option java_package = "com.javainuse.banking";

// Message representing a client's account transaction request
message AccountRequest {
  string account_number = 1;
  int32 duration_in_days = 2;
}

// Message representing a transaction detail
message TransactionDetail {
  string transaction_id = 1;
  string transaction_type = 2;
  float transaction_amount = 3;
}

message TransactionDetailList {
  repeated TransactionDetail transaction_details = 1;
}

// Service for streaming transaction details
service TransactionService {
  // Method to stream transaction details for a given duration in days
  rpc streamTransactions(AccountRequest) returns (stream TransactionDetailList);
}
Create the spring bootstrap class as follows-
package com.javainuse.bank;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SpringBootGrpcClientExampleApplication {

	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(SpringBootGrpcClientExampleApplication.class, args);		
	}
}
Run the maven install command which will generate the classes using the proto file.
The TransactionServiceClient class creates a client to communicate with a transaction service. It establishes a connection with the service using a host and port or an existing channel. It can stream transactions for a specified account number and duration. It also provides a method to shut down the connection.
package com.javainuse.bank.service;

import java.util.concurrent.TimeUnit;

import com.javainuse.banking.AccountRequest;
import com.javainuse.banking.TransactionDetailList;
import com.javainuse.banking.TransactionServiceGrpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class TransactionServiceClient {

	private final ManagedChannel channel;
	private final TransactionServiceGrpc.TransactionServiceStub asyncStub;

	public TransactionServiceClient(String host, int port) {
		this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
	}

	public TransactionServiceClient(ManagedChannel channel) {
		this.channel = channel;
		asyncStub = TransactionServiceGrpc.newStub(channel);
	}

	public void streamTransactions(String accountNumber, int durationInDays) {
		AccountRequest request = AccountRequest.newBuilder().setAccountNumber(accountNumber)
				.setDurationInDays(durationInDays).build();

		asyncStub.streamTransactions(request, new StreamObserver<TransactionDetailList>() {
			@Override
			public void onNext(TransactionDetailList transactionDetail) {
				// Handle each incoming TransactionDetail here
				System.out.println("Received transaction detail: " + transactionDetail);
			}

			@Override
			public void onError(Throwable throwable) {
				System.err.println("Error occurred during transaction streaming: " + throwable);
			}

			@Override
			public void onCompleted() {
				System.out.println("Transaction streaming completed");
			}
		});
	}

	public void shutdown() throws InterruptedException {
		channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
	}

}
BankService class is marked as a service. It has a method getTransactions() which connects to a TransactionServiceClient at localhost:8090. It streams transactions for account "123456789" for 30 seconds. It waits indefinitely to receive all transactions unless a timeout is added. Finally, it shuts down the client.
package com.javainuse.bank.service;

import java.util.concurrent.CountDownLatch;

import org.springframework.stereotype.Service;

@Service
public class BankService {

	public void getTransactions() throws InterruptedException {
		TransactionServiceClient client = new TransactionServiceClient("localhost", 8090);
		client.streamTransactions("123456789", 30);
		// Wait indefinitely to receive all transactions. Add a timeout if required.
		new CountDownLatch(1).await();
		client.shutdown();
	}
}
Finally modify the spring bootstrap class to call the getTransactions method of the bankService -
package com.javainuse.bank;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

import com.javainuse.bank.service.BankService;

@SpringBootApplication
public class SpringBootGrpcClientExampleApplication {

	public static void main(String[] args) throws InterruptedException {
		ApplicationContext context = SpringApplication.run(SpringBootGrpcClientExampleApplication.class, args);
		BankService bankService = context.getBean(BankService.class);
		bankService.getTransactions();
		
	}
}
Run the client to get the response.
Bank Server Streaming gRPC output


	

Download Source Code

Download it - Spring Boot + gRPC Server Streaming Client Example
Download it - Spring Boot + gRPC Server Streaming Server Example