Eclipse Vert.x RabbitMQ client gets a new consumer API!
In Eclipse Vert.x 3.6.0, the RabbitMQ client will get a new consumer API. In this post we are going to show the improvements since the previous API and how easy it is to use now.
Before digging into the new API let’s find out what were the limitations of the actual one:
- The API uses the event bus in such limiting the control of the consumer over the RabbitMQ queue.
- The message API is based on
JsonObject
which does not provide a typed API
The new API at a glance
Here is how simple queue consumption looks like with the new API:
RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());
client.basicConsumer("my.queue", res -> {
if (res.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer mqConsumer = res.result();
mqConsumer.handler((RabbitMQMessage message) -> {
System.out.println("Got message: " + message.body().toString());
});
} else {
// Oups something went wrong
res.cause().printStackTrace();
}
});
Now to create a queue you simply call the basicConsumer
method and you obtain asynchronously
a RabbitMQConsumer
.
Then you need to provide a handler called for each message consumed via RabbitMQConsumer#handler which is the idiomatic way to consumer stream in Vert.x
You may also note that when we a message arrives, it has the type of RabbitMQMessage
, this is a typed
message representation.
Since RabbitMQConsumer
is a stream, you also allowed to pause
and resume
the stream, subscribe to the
end event, get notified when an exception occurs.
In addition, you can cancel the subscription by calling RabbitMQConsumer#cancel
method.
Backpressure
Sometimes you can have more incoming messages than you can handle.
The new consumer API allows you to control this and lets you store arrived messages in the internal queue before they are delivered to the application. Indeed, you can configure the queue size.
Here is how you can limit the internal queue size:
// Limit to max 300 messages
QueueOptions options = new QueueOptions()
.setMaxInternalQueueSize(300);
RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());
client.basicConsumer("my.queue", options, res -> {
if (res.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer mqConsumer = res.result();
mqConsumer.handler((RabbitMQMessage message) -> {
System.out.println("Got message: " + message.body().toString());
});
} else {
res.cause().printStackTrace();
}
});
When the intenral queue queue capacity is exceeded, the new message will be simply dropped.
An alternative option is to drop the oldest message in the queue.
In order to achieve this, you should specify the behavior by calling QueueOptions#setKeepMostRecent
method.
Finally
The new Vert.x RabbitMQ client consumer API is way more idiomatic and modern way to consume messages from a queue.
This API is going to provided in the 3.6.0 release, while the old will be deprecated.
I hope you enjoyed reading this article. See you soon on our Gitter channel!