Eclipse Vert.x RabbitMQ client gets a new consumer API!

In Eclipse Vert.x 3.6.0, the RabbitMQ client gets a new consumer API. This post shows how it improves on the previous API and how easy it is to use.

Before digging into the new API, let's look at the limitations of the current one:

  1. The API uses the event bus, which limits the consumer's control over the RabbitMQ queue.
  2. The message API is based on JsonObject, which does not provide a typed API.

The new API at a glance

Here is what simple queue consumption looks like with the new API:

RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());
// Assumes the client has been started (client.start(...)) before the consumer is created

client.basicConsumer("my.queue", res -> {
  if (res.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = res.result();
    mqConsumer.handler((RabbitMQMessage message) -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    // Oops, something went wrong
    res.cause().printStackTrace();
  }
});

To create a consumer, you simply call the basicConsumer method, which asynchronously provides a RabbitMQConsumer.

Then you provide a handler, called for each consumed message, via RabbitMQConsumer#handler, which is the idiomatic way to consume a stream in Vert.x.

You may also note that when a message arrives, it has the type RabbitMQMessage, which is a typed message representation.

Since RabbitMQConsumer is a stream, you are also allowed to pause and resume it, subscribe to the end event, and get notified when an exception occurs.

In addition, you can cancel the subscription by calling the RabbitMQConsumer#cancel method.

Backpressure

Sometimes you can have more incoming messages than you can handle.

The new consumer API lets you control this by storing arrived messages in an internal queue before they are delivered to the application. You can configure the size of this queue.

Here is how you can limit the internal queue size:

// Limit to max 300 messages
QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(300);

RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());

client.basicConsumer("my.queue", options, res -> {
  if (res.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = res.result();
    mqConsumer.handler((RabbitMQMessage message) -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    res.cause().printStackTrace();
  }
});

When the internal queue capacity is exceeded, the new message is simply dropped.

An alternative option is to drop the oldest message in the queue.

To achieve this, specify the behavior by calling the QueueOptions#setKeepMostRecent method.
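
To make the difference between the two overflow behaviors concrete, here is a small toy model in plain Java. It is only a sketch and not the client's actual implementation: the OverflowPolicyDemo and BoundedBuffer names are hypothetical, and treating keepMostRecent = true as "drop the oldest message" is an assumption based on the description above. The buffer holds at most three messages and receives five.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a bounded internal queue; NOT the Vert.x client's implementation.
class OverflowPolicyDemo {

  // Hypothetical helper: bounded FIFO buffer with a configurable overflow policy.
  static final class BoundedBuffer<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final int maxSize;
    private final boolean keepMostRecent;

    BoundedBuffer(int maxSize, boolean keepMostRecent) {
      if (maxSize <= 0) {
        throw new IllegalArgumentException("maxSize must be positive");
      }
      this.maxSize = maxSize;
      this.keepMostRecent = keepMostRecent;
    }

    // Adds a message and returns the message that was dropped, or null if none was.
    T offer(T message) {
      if (queue.size() < maxSize) {
        queue.addLast(message);
        return null;
      }
      if (keepMostRecent) {
        // Full: evict the oldest message to make room for the new one.
        T oldest = queue.pollFirst();
        queue.addLast(message);
        return oldest;
      }
      // Full: the incoming message is dropped and the buffer is unchanged.
      return message;
    }

    // Returns a copy of the buffered messages, oldest first.
    List<T> snapshot() {
      return new ArrayList<>(queue);
    }
  }

  // Pushes the messages 1..5 into a buffer of capacity 3 and returns what is left.
  static List<Integer> fill(boolean keepMostRecent) {
    BoundedBuffer<Integer> buffer = new BoundedBuffer<>(3, keepMostRecent);
    for (int i = 1; i <= 5; i++) {
      buffer.offer(i);
    }
    return buffer.snapshot();
  }

  public static void main(String[] args) {
    System.out.println("drop newest:      " + fill(false)); // [1, 2, 3]
    System.out.println("keep most recent: " + fill(true));  // [3, 4, 5]
  }
}
```

With the first policy the application eventually sees the earliest messages and loses the latest ones. With the second it sees the freshest messages, which tends to suit data where stale values have little worth.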

Finally

The new Vert.x RabbitMQ client consumer API is a far more idiomatic and modern way to consume messages from a queue.

This API is going to be provided in the 3.6.0 release, while the old one will be deprecated.

I hope you enjoyed reading this article. See you soon on our Gitter channel!
