Vert.x gRPC

The best description of gRPC can be seen at wikipedia.

gRPC is an open source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport, Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming and flow control, blocking or nonblocking bindings, and cancellation and timeouts. It generates cross-platform client and server bindings for many languages.

— wikipedia
wikipedia

Vert.x gRPC is a module that will align the programming style of Google gRPC with Vert.x style. As a user of this module you will be more familiar with the code style using Vert.x Streams and Futures while benefiting from all the benefits of gRPC.

For more information related to gRPC please consult the official documentation site http://www.grpc.io/.

In addition Vert.x gRPC supports

  • gRPC service scaling with Verticles

  • non-blocking native transports

gRPC types

With gRPC you benefit from HTTP/2 which means that you will have asynchronous streaming support which means that your Remote Procedure Calls can have the following characteristics:

  • Client streams request objects while Server replies with a single response object

  • Client streams request objects while Server replies with a stream of response objects

  • Client sends a single request object while Server replies with a single response object

  • Client sends a single request object while Server replies with a stream of response objects

While to the untrained eye this might not look to different from other HTTP based RPC approaches you should be aware that with HTTP/2 your requests do not need to complete before the responses start to arrive. This means that your communication channel is full duplex. Being full duplex allows you to reduce the response latency and make more response application.

A simple Hello World

In order to start with your first hello world example, one needs to define the protocol. gRPC requires you to define this protocol using the protobuffer format.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "examples";
option java_outer_classname = "HelloWorldProto";
package helloworld;

// The greeting service definition.
service Greeter {
 // Sends a greeting
 rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
 string name = 1;
}

// The response message containing the greetings
message HelloReply {
 string message = 1;
}

This is a very simple example showing the single request, single response mode.

Compile the RPC definition

Using the definition above we need to compile it.

You can compile the proto file using the protoc compiler if you like or you can integrate it in your build.

If you’re using Apache Maven you need to add the plugin:

<plugin>
 <groupId>org.xolstice.maven.plugins</groupId>
 <artifactId>protobuf-maven-plugin</artifactId>
 <version>0.6.1</version>
 <configuration>
   <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
   <pluginId>grpc-java</pluginId>
   <pluginArtifact>io.grpc:protoc-gen-grpc-java:${vertx.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
   <protocPlugins>
     <protocPlugin>
       <id>vertx-grpc-protoc-plugin</id>
       <groupId>io.vertx</groupId>
       <artifactId>vertx-grpc-protoc-plugin</artifactId>
       <version>${stack.version}</version>
       <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
     </protocPlugin>
   </protocPlugins>
 </configuration>
 <executions>
   <execution>
     <id>compile</id>
     <configuration>
       <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
       <clearOutputDirectory>false</clearOutputDirectory>
     </configuration>
     <goals>
       <goal>compile</goal>
       <goal>compile-custom</goal>
     </goals>
   </execution>
   <execution>
     <id>test-compile</id>
     <goals>
       <goal>test-compile</goal>
       <goal>test-compile-custom</goal>
     </goals>
   </execution>
 </executions>
</plugin>

The ${os.detected.classifier} property is used to make the build OS independant, on OSX it is replaced by osx-x86_64 and so on. To use it you need to add the os-maven-plugin[https://github.com/trustin/os-maven-plugin] in the build section of your pom.xml:

<build>
 ...
 <extensions>
   <extension>
     <groupId>kr.motd.maven</groupId>
     <artifactId>os-maven-plugin</artifactId>
     <version>1.4.1.Final</version>
   </extension>
 </extensions>
 ...
</build>

This plugin will compile your proto files under src/main/proto and make them available to your project.

If you’re using Gradle you need to add the plugin:

...
apply plugin: 'com.google.protobuf'
...
buildscript {
 ...
 dependencies {
   // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
   classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
 }
}
...
protobuf {
 protoc {
   artifact = 'com.google.protobuf:protoc:3.2.0'
 }
 plugins {
   grpc {
     artifact = "io.grpc:protoc-gen-grpc-java:1.25.0"
   }
   vertx {
     artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertx.grpc.version}"
   }
 }
 generateProtoTasks {
   all()*.plugins {
     grpc
     vertx
   }
 }
}

This plugin will compile your proto files under build/generated/source/proto/main and make them available to your project.

gRPC Server

Now you should have your RPC base code setup it is time to implement your server. As you should recall from above we described that our server should implement a sayHello method that receives a HelloRequest objects and returns a HelloReply object. So you can implement it as:

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());
    responseObserver.onCompleted();
  }
};

