Building Streaming API with gRPC

Building resilient real-time applications with gRPC streaming in Java: A practical guide

In this article, I’ll guide you through implementing streaming APIs using gRPC in Java. We’ll look at the types of streaming gRPC offers, then build a practical example that streams data to a client in real time.

Introduction

Many developers know about traditional request-response APIs. Yet, modern apps often need real-time data streaming. gRPC provides powerful streaming capabilities that can transform how your services communicate. Whether you’re making chat apps, real-time analytics, or IoT systems, knowing gRPC streaming is key.

Let’s create a weather monitoring service. It will stream temperature updates to clients.

Types of gRPC streaming

Before we start coding, let’s understand the three types of streaming gRPC offers:

  • Server streaming: the server sends multiple responses to a single client request.
  • Client streaming: the client sends multiple messages, and the server replies with a single response.
  • Bidirectional streaming: both the client and the server send multiple messages at the same time.

We’ll use server streaming for our weather monitoring service. This method suits our needs well. The client sends one request to start monitoring. Then, the server provides continuous temperature updates.
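To make the three shapes concrete, here is a minimal sketch of how each one looks as a Java method. It runs without gRPC on the classpath: the nested `StreamObserver` interface is a stand-in for `io.grpc.stub.StreamObserver`, and `countdown`, `sum`, `echoUpper`, and `collectInto` are hypothetical names chosen for illustration, not part of the weather service.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamingShapes {

    // Minimal stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Server streaming: one request in, many responses out.
    static void countdown(int from, StreamObserver<Integer> responses) {
        for (int i = from; i > 0; i--) {
            responses.onNext(i);
        }
        responses.onCompleted();
    }

    // Client streaming: many requests in, one response once the client completes.
    static StreamObserver<Integer> sum(StreamObserver<Integer> response) {
        return new StreamObserver<>() {
            private int total;
            @Override public void onNext(Integer value) { total += value; }
            @Override public void onError(Throwable t) { /* client aborted, nothing to send */ }
            @Override public void onCompleted() {
                response.onNext(total);
                response.onCompleted();
            }
        };
    }

    // Bidirectional streaming: each incoming message can trigger an outgoing one right away.
    static StreamObserver<String> echoUpper(StreamObserver<String> responses) {
        return new StreamObserver<>() {
            @Override public void onNext(String value) { responses.onNext(value.toUpperCase()); }
            @Override public void onError(Throwable t) { /* nothing to clean up in this sketch */ }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    // Helper for the demo: stores every received value in the given list.
    static <T> StreamObserver<T> collectInto(List<T> sink) {
        return new StreamObserver<>() {
            @Override public void onNext(T value) { sink.add(value); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };
    }

    public static void main(String[] args) {
        List<Integer> ticks = new ArrayList<>();
        countdown(3, collectInto(ticks));
        System.out.println("server streaming: " + ticks);

        List<Integer> totals = new ArrayList<>();
        StreamObserver<Integer> requests = sum(collectInto(totals));
        requests.onNext(1);
        requests.onNext(2);
        requests.onNext(3);
        requests.onCompleted();
        System.out.println("client streaming: " + totals);

        List<String> echoes = new ArrayList<>();
        StreamObserver<String> chat = echoUpper(collectInto(echoes));
        chat.onNext("hello");
        chat.onNext("world");
        chat.onCompleted();
        System.out.println("bidirectional: " + echoes);
    }
}
```

Note how the server-streaming method takes the request directly, while the client-streaming and bidirectional methods return an observer that receives the client's messages.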

Implementation

First, let’s define our service contract in protocol buffers:

syntax = "proto3";
package io.vrnsky.weather;

option java_multiple_files = true;
option java_package = "io.vrnsky.weather";
option java_outer_classname = "WeatherServiceProto";

service WeatherService {
  rpc monitorTemperature (Location) returns (stream Temperature) {}
}

message Location {
  double latitude = 1;
  double longitude = 2;
}

message Temperature {
  double celsius = 1;
  string timestamp = 2;
}

Now let’s implement our service:

package io.vrnsky.grpc.streaming.service;

import io.grpc.stub.StreamObserver;
import io.vrnsky.weather.Location;
import io.vrnsky.weather.Temperature;
import io.vrnsky.weather.WeatherServiceGrpc;
import net.devh.boot.grpc.server.service.GrpcService;

import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.TimeUnit;

@GrpcService
public class WeatherServiceImpl extends WeatherServiceGrpc.WeatherServiceImplBase {

    private final Random random = new Random();

    @Override
    public void monitorTemperature(Location request, StreamObserver<Temperature> responseObserver) {
        try {
            // Emit ten readings, one per second.
            for (int i = 0; i < 10; i++) {
                Temperature temperature = Temperature.newBuilder()
                        .setCelsius(20 + random.nextDouble() * 5)
                        .setTimestamp(LocalDateTime.now().toString())
                        .build();

                responseObserver.onNext(temperature);

                TimeUnit.SECONDS.sleep(1);
            }
            // Complete only on success: a stream ends with exactly one of
            // onCompleted or onError, never both.
            responseObserver.onCompleted();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and end the stream with an error.
            Thread.currentThread().interrupt();
            responseObserver.onError(e);
        }
    }
}

The client implementation looks like this:

package io.vrnsky.grpc.streaming.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.vrnsky.weather.Location;
import io.vrnsky.weather.WeatherServiceGrpc;

public class Client {

    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 9090)
                .usePlaintext()
                .build();

        WeatherServiceGrpc.WeatherServiceBlockingStub stub =
                WeatherServiceGrpc.newBlockingStub(channel);

        Location location = Location.newBuilder()
                .setLatitude(37.7749)
                .setLongitude(-122.4194)
                .build();

        stub.monitorTemperature(location).forEachRemaining(temperature -> {
            System.out.printf("Temperature: %.2f°C at %s%n",
                    temperature.getCelsius(),
                    temperature.getTimestamp());
        });

        channel.shutdown();
    }
}

Error handling and best practices

When implementing streaming APIs, proper error handling is crucial. Here are some best practices.

Graceful shutdown

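A stream must end with exactly one terminal call: onCompleted on success or onError on failure, never both. Call onCompleted at the end of the try block rather than in finally, so it does not run after onError:

try {
    for (int i = 0; i < 10; i++) {
        Temperature temperature = Temperature.newBuilder()
            .setCelsius(20 + random.nextDouble() * 5)
            .setTimestamp(LocalDateTime.now().toString())
            .build();

        responseObserver.onNext(temperature);
        TimeUnit.SECONDS.sleep(1);
    }
    // Reached only if every reading was sent successfully.
    responseObserver.onCompleted();
} catch (Exception e) {
    // Restore the interrupt flag if the sleep was interrupted.
    if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
    }
    // Status comes from io.grpc.Status.
    responseObserver.onError(
        Status.INTERNAL
            .withDescription("Internal error occurred")
            .withCause(e)
            .asException()
    );
}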
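One defensive way to guarantee a single terminal call is to wrap the observer so that only the first terminal signal gets through. The sketch below is an illustration, not something gRPC provides: `TerminalGuard` and `recorder` are hypothetical names, and the nested `StreamObserver` interface stands in for `io.grpc.stub.StreamObserver` so the example runs without gRPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class TerminalGuardDemo {

    // Minimal stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical wrapper: forwards the first terminal signal and ignores everything after it.
    static final class TerminalGuard<T> implements StreamObserver<T> {
        private final StreamObserver<T> delegate;
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        TerminalGuard(StreamObserver<T> delegate) {
            this.delegate = delegate;
        }

        @Override public void onNext(T value) {
            if (!terminated.get()) {
                delegate.onNext(value);
            }
        }

        @Override public void onError(Throwable t) {
            // compareAndSet succeeds for exactly one caller.
            if (terminated.compareAndSet(false, true)) {
                delegate.onError(t);
            }
        }

        @Override public void onCompleted() {
            if (terminated.compareAndSet(false, true)) {
                delegate.onCompleted();
            }
        }
    }

    // Records each callback as a string so the order of events is visible.
    static StreamObserver<Integer> recorder(List<String> events) {
        return new StreamObserver<>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        };
    }

    // Simulates a handler that fails mid-stream and then still hits a finally-style onCompleted.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        StreamObserver<Integer> guarded = new TerminalGuard<>(recorder(events));
        guarded.onNext(1);
        guarded.onError(new IllegalStateException("sensor offline"));
        guarded.onCompleted(); // ignored: the stream already ended with an error
        guarded.onNext(2);     // ignored: no messages after termination
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A guard like this hides the bug rather than fixing it, so it works best as a safety net next to correctly structured handler code.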

Flow control

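Implement proper backpressure handling to prevent overwhelming clients. In the example below, a Guava RateLimiter caps how fast the server sends, and an isReady() check skips a reading when the client is not keeping up:

public void monitorTemperature(Location request,
                               StreamObserver<Temperature> responseObserver) {
    // On the server side, the observer can be cast to ServerCallStreamObserver
    // (io.grpc.stub), which exposes readiness and cancellation state.
    ServerCallStreamObserver<Temperature> serverObserver =
        (ServerCallStreamObserver<Temperature>) responseObserver;

    // Guava RateLimiter: at most one reading per second.
    RateLimiter rateLimiter = RateLimiter.create(1.0);

    try {
        // Stop producing as soon as the client cancels the call.
        while (!serverObserver.isCancelled()) {
            rateLimiter.acquire();

            // Skip this reading if the transport buffer is full.
            if (!serverObserver.isReady()) {
                continue;
            }

            Temperature temperature = Temperature.newBuilder()
                .setCelsius(20 + random.nextDouble() * 5)
                .setTimestamp(LocalDateTime.now().toString())
                .build();

            serverObserver.onNext(temperature);
        }
    } catch (Exception e) {
        responseObserver.onError(e);
    }
}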
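To see what a readiness check buys you, here is a small single-threaded simulation that runs without gRPC. It is a sketch under stated assumptions: `ReadyAwareObserver` stands in for the `isReady()` part of `io.grpc.stub.ServerCallStreamObserver`, and `BoundedClient` and `publish` are hypothetical names that model a client consuming more slowly than the server produces.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BackpressureSketch {

    // Stand-in for the readiness part of ServerCallStreamObserver (assumption).
    interface ReadyAwareObserver<T> {
        boolean isReady();
        void onNext(T value);
    }

    // Simulated slow client: a bounded buffer that is "ready" only while it has room.
    static final class BoundedClient implements ReadyAwareObserver<Integer> {
        private final Deque<Integer> buffer = new ArrayDeque<>();
        private final int capacity;
        private int maxBuffered;

        BoundedClient(int capacity) {
            this.capacity = capacity;
        }

        @Override public boolean isReady() {
            return buffer.size() < capacity;
        }

        @Override public void onNext(Integer value) {
            buffer.addLast(value);
            maxBuffered = Math.max(maxBuffered, buffer.size());
        }

        // The client consumes one buffered message.
        void drainOne() {
            buffer.pollFirst();
        }

        int maxBuffered() {
            return maxBuffered;
        }
    }

    // Produces readings 1..count and sends one only when the client is ready.
    // The client drains one message after every `drainEvery` readings.
    // Returns how many readings were skipped.
    static int publish(int count, BoundedClient client, int drainEvery) {
        int dropped = 0;
        for (int reading = 1; reading <= count; reading++) {
            if (client.isReady()) {
                client.onNext(reading);
            } else {
                dropped++; // stale temperature readings are safe to skip
            }
            if (reading % drainEvery == 0) {
                client.drainOne();
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        BoundedClient client = new BoundedClient(2);
        int dropped = publish(6, client, 3);
        System.out.println("dropped readings: " + dropped);
        System.out.println("largest buffer size: " + client.maxBuffered());
    }
}
```

Skipping readings suits data where only the latest value matters, such as temperature. For data that must not be lost, the producer would pause until the client is ready instead of dropping.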

Resource management

Always clean up resources when the stream ends:

private final ConcurrentHashMap<String, StreamObserver<?>> activeStreams = 
    new ConcurrentHashMap<>();

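public void monitorTemperature(Location request,
                               StreamObserver<Temperature> responseObserver) {
    String streamId = UUID.randomUUID().toString();
    activeStreams.put(streamId, responseObserver);

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // java.util.concurrent.atomic.AtomicBoolean: set once the stream has ended with an error.
    AtomicBoolean failed = new AtomicBoolean(false);
    try {
        executor.scheduleAtFixedRate(() -> {
            try {
                Temperature temperature = Temperature.newBuilder()
                    .setCelsius(20 + random.nextDouble() * 5)
                    .setTimestamp(LocalDateTime.now().toString())
                    .build();
                responseObserver.onNext(temperature);
            } catch (RuntimeException e) {
                failed.set(true);
                responseObserver.onError(e);
                // Rethrowing cancels further runs of this periodic task.
                throw e;
            }
        }, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(10000);

        // Stop the task and wait for it, so onCompleted never races with onNext.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        if (!failed.get()) {
            responseObserver.onCompleted();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        if (!failed.get()) {
            responseObserver.onError(e);
        }
    } finally {
        // Runs on every exit path: release the executor and deregister the stream.
        executor.shutdownNow();
        activeStreams.remove(streamId);
    }
}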
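The register-then-always-remove pattern can also be expressed with try-with-resources, which makes the cleanup hard to forget. This is a minimal sketch that runs on its own: `StreamRegistry`, `Registration`, `register`, and `activeCount` are hypothetical names, and the registry stores plain `Object` values in place of real stream observers.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class StreamRegistry {

    private final Map<String, Object> activeStreams = new ConcurrentHashMap<>();

    // Handle returned by register(); closing it removes the stream from the registry.
    final class Registration implements AutoCloseable {
        private final String streamId;

        Registration(String streamId) {
            this.streamId = streamId;
        }

        String id() {
            return streamId;
        }

        @Override
        public void close() {
            activeStreams.remove(streamId);
        }
    }

    // Registers an observer under a fresh id and returns the handle that undoes it.
    Registration register(Object observer) {
        String streamId = UUID.randomUUID().toString();
        activeStreams.put(streamId, observer);
        return new Registration(streamId);
    }

    int activeCount() {
        return activeStreams.size();
    }

    public static void main(String[] args) {
        StreamRegistry registry = new StreamRegistry();
        try (Registration registration = registry.register(new Object())) {
            System.out.println("active during stream: " + registry.activeCount());
            throw new IllegalStateException("simulated failure while streaming");
        } catch (IllegalStateException e) {
            // The registration is closed before this catch block runs.
            System.out.println("active after failure: " + registry.activeCount());
        }
    }
}
```

Because the resource is closed before any catch or finally block runs, the registry is already clean by the time error handling starts.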

Testing the streaming API

Here is how to test your streaming implementation.

@Test
public void testTemperatureMonitoring() throws InterruptedException {
    Location location = Location.newBuilder()
        .setLatitude(37.7749)
        .setLongitude(-122.4194)
        .build();

    List<Temperature> receivedTemperatures = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);

    StreamObserver<Temperature> responseObserver = new StreamObserver<>() {
        @Override
        public void onNext(Temperature temperature) {
            receivedTemperatures.add(temperature);
        }

        @Override
        public void onError(Throwable t) {
            latch.countDown();
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };

    weatherService.monitorTemperature(location, responseObserver);
    
    assertTrue(latch.await(11, TimeUnit.SECONDS));
    assertEquals(10, receivedTemperatures.size());
}
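If several tests need the same observer, it can help to pull it out into a reusable helper that also distinguishes a clean completion from an error. The following is a sketch that runs without gRPC: the nested `StreamObserver` interface stands in for `io.grpc.stub.StreamObserver`, and `RecordingObserver` and `emit` are hypothetical names, with `emit` acting as a fast fake of the service.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class RecordingObserverDemo {

    // Minimal stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Test helper: records values and the terminal signal, and lets a test wait for the end.
    static final class RecordingObserver<T> implements StreamObserver<T> {
        private final List<T> values = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Throwable error;

        @Override public void onNext(T value) {
            values.add(value);
        }

        @Override public void onError(Throwable t) {
            error = t;
            done.countDown();
        }

        @Override public void onCompleted() {
            done.countDown();
        }

        // Returns true if the stream terminated within the timeout.
        boolean awaitTermination(long timeout, TimeUnit unit) {
            try {
                return done.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        List<T> values() {
            return values;
        }

        // Null when the stream completed normally.
        Throwable error() {
            return error;
        }
    }

    // Hypothetical fast fake of the service: emits `count` readings from another thread.
    static void emit(int count, StreamObserver<Double> observer) {
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                observer.onNext(20.0 + i);
            }
            observer.onCompleted();
        });
        producer.start();
    }

    public static void main(String[] args) {
        RecordingObserver<Double> observer = new RecordingObserver<>();
        emit(3, observer);
        boolean finished = observer.awaitTermination(2, TimeUnit.SECONDS);
        System.out.println("finished=" + finished
                + " values=" + observer.values()
                + " error=" + observer.error());
    }
}
```

With a helper like this, a test can assert that `error()` is null before checking the number of received values, which makes failures easier to diagnose than a bare count mismatch.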

Conclusion

gRPC streaming provides a powerful way to implement real-time communication between services. We’ve looked at server-side streaming using a weather monitoring example. The same ideas apply to client-side and bidirectional streaming, too.

The key advantages of gRPC streaming include:

  • Efficient bidirectional communication.
  • Strong typing with protocol buffers.
  • Built-in flow control and backpressure.
  • Excellent language support.

When you build streaming APIs for production, always handle errors properly, manage resources carefully, and test thoroughly.
