Vert.x gRPC
The best description of gRPC can be seen at wikipedia.
gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.
wikipedia
Vert.x gRPC is a module that will align the programming style of Google gRPC with Vert.x style. As a user of this module you will be more familiar with the code style using Vert.x Streams and Futures while benefiting from all the benefits of gRPC.
For more information related to gRPC please consult the official documentation site http://www.grpc.io/.
In addition Vert.x gRPC supports
-
gRPC service scaling with Verticles
-
non-blocking native transports
gRPC types
With gRPC you benefit from HTTP/2 which means that you will have asynchronous streaming support which means that your Remote Procedure Calls can have the following characteristics:
-
Client streams request objects while Server replies with a single response object
-
Client streams request objects while Server replies with a stream of response objects
-
Client sends a single request object while Server replies with a single response object
-
Client sends a single request object while Server replies with a stream of response objects
While to the untrained eye this might not look to different from other HTTP based RPC approaches you should be aware that with HTTP/2 your requests do not need to complete before the responses start to arrive. This means that your communication channel is full duplex. Being full duplex allows you to reduce the response latency and make more response application.
A simple Hello World
In order to start with your first hello world example, one needs to define the protocol. gRPC requires you to define
this protocol using the protobuffer
format.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
This is a very simple example showing the single request, single response mode.
Compile the RPC definition
Using the definition above we need to compile it.
You can compile the proto file using the protoc
compiler if you like
or you can integrate it in your build.
If you’re using Apache Maven you need to add the plugin:
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<id>compile</id>
<configuration>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
The ${os.detected.classifier}
property is used to make the build OS independant, on OSX it is replaced
by osx-x86_64 and so on. To use it you need to add the os-maven-plugin[https://github.com/trustin/os-maven-plugin]
in the build
section of your pom.xml
:
<build>
...
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
...
</build>
This plugin will compile your proto files under src/main/proto
and make them available to your project.
If you’re using Gradle you need to add the plugin:
...
apply plugin: 'com.google.protobuf'
...
buildscript {
...
dependencies {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
}
}
...
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.2.0'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
}
vertx {
artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
}
}
generateProtoTasks {
all()*.plugins {
grpc
vertx
}
}
}
This plugin will compile your proto files under build/generated/source/proto/main
and make them available to your project.
gRPC Server
Now you should have your RPC base code setup it is time to implement your server. As you should recall from above we
described that our server should implement a sayHello
method that receives a HelloRequest
objects and returns a
HelloReply
object. So you can implement it as:
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
Once you’re happy with it you can then make your service available on a server. Vert.x makes the creation of a server quite simple all you need to add is:
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build();
// Start is asynchronous
rpcServer.start();
Using Vert.x future and streams
The previous example was using a gRPC server processing asynchronously using gRPC asynchronous constructs such
as io.grpc.stub.StreamObserver
. This code is generated by the protoc compiler.
The plugin configuration above configures the following plugin:
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
This generates a service version that uses Vert.x asynchronous constructs such as Future
or ReadStream
or WriteStream
which can be more convenient in the Vert.x ecosystem.
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
};
Server gzip compression
You can enable gzip compression to tell the server to send compressed responses (compressed requests are automatically handled by the server).
VertxGreeterGrpc.GreeterVertxImplBase service =
new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
}
}
.withCompression("gzip");
The withCompression
configuration is generated by the Vert.x gRPC protoc plugin. You can also enable compression
on default services by casting the ResponseObserver
to ServerCallStreamObserver
and call setCompression
before
sending the response.
GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
((ServerCallStreamObserver) responseObserver)
.setCompression("gzip");
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
Note
|
you can use other compressors as long as the server support them and they are registered against the compressor
registry when building the ManagedChannel
|
SSL configuration
The previous example was simple but your RPC is not secure. In order to make it secure we should enable SSL/TLS:
VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setKeyStoreOptions(new JksOptions()
.setPath("server-keystore.jks")
.setPassword("secret")));
Congratulations you just completed your first gRPC server.
Important
|
since gRPC uses HTTP/2 transport, SSL/TLS setup requires the Application-Layer Protocol Negotiation in your server |
Server scaling
When you deploy several instances of the same verticles, the gRPC server will be scaled on the verticle event-loops.
vertx.deployVerticle(
// Verticle supplier - should be called 4 times
() -> new AbstractVerticle() {
BindableService service = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request,
StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(
HelloReply.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
};
@Override
public void start() throws Exception {
VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(service)
.build()
.start();
}
},
// Deploy 4 instances, i.e the service is scaled on 4 event-loops
new DeploymentOptions()
.setInstances(4));
BlockingServerInterceptor
gRPC ServerInterceptor is a mechanism for intercepting incoming calls before they are sent to the service. It has synchronous behavior and will be execute on the Vert.x event loop.
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, myInterceptor))
.build();
Suppose we have an interceptor that does something blocking the event loop:
class MyInterceptor implements ServerInterceptor {
@Override
public <Q, A> ServerCall.Listener<Q> interceptCall(
ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
// do something hard and update the metadata, for example
return next.startCall(call, headers);
}
}
MyInterceptor myInterceptor = new MyInterceptor();
To avoid the blocking one should wrap the interceptor. Then it will be called on the Vert.x worker thread.
ServerInterceptor wrapped =
BlockingServerInterceptor.wrap(vertx, myInterceptor);
// Create the server
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, wrapped))
.build();
// Start it
rpcServer.start();
Context Server Interceptor
An abstract context server interceptor is available to allow intercepting server calls and extract metadata into the vert.x context. This context does not rely on thread locals so it is safe to use on vert.x APIs. This interceptor should be the first (or one of the firsts to be added to the interceptors list).
A typical example is the use of a session id. A client can create a client interceptor that sets a session id in all connections as:
Metadata extraHeaders = new Metadata();
extraHeaders.put(
Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);
ClientInterceptor clientInterceptor = MetadataUtils
.newAttachHeadersInterceptor(extraHeaders);
channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
.intercept(clientInterceptor)
.build();
And then on the server side an interceptor can be added as:
BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(
HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
@Override
public void bind(Metadata metadata, ConcurrentMap<String, String> context) {
context.put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
}
};
// Create the server
VertxServer rpcServer = VertxServerBuilder
.forAddress(vertx, "my.host", 8080)
.addService(ServerInterceptors.intercept(service, contextInterceptor))
.build();
gRPC Client
A server without a client is of no use, so lets create a client. In order to do this some steps overlap with the server. First we need to have the RPC definition, which should already done otherwise there would be no server and the same definition should have been compiled.
Note that the compiler will always generate both the base server and a client stub so if you already compiled once you do not need to re-compile it again.
Every client stub will always require a communication channel to a server so first we need to create a gRPC channel:
ManagedChannel channel = VertxChannelBuilder
.forAddress(vertx, "localhost", 8080)
.usePlaintext()
.build();
// Get a stub to use for interacting with the remote service
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);
Once the stub is created we can communicate with our server, this time it is easier since the stub already provides the correct method definition and parameter types:
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// Call the remote service
stub.sayHello(request, new StreamObserver<HelloReply>() {
private HelloReply helloReply;
@Override
public void onNext(HelloReply helloReply) {
this.helloReply = helloReply;
}
@Override
public void onError(Throwable throwable) {
System.out.println("Coult not reach server " + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Got the server response: " + helloReply.getMessage());
}
});
Using Vert.x future and streams
The previous example was using a gRPC client processing asynchronously using gRPC asynchronous constructs such
as io.grpc.stub.StreamObserver
. This code is generated by the protoc compiler.
The plugin configuration above configures the following plugin:
<protocPlugin>
<id>vertx-grpc-protoc-plugin</id>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-protoc-plugin</artifactId>
<version>${stack.version}</version>
<mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>
This generates a client version that uses Vert.x asynchronous constructs such as Future
or ReadStream
or WriteStream
which can be more convenient in the Vert.x ecosystem.
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
// Call the remote service
Future<HelloReply> future = stub.sayHello(request);
// Listen to completion events
future
.onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));
Client gzip compression
You can enable gzip compression to tell the client to send compressed messages.
GreeterGrpc.GreeterStub stub = GreeterGrpc
.newStub(channel)
.withCompression("gzip");
Note
|
you can use other compressors as long as the server support them and they are registered against the compressor
registry when building the ManagedChannel
|
SSL configuration
If you enabled SSL previously your client will also require SSL, in order to do this we need to configure the channel:
ManagedChannel channel = VertxChannelBuilder.
forAddress(vertx, "localhost", 8080)
.useSsl(options -> options
.setSsl(true)
.setUseAlpn(true)
.setTrustStoreOptions(new JksOptions()
.setPath("client-truststore.jks")
.setPassword("secret")))
.build();
Important
|
since gRPC uses HTTP/2 transport, SSL/TLS setup requires the Application-Layer Protocol Negotiation in your client |
Advanced configuration
Until now all gRPC examples where using sensible defaults but there is more, if you need to have full control over
the server configuration you should refer to the documentation: VertxServerBuilder
, or if you
need to control your client channel VertxChannelBuilder
. Vert.x gRPC extends the grpc-java
project (Netty transport) and therefore reading its documentation is
recommended.