Once you’re happy with it you can then make your service available on a server. Vert.x makes the creation of a server quite simple all you need to add is:

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(service)
  .build();

// Start is asynchronous
rpcServer.start();

Using Vert.x future and streams

The previous example was using a gRPC server processing asynchronously using gRPC asynchronous constructs such as io.grpc.stub.StreamObserver. This code is generated by the protoc compiler.

The plugin configuration above configures the following plugin:

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

This generates a service version that uses Vert.x asynchronous constructs such as Future or ReadStream or WriteStream which can be more convenient in the Vert.x ecosystem.

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  };

Server gzip compression

You can enable gzip compression to tell the server to send compressed responses (compressed requests are automatically handled by the server).

VertxGreeterGrpc.GreeterVertxImplBase service =
  new VertxGreeterGrpc.GreeterVertxImplBase() {
    @Override
    public Future<HelloReply> sayHello(HelloRequest request) {
      return Future.succeededFuture(
        HelloReply.newBuilder()
          .setMessage(request.getName())
          .build());
    }
  }
    .withCompression("gzip");

The withCompression configuration is generated by the Vert.x gRPC protoc plugin. You can also enable compression on default services by casting the ResponseObserver to ServerCallStreamObserver and call setCompression before sending the response.

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(
    HelloRequest request,
    StreamObserver<HelloReply> responseObserver) {

    ((ServerCallStreamObserver) responseObserver)
      .setCompression("gzip");

    responseObserver.onNext(
      HelloReply.newBuilder()
        .setMessage(request.getName())
        .build());

    responseObserver.onCompleted();
  }
};
Note
you can use other compressors as long as the server support them and they are registered against the compressor registry when building the ManagedChannel

SSL configuration

The previous example was simple but your RPC is not secure. In order to make it secure we should enable SSL/TLS:

VertxServerBuilder builder = VertxServerBuilder.forPort(vertx, 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setKeyStoreOptions(new JksOptions()
      .setPath("server-keystore.jks")
      .setPassword("secret")));

Congratulations you just completed your first gRPC server.

Important
since gRPC uses HTTP/2 transport, SSL/TLS setup requires the Application-Layer Protocol Negotiation in your server

Server scaling

When you deploy several instances of the same verticles, the gRPC server will be scaled on the verticle event-loops.

vertx.deployVerticle(

  // Verticle supplier - should be called 4 times
  () -> new AbstractVerticle() {

    BindableService service = new GreeterGrpc.GreeterImplBase() {
      @Override
      public void sayHello(
        HelloRequest request,
        StreamObserver<HelloReply> responseObserver) {

        responseObserver.onNext(
          HelloReply.newBuilder()
            .setMessage(request.getName())
            .build());

        responseObserver.onCompleted();
      }
    };

    @Override
    public void start() throws Exception {
      VertxServerBuilder
        .forAddress(vertx, "my.host", 8080)
        .addService(service)
        .build()
        .start();
    }
  },

  // Deploy 4 instances, i.e the service is scaled on 4 event-loops
  new DeploymentOptions()
    .setInstances(4));

BlockingServerInterceptor

gRPC ServerInterceptor is a mechanism for intercepting incoming calls before they are sent to the service. It has synchronous behavior and will be execute on the Vert.x event loop.

VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, myInterceptor))
  .build();

Suppose we have an interceptor that does something blocking the event loop:

class MyInterceptor implements ServerInterceptor {
  @Override
  public <Q, A> ServerCall.Listener<Q> interceptCall(
    ServerCall<Q, A> call, Metadata headers, ServerCallHandler<Q, A> next) {
    // do something hard and update the metadata, for example
    return next.startCall(call, headers);
  }
}
MyInterceptor myInterceptor = new MyInterceptor();

To avoid the blocking one should wrap the interceptor. Then it will be called on the Vert.x worker thread.

ServerInterceptor wrapped =
  BlockingServerInterceptor.wrap(vertx, myInterceptor);

// Create the server
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, wrapped))
  .build();

// Start it
rpcServer.start();

Context Server Interceptor

An abstract context server interceptor is available to allow intercepting server calls and extract metadata into the vert.x context. This context does not rely on thread locals so it is safe to use on vert.x APIs. This interceptor should be the first (or one of the firsts to be added to the interceptors list).

A typical example is the use of a session id. A client can create a client interceptor that sets a session id in all connections as:

