Building a Streaming API with gRPC
Building resilient real-time applications with gRPC streaming in Java: A practical guide
In this article, I’ll guide you through implementing streaming APIs using gRPC in Java. We’ll look at various types of streaming. Then, we’ll create a practical example to show real-time flow.
Introduction
Many developers know about traditional request-response APIs. Yet, modern apps often need real-time data streaming. gRPC provides powerful streaming capabilities that can transform how your services communicate. Whether you’re making chat apps, real-time analytics, or IoT systems, knowing gRPC streaming is key.
Let’s create a weather monitoring service. It will stream temperature updates to clients.
Types of gRPC streaming
Before we start coding, let’s understand the three types of streaming gRPC offers:
- Server streaming: the server sends multiple responses to a single client request.
- Client streaming: the client sends multiple messages, and the server replies with a single response.
- Bidirectional streaming: both the client and the server send multiple messages, independently of each other, over the same call.
We’ll use server streaming for our weather monitoring service. This method suits our needs well. The client sends one request to start monitoring. Then, the server provides continuous temperature updates.
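In grpc-java, the three streaming types show up as different handler method shapes. The following is a minimal sketch of those shapes that runs without gRPC on the classpath: the nested `Observer` interface is a stand-in for `io.grpc.stub.StreamObserver`, and `Shapes`, `Demo`, and `Collector` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class StreamingShapesSketch {
    /** Stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical service; only the method shapes matter here. */
    interface Shapes {
        // Server streaming: one request in, a stream of responses out.
        void serverStreaming(int count, Observer<Integer> responses);

        // Client streaming: a stream of requests in, a single response out.
        Observer<Integer> clientStreaming(Observer<Integer> response);

        // Bidirectional streaming: both sides send a stream of messages.
        Observer<Integer> bidiStreaming(Observer<Integer> responses);
    }

    static final class Demo implements Shapes {
        @Override public void serverStreaming(int count, Observer<Integer> responses) {
            for (int i = 1; i <= count; i++) {
                responses.onNext(i);
            }
            responses.onCompleted();
        }

        @Override public Observer<Integer> clientStreaming(Observer<Integer> response) {
            return new Observer<Integer>() {
                private int sum;
                @Override public void onNext(Integer value) { sum += value; }
                @Override public void onError(Throwable t) { /* request stream failed; nothing to send */ }
                @Override public void onCompleted() {
                    response.onNext(sum); // the single reply, sent once the client is done
                    response.onCompleted();
                }
            };
        }

        @Override public Observer<Integer> bidiStreaming(Observer<Integer> responses) {
            return new Observer<Integer>() {
                @Override public void onNext(Integer value) { responses.onNext(value * 2); }
                @Override public void onError(Throwable t) { /* request stream failed; nothing to send */ }
                @Override public void onCompleted() { responses.onCompleted(); }
            };
        }
    }

    /** Collects whatever the service sends back. */
    static final class Collector implements Observer<Integer> {
        final List<Integer> values = new ArrayList<>();
        boolean completed;
        @Override public void onNext(Integer value) { values.add(value); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        Demo demo = new Demo();

        Collector serverOut = new Collector();
        demo.serverStreaming(3, serverOut);
        System.out.println(serverOut.values); // prints [1, 2, 3]

        Collector clientOut = new Collector();
        Observer<Integer> requests = demo.clientStreaming(clientOut);
        requests.onNext(2);
        requests.onNext(5);
        requests.onCompleted();
        System.out.println(clientOut.values); // prints [7]
    }
}
```

Note that the server-streaming handler receives the request directly, while the other two return an observer through which the incoming messages arrive.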
Implementation
First, let’s define our service contract in protocol buffers:
syntax = "proto3";

package io.vrnsky.weather;

option java_multiple_files = true;
option java_package = "io.vrnsky.weather";
option java_outer_classname = "WeatherServiceProto";

service WeatherService {
  rpc monitorTemperature (Location) returns (stream Temperature) {}
}

message Location {
  double latitude = 1;
  double longitude = 2;
}

message Temperature {
  double celsius = 1;
  string timestamp = 2;
}
Now let’s implement our service:
package io.vrnsky.grpc.streaming.service;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.vrnsky.weather.Location;
import io.vrnsky.weather.Temperature;
import io.vrnsky.weather.WeatherServiceGrpc;
import net.devh.boot.grpc.server.service.GrpcService;

import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.TimeUnit;

@GrpcService
public class WeatherServiceImpl extends WeatherServiceGrpc.WeatherServiceImplBase {

    private final Random random = new Random();

    @Override
    public void monitorTemperature(Location request, StreamObserver<Temperature> responseObserver) {
        try {
            // Send ten simulated readings, one per second.
            for (int i = 0; i < 10; i++) {
                Temperature temperature = Temperature.newBuilder()
                        .setCelsius(20 + random.nextDouble() * 5)
                        .setTimestamp(LocalDateTime.now().toString())
                        .build();
                responseObserver.onNext(temperature);
                TimeUnit.SECONDS.sleep(1);
            }
            // onCompleted and onError are both terminal, so only one of them may be called.
            responseObserver.onCompleted();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and end the stream with an error instead.
            Thread.currentThread().interrupt();
            responseObserver.onError(Status.CANCELLED
                    .withDescription("Temperature stream interrupted")
                    .withCause(e)
                    .asRuntimeException());
        }
    }
}
The client implementation would look like this:
package io.vrnsky.grpc.streaming.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.vrnsky.weather.Location;
import io.vrnsky.weather.WeatherServiceGrpc;

public class Client {

    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 9090)
                .usePlaintext()
                .build();
        WeatherServiceGrpc.WeatherServiceBlockingStub stub =
                WeatherServiceGrpc.newBlockingStub(channel);
        Location location = Location.newBuilder()
                .setLatitude(37.7749)
                .setLongitude(-122.4194)
                .build();
        // The blocking stub returns an Iterator that yields each update as it arrives.
        stub.monitorTemperature(location).forEachRemaining(temperature -> {
            System.out.printf("Temperature: %.2f°C at %s%n",
                    temperature.getCelsius(),
                    temperature.getTimestamp());
        });
        channel.shutdown();
    }
}
Error handling and best practices
When implementing streaming APIs, proper error handling is crucial. Here are some best practices.
Graceful shutdown
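Always handle the onCompleted and onError callbacks properly. Both are terminal signals: once either has been called, the stream is over, so a handler must call exactly one of them and never both: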
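try {
    for (int i = 0; i < 10; i++) {
        Temperature temperature = Temperature.newBuilder()
                .setCelsius(20 + random.nextDouble() * 5)
                .setTimestamp(LocalDateTime.now().toString())
                .build();
        responseObserver.onNext(temperature);
        TimeUnit.SECONDS.sleep(1);
    }
    // Complete only on the success path; a finally block would also run after onError.
    responseObserver.onCompleted();
} catch (InterruptedException e) {
    // Restore the interrupt flag before reporting the failure.
    Thread.currentThread().interrupt();
    responseObserver.onError(
            Status.CANCELLED
                    .withDescription("Stream interrupted")
                    .withCause(e)
                    .asException()
    );
} catch (Exception e) {
    responseObserver.onError(
            Status.INTERNAL
                    .withDescription("Internal error occurred")
                    .withCause(e)
                    .asException()
    );
}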
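A stream should end with exactly one terminal callback, either onCompleted or onError. As a minimal sketch of how that can be enforced defensively, the wrapper below forwards the first terminal signal and drops anything sent afterwards. The nested `Observer` interface is a stand-in for `io.grpc.stub.StreamObserver`, and `Guard` and `Recorder` are hypothetical helpers, not part of gRPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminalGuardSketch {
    /** Stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Forwards at most one terminal signal and drops everything after it. */
    static final class Guard<T> implements Observer<T> {
        private final Observer<T> delegate;
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        Guard(Observer<T> delegate) {
            this.delegate = delegate;
        }

        @Override public void onNext(T value) {
            if (!terminated.get()) {
                delegate.onNext(value);
            }
        }

        @Override public void onError(Throwable t) {
            // compareAndSet lets only the first terminal call through.
            if (terminated.compareAndSet(false, true)) {
                delegate.onError(t);
            }
        }

        @Override public void onCompleted() {
            if (terminated.compareAndSet(false, true)) {
                delegate.onCompleted();
            }
        }
    }

    /** Records every callback it receives, in order. */
    static final class Recorder implements Observer<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String value) { events.add("next:" + value); }
        @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        Guard<String> guarded = new Guard<>(recorder);
        guarded.onNext("21.5");
        guarded.onError(new IllegalStateException("sensor offline"));
        guarded.onCompleted();  // dropped: the stream already ended with an error
        guarded.onNext("22.0"); // dropped for the same reason
        System.out.println(recorder.events); // prints [next:21.5, error:sensor offline]
    }
}
```

A guard like this hides double-termination bugs rather than fixing them, so it is best treated as a safety net alongside a handler that is already structured correctly.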
Flow control
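Implement flow control so the server does not overwhelm slow clients. The example below caps the send rate with Guava’s RateLimiter and stops producing once the call is cancelled: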
public void monitorTemperature(Location request,
        StreamObserver<Temperature> responseObserver) {
    Context.current().addListener(context -> {
        // Runs once the call is cancelled; release per-stream resources here.
    }, MoreExecutors.directExecutor());
    // Allow at most one update per second.
    RateLimiter rateLimiter = RateLimiter.create(1.0);
    try {
        // Keep streaming until the call's context is cancelled.
        while (!Context.current().isCancelled()) {
            rateLimiter.acquire(); // blocks until the next permit is available
            Temperature temperature = Temperature.newBuilder()
                    .setCelsius(20 + random.nextDouble() * 5)
                    .setTimestamp(LocalDateTime.now().toString())
                    .build();
            responseObserver.onNext(temperature);
        }
    } catch (Exception e) {
        responseObserver.onError(e);
    }
}
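The call to RateLimiter.create(1.0) above paces the producer at one update per second. To make the idea concrete, here is a minimal sketch of fixed-interval pacing in plain Java. `Pacer` is a hypothetical, simplified stand-in and does not reproduce Guava’s actual algorithm; the caller passes the current time in explicitly so the behavior is easy to reason about.

```java
import java.util.concurrent.TimeUnit;

class PacingSketch {
    /** Hypothetical fixed-interval pacer; a simplified stand-in, not Guava's algorithm. */
    static final class Pacer {
        private final long intervalNanos;
        private long nextFreeNanos;

        Pacer(double permitsPerSecond, long startNanos) {
            this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
            this.nextFreeNanos = startNanos;
        }

        /** Reserves the next send slot and returns how long the caller should wait, in nanoseconds. */
        synchronized long reserve(long nowNanos) {
            // The slot is either right now or the earliest time the previous slot allows.
            long slot = Math.max(nowNanos, nextFreeNanos);
            nextFreeNanos = slot + intervalNanos;
            return slot - nowNanos;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Pacer pacer = new Pacer(2.0, System.nanoTime()); // two updates per second
        for (int i = 0; i < 4; i++) {
            long waitNanos = pacer.reserve(System.nanoTime());
            TimeUnit.NANOSECONDS.sleep(waitNanos);
            System.out.println("update " + i);
        }
    }
}
```

Pacing like this limits how fast the server produces, but it does not react to how fast a particular client consumes. For that, grpc-java exposes `isReady()` and `setOnReadyHandler(...)` on `ServerCallStreamObserver`, which may be worth combining with a rate limit.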
Resource management
Always clean up resources when the stream ends:
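private final ConcurrentHashMap<String, StreamObserver<?>> activeStreams =
        new ConcurrentHashMap<>();

public void monitorTemperature(Location request,
        StreamObserver<Temperature> responseObserver) {
    String streamId = UUID.randomUUID().toString();
    activeStreams.put(streamId, responseObserver);
    // Created outside the try block so the finally block can always shut it down.
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
        ScheduledFuture<?> task = executor.scheduleAtFixedRate(() -> {
            Temperature temperature = Temperature.newBuilder()
                    .setCelsius(20 + random.nextDouble() * 5)
                    .setTimestamp(LocalDateTime.now().toString())
                    .build();
            responseObserver.onNext(temperature);
        }, 0, 1, TimeUnit.SECONDS);
        try {
            // A periodic task only finishes by failing, so a timeout is the normal path.
            task.get(10, TimeUnit.SECONDS);
        } catch (TimeoutException streamedForTenSeconds) {
            // Stop the producer and wait for it before sending the terminal signal.
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
            responseObserver.onCompleted();
        }
    } catch (ExecutionException e) {
        // The scheduled task threw; report the underlying cause to the client.
        responseObserver.onError(Status.INTERNAL
                .withDescription("Temperature update failed")
                .withCause(e.getCause())
                .asRuntimeException());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        responseObserver.onError(Status.CANCELLED
                .withDescription("Stream interrupted")
                .withCause(e)
                .asRuntimeException());
    } finally {
        // Runs on every path: stop the executor and drop the registry entry.
        executor.shutdownNow();
        activeStreams.remove(streamId);
    }
}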
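One way to make this cleanup hard to forget is to tie it to try-with-resources. The sketch below assumes a hypothetical `StreamHandle` that owns both the per-stream executor and its registry entry, so closing the handle releases everything in one place. None of these names come from gRPC; they only illustrate the pattern.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class StreamRegistrySketch {
    /** Hypothetical per-stream handle: owns the stream's executor and its registry entry. */
    static final class StreamHandle implements AutoCloseable {
        final String id = UUID.randomUUID().toString();
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        private final Map<String, StreamHandle> registry;

        StreamHandle(Map<String, StreamHandle> registry) {
            this.registry = registry;
            registry.put(id, this); // register on creation
        }

        @Override public void close() {
            executor.shutdownNow(); // stop any scheduled work
            registry.remove(id);    // unregister so the map cannot leak entries
        }
    }

    static final Map<String, StreamHandle> ACTIVE = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        try (StreamHandle handle = new StreamHandle(ACTIVE)) {
            System.out.println("active streams: " + ACTIVE.size()); // prints "active streams: 1"
        }
        // close() has run by now, even if the body had thrown.
        System.out.println("active streams: " + ACTIVE.size()); // prints "active streams: 0"
    }
}
```

In a real handler, the streaming loop would live inside the try block, and the handle would be closed whether the stream completes, fails, or is cancelled.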
Testing the streaming API
Here is how to test your streaming implementation.
// The test calls the implementation directly, without starting a gRPC server.
private final WeatherServiceImpl weatherService = new WeatherServiceImpl();

@Test
public void testTemperatureMonitoring() throws InterruptedException {
    Location location = Location.newBuilder()
            .setLatitude(37.7749)
            .setLongitude(-122.4194)
            .build();
    List<Temperature> receivedTemperatures = new ArrayList<>();
    // Released when the stream terminates, with or without an error.
    CountDownLatch latch = new CountDownLatch(1);
    StreamObserver<Temperature> responseObserver = new StreamObserver<>() {
        @Override
        public void onNext(Temperature temperature) {
            receivedTemperatures.add(temperature);
        }

        @Override
        public void onError(Throwable t) {
            latch.countDown();
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    weatherService.monitorTemperature(location, responseObserver);
    // Ten updates at one per second should finish within eleven seconds.
    assertTrue(latch.await(11, TimeUnit.SECONDS));
    assertEquals(10, receivedTemperatures.size());
}
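The anonymous observer in this test can be factored into a reusable helper. The following is a minimal sketch, assuming a hypothetical `RecordingObserver` that stores received values in a thread-safe list, keeps the error (if any), and lets the test wait for termination. The nested `Observer` interface is a stand-in for `io.grpc.stub.StreamObserver` so the sketch runs on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class RecordingObserverSketch {
    /** Stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical test helper: records values and signals when the stream ends. */
    static final class RecordingObserver<T> implements Observer<T> {
        final List<T> values = new CopyOnWriteArrayList<>(); // safe if callbacks arrive on another thread
        volatile Throwable error;
        private final CountDownLatch done = new CountDownLatch(1);

        @Override public void onNext(T value) { values.add(value); }

        @Override public void onError(Throwable t) {
            error = t;
            done.countDown();
        }

        @Override public void onCompleted() { done.countDown(); }

        /** Returns true if the stream terminated within the timeout. */
        boolean awaitDone(long timeout, TimeUnit unit) {
            try {
                return done.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        RecordingObserver<Double> observer = new RecordingObserver<>();
        // Simulate a service that emits three readings from another thread.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                observer.onNext(20.0 + i);
            }
            observer.onCompleted();
        });
        producer.start();
        boolean finished = observer.awaitDone(1, TimeUnit.SECONDS);
        System.out.println("finished=" + finished + " values=" + observer.values);
    }
}
```

Keeping the error separate from normal completion lets a test assert that the stream ended and that it ended cleanly, which a latch counted down by both callbacks cannot distinguish.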
Conclusion
gRPC streaming provides a powerful way to implement real-time communication between services. We’ve looked at server-side streaming using a weather monitoring example. The same ideas work for client-side and bidirectional streaming, too.
The key advantages of gRPC streaming include:
- Efficient bidirectional communication.
- Strong typing with protocol buffers.
- Built-in flow control and backpressure.
- Excellent language support.
When you build streaming APIs for production, always handle errors properly, manage resources carefully, and test thoroughly.