Metadata extraHeaders = new Metadata();
extraHeaders.put(
  Metadata.Key.of("sessionId", Metadata.ASCII_STRING_MARSHALLER), theSessionId);

ClientInterceptor clientInterceptor = MetadataUtils
  .newAttachHeadersInterceptor(extraHeaders);

channel = VertxChannelBuilder.forAddress(vertx, "localhost", port)
  .intercept(clientInterceptor)
  .build();

And then on the server side an interceptor can be added as:

BindableService service = new VertxGreeterGrpc.GreeterVertxImplBase() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(
      HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

ServerInterceptor contextInterceptor = new ContextServerInterceptor() {
  @Override
  public void bind(Metadata metadata, ConcurrentMap<String, String> context) {
    context.put("sessionId", metadata.get(SESSION_ID_METADATA_KEY));
  }
};

// Create the server
VertxServer rpcServer = VertxServerBuilder
  .forAddress(vertx, "my.host", 8080)
  .addService(ServerInterceptors.intercept(service, contextInterceptor))
  .build();

gRPC Client

A server without a client is of no use, so lets create a client. In order to do this some steps overlap with the server. First we need to have the RPC definition, which should already done otherwise there would be no server and the same definition should have been compiled.

Note that the compiler will always generate both the base server and a client stub so if you already compiled once you do not need to re-compile it again.

Every client stub will always require a communication channel to a server so first we need to create a gRPC channel:

ManagedChannel channel = VertxChannelBuilder
  .forAddress(vertx, "localhost", 8080)
  .usePlaintext()
  .build();

// Get a stub to use for interacting with the remote service
GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel);

Once the stub is created we can communicate with our server, this time it is easier since the stub already provides the correct method definition and parameter types:

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// Call the remote service
stub.sayHello(request, new StreamObserver<HelloReply>() {
  private HelloReply helloReply;

  @Override
  public void onNext(HelloReply helloReply) {
    this.helloReply = helloReply;
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Coult not reach server " + throwable.getMessage());
  }

  @Override
  public void onCompleted() {
    System.out.println("Got the server response: " + helloReply.getMessage());
  }
});

Using Vert.x future and streams

The previous example was using a gRPC client processing asynchronously using gRPC asynchronous constructs such as io.grpc.stub.StreamObserver. This code is generated by the protoc compiler.

The plugin configuration above configures the following plugin:

<protocPlugin>
 <id>vertx-grpc-protoc-plugin</id>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-protoc-plugin</artifactId>
 <version>${stack.version}</version>
 <mainClass>io.vertx.grpc.protoc.plugin.VertxGrpcGenerator</mainClass>
</protocPlugin>

This generates a client version that uses Vert.x asynchronous constructs such as Future or ReadStream or WriteStream which can be more convenient in the Vert.x ecosystem.

HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();

// Call the remote service
Future<HelloReply> future = stub.sayHello(request);

// Listen to completion events
future
  .onSuccess(helloReply -> System.out.println("Got the server response: " + helloReply.getMessage())).onFailure(err -> System.out.println("Coult not reach server " + err));

Client gzip compression

You can enable gzip compression to tell the client to send compressed messages.

GreeterGrpc.GreeterStub stub = GreeterGrpc
  .newStub(channel)
  .withCompression("gzip");
Note
you can use other compressors as long as the server support them and they are registered against the compressor registry when building the ManagedChannel

SSL configuration

If you enabled SSL previously your client will also require SSL, in order to do this we need to configure the channel:

ManagedChannel channel = VertxChannelBuilder.
  forAddress(vertx, "localhost", 8080)
  .useSsl(options -> options
    .setSsl(true)
    .setUseAlpn(true)
    .setTrustStoreOptions(new JksOptions()
      .setPath("client-truststore.jks")
      .setPassword("secret")))
  .build();
Important
since gRPC uses HTTP/2 transport, SSL/TLS setup requires the Application-Layer Protocol Negotiation in your client

Advanced configuration

Until now all gRPC examples where using sensible defaults but there is more, if you need to have full control over the server configuration you should refer to the documentation: VertxServerBuilder, or if you need to control your client channel VertxChannelBuilder. Vert.x gRPC extends the grpc-java project (Netty transport) and therefore reading its documentation is recommended.

Native transports

The client and server can be deployed with Netty’s native transports, this is achieved when creating the Vert.x instance.

Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));

Please refer Vert.x Core documentation for more information about native transports.