Vert.x AMQP Client
The Vert.x AMQP Client allows interacting with AMQP 1.0 brokers and routers. It allows:
- Connecting to an AMQP broker or router (SASL and TLS connections are supported)
- Consuming messages from a queue or a topic
- Sending messages to a queue or a topic
- Checking acknowledgement for sent messages
The AMQP 1.0 protocol supports durable subscriptions, persistence, security, conversations, sophisticated routing, and more. More details on the protocol can be found on the AMQP homepage.
The Vert.x AMQP client is based on Vert.x Proton. If you need fine-grained control, we recommend using Vert.x Proton directly.
Using Vert.x AMQP Client
To use the Vert.x AMQP Client, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-amqp-client</artifactId>
    <version>3.9.1</version>
  </dependency>
- Gradle (in your build.gradle file):
  compile 'io.vertx:vertx-amqp-client:3.9.1'
Creating an AMQP client
Once you have added the client to your CLASSPATH, you can instantiate an AmqpClient as follows:
AmqpClientOptions options = new AmqpClientOptions()
  .setHost("localhost")
  .setPort(5672)
  .setUsername("user")
  .setPassword("secret");
// Create a client using its own internal Vert.x instance.
AmqpClient client1 = AmqpClient.create(options);
// Use an explicit Vert.x instance.
AmqpClient client2 = AmqpClient.create(vertx, options);
There are two ways to instantiate an AmqpClient. You can pass an explicit Vert.x instance; use this approach if you are in a Vert.x application or a Vert.x verticle. Otherwise, you can omit the Vert.x instance: an internal instance is then created, and it is closed when the client is closed.
To instantiate an AmqpClient, you need to pass AmqpClientOptions. These options contain the location of the broker or router, credentials, and so on. Many aspects of the AMQP client can be configured using these options. Note that you can also use these options to configure the underlying Proton client.
Host, port, username and password can also be configured from system properties or environment variables:
- Host: system property amqp-client-host, environment variable AMQP_CLIENT_HOST (mandatory)
- Port: system property amqp-client-port, environment variable AMQP_CLIENT_PORT (defaults to 5672)
- Username: system property amqp-client-username, environment variable AMQP_CLIENT_USERNAME
- Password: system property amqp-client-password, environment variable AMQP_CLIENT_PASSWORD
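The lookup of these settings can be sketched in plain Java. This is a minimal sketch, not the client's actual code: the class AmqpSettingsSketch and its methods are hypothetical, and checking the system property before the environment variable is an assumption, since the order is not stated here. Only the property names, the variable names and the 5672 default come from the list above.

```java
import java.util.Locale;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of the Vert.x AMQP client API. */
final class AmqpSettingsSketch {

  /** Derives the environment variable name: amqp-client-host -> AMQP_CLIENT_HOST. */
  static String envNameFor(String property) {
    return property.toUpperCase(Locale.ROOT).replace('-', '_');
  }

  /**
   * Looks up a setting. ASSUMPTION: the system property is checked first,
   * then the environment variable, then the default (which may be null).
   */
  static String resolve(String property,
                        Function<String, String> systemProperties,
                        Function<String, String> environment,
                        String defaultValue) {
    String value = systemProperties.apply(property);
    if (value == null) {
      value = environment.apply(envNameFor(property));
    }
    return value != null ? value : defaultValue;
  }

  public static void main(String[] args) {
    // Host has no default because it is mandatory; port falls back to 5672.
    String host = resolve("amqp-client-host", System::getProperty, System::getenv, null);
    String port = resolve("amqp-client-port", System::getProperty, System::getenv, "5672");
    System.out.println("host=" + host + ", port=" + port);
  }
}
```

Passing the two lookups as functions keeps the sketch easy to exercise with plain maps instead of the real process environment.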
Establishing a connection
Once you have created a client, you need to explicitly connect to the remote server. This is done using the connect method:
client.connect(ar -> {
  if (ar.failed()) {
    System.out.println("Unable to connect to the broker");
  } else {
    System.out.println("Connection succeeded");
    AmqpConnection connection = ar.result();
  }
});
The handler is called once the connection has been established or the attempt has failed. Note that the connection is used to create receivers and senders.
Creating a receiver
A receiver is used to receive messages. The AMQP receiver can be retrieved using one of the two following methods:
connection.createReceiver("my-queue",
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received message
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);
connection.createReceiver("my-queue",
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver
        .exceptionHandler(t -> {
          // Error thrown.
        })
        .handler(msg -> {
          // Attach the message handler
        });
    }
  }
);
The main difference between these two approaches is when the message handler is attached to the receiver. In the first approach, the handler is passed up front and starts receiving messages immediately. In the second approach, the handler is attached manually after the completion. This gives you more control and lets you attach other handlers.
The receiver passed in the completion handler can be used as a stream. So, you can pause and resume the reception of messages. The back-pressure protocol is implemented using AMQP credits.
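To picture how credits produce back-pressure, here is a toy model in plain Java. It is a sketch under stated assumptions, not the Vert.x or Proton implementation: the class CreditFlowSketch and all of its methods are hypothetical, and pausing is simplified to setting the credit count to zero. The idea it illustrates is that each delivery consumes one credit granted by the receiving side, and nothing is delivered while no credit is available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of credit-based flow control; hypothetical, not Vert.x API. */
final class CreditFlowSketch {
  private final Queue<String> pending = new ArrayDeque<>();  // waiting on the sending side
  private final List<String> delivered = new ArrayList<>();  // handed to the receiver
  private int credits;                                       // deliveries the receiver still allows

  /** Sending side: queue a message, then deliver whatever the credits allow. */
  void offer(String message) {
    pending.add(message);
    drain();
  }

  /** Receiving side: grant more credits, similar in spirit to resuming a stream. */
  void grant(int n) {
    credits += n;
    drain();
  }

  /** Receiving side: stop the flow. SIMPLIFICATION: credits are just reset to zero. */
  void pause() {
    credits = 0;
  }

  /** Moves messages across while both a message and a credit are available. */
  private void drain() {
    while (credits > 0 && !pending.isEmpty()) {
      delivered.add(pending.poll());
      credits--;
    }
  }

  List<String> delivered() {
    return delivered;
  }

  int backlog() {
    return pending.size();
  }

  public static void main(String[] args) {
    CreditFlowSketch link = new CreditFlowSketch();
    link.offer("m1"); // no credit yet, so m1 waits
    link.grant(1);    // one credit lets m1 through
    link.offer("m2"); // the credit is used up, so m2 waits
    System.out.println(link.delivered() + " delivered, " + link.backlog() + " waiting");
  }
}
```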
The received messages are instances of AmqpMessage. Instances are immutable and provide access to most of the metadata supported by AMQP. See the list of properties for reference. Note that retrieving a JSON object or a JSON array from the body requires the value to be passed as AMQP Data.
You can also create a receiver directly from the client:
client.createReceiver("my-queue",
  done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // called on every received message
        System.out.println("Received " + msg.bodyAsString());
      });
    }
  }
);
In this case, a connection is established automatically. You can retrieve it using connection.
By default, messages are automatically acknowledged. You can disable this behavior using setAutoAcknowledgement. You then need to explicitly acknowledge the incoming messages using one of:
- accepted
- rejected
- released
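Which of the three outcomes to choose depends on the application. The sketch below shows one possible policy in plain Java. The Outcome enum and the decide and process methods are hypothetical names, not Vert.x API; in a real handler, each branch would call the matching method on the received message instead. The policy itself is an assumption: accept what was processed, reject what is invalid and would never be processable, and release what could not be processed this time.

```java
/** Illustrative policy only; the names here are hypothetical, not Vert.x API. */
final class AckPolicySketch {

  /** Mirrors the three acknowledgement methods: accepted, rejected, released. */
  enum Outcome { ACCEPTED, REJECTED, RELEASED }

  /** Stand-in for application logic; throws when a simulated backend is unavailable. */
  static void process(String body) {
    if (body.startsWith("busy:")) {
      throw new IllegalStateException("backend unavailable");
    }
  }

  /** Chooses an outcome for a message body. */
  static Outcome decide(String body) {
    if (body == null || body.trim().isEmpty()) {
      return Outcome.REJECTED; // invalid message: a redelivery would not help
    }
    try {
      process(body);
      return Outcome.ACCEPTED; // processed successfully
    } catch (IllegalStateException e) {
      return Outcome.RELEASED; // not processed: the message may be delivered again
    }
  }

  public static void main(String[] args) {
    System.out.println(decide("hello"));
    System.out.println(decide(""));
    System.out.println(decide("busy:try later"));
  }
}
```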
Creating a sender
Senders allow publishing messages to queues and topics. You retrieve a sender as follows:
connection.createSender("my-queue", done -> {
  if (done.failed()) {
    System.out.println("Unable to create a sender");
  } else {
    AmqpSender result = done.result();
  }
});
Once you have retrieved an AMQP sender, you can create messages. Because AmqpMessage instances are immutable, they are created using the AmqpMessageBuilder builder class. The following snippet provides a few examples:
AmqpMessageBuilder builder = AmqpMessage.create();
// Very simple message
AmqpMessage m1 = builder.withBody("hello").build();
// Message overriding the destination
AmqpMessage m2 = builder.withBody("hello").address("another-queue").build();
// Message with a JSON object as body, metadata and TTL
AmqpMessage m3 = builder
  .withJsonObjectAsBody(new JsonObject().put("message", "hello"))
  .subject("subject")
  .ttl(10000)
  .applicationProperties(new JsonObject().put("prop1", "value1"))
  .build();
Once you have the sender and created the message, you can send it using:
- send: send the message
- sendWithAck: send the message and monitor its acknowledgment
The simplest way to send a message is the following:
sender.send(AmqpMessage.create().withBody("hello").build());
When sending a message, you can monitor the acknowledgment:
sender.sendWithAck(AmqpMessage.create().withBody("hello").build(), acked -> {
  if (acked.succeeded()) {
    System.out.println("Message accepted");
  } else {
    System.out.println("Message not accepted");
  }
});
Note that a message is considered acknowledged if the delivery is set to ACCEPTED. Other delivery values are considered non-acknowledged (details can be found in the passed cause).
The AmqpSender can be used as a write stream. The flow control is implemented using AMQP credits.
You can also create a sender directly from the client:
client.createSender("my-queue", maybeSender -> {
  //...
});
In this case, a connection is established automatically. You can retrieve it using connection.
Implementing request-reply
To implement request-reply behavior, you can use a dynamic receiver and an anonymous sender. A dynamic receiver is not associated with an address by the user; instead, the address is provided by the broker. Anonymous senders are also not associated with a specific address, so every message they send must contain an address.
The following snippet shows how request-reply can be implemented:
connection.createAnonymousSender(responseSender -> {
  // You got an anonymous sender, used to send the reply
  // Now register the main receiver:
  connection.createReceiver("my-queue", done -> {
    if (done.failed()) {
      System.out.println("Unable to create receiver");
    } else {
      AmqpReceiver receiver = done.result();
      receiver.handler(msg -> {
        // You got the message, let's reply.
        responseSender.result().send(AmqpMessage.create()
          .address(msg.replyTo())
          .correlationId(msg.id()) // send the message id as correlation id
          .withBody("my response to your request")
          .build()
        );
      });
    }
  });
});
// On the sender side (sending the initial request and expecting a reply)
connection.createDynamicReceiver(replyReceiver -> {
  // We got a receiver, the address is provided by the broker
  String replyToAddress = replyReceiver.result().address();
  // Attach the handler receiving the reply
  replyReceiver.result().handler(msg -> {
    System.out.println("Got the reply! " + msg.bodyAsString());
  });
  // Create a sender and send the message:
  connection.createSender("my-queue", sender -> {
    sender.result().send(AmqpMessage.create()
      .replyTo(replyToAddress)
      .id("my-message-id")
      .withBody("This is my request").build());
  });
});
To reply to a message, send the reply to the address specified in the reply-to of the request. It is also good practice to set the correlation id of the reply to the message id of the request, so the reply receiver can associate the response with the request.
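On the requesting side, associating a response with its request usually means keeping track of the requests still awaiting a reply. The following plain-Java sketch shows one way to do that with a map keyed by message id. The class ReplyCorrelatorSketch and its methods are hypothetical and not part of the Vert.x API; the sketch assumes that ids are strings and that the reply handler passes in the correlation id and body it received.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper matching replies to pending requests; not Vert.x API. */
final class ReplyCorrelatorSketch {
  // Requests that were sent and still await a reply, keyed by message id.
  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  /** Call before sending a request; messageId is the id set on that request. */
  CompletableFuture<String> expectReply(String messageId) {
    CompletableFuture<String> reply = new CompletableFuture<>();
    pending.put(messageId, reply);
    return reply;
  }

  /** Call from the reply handler. Returns false when no request matches the id. */
  boolean onReply(String correlationId, String body) {
    CompletableFuture<String> reply =
      correlationId == null ? null : pending.remove(correlationId);
    if (reply == null) {
      return false; // unknown or already answered: ignore the reply
    }
    reply.complete(body);
    return true;
  }

  /** Number of requests still waiting for a reply. */
  int inFlight() {
    return pending.size();
  }

  public static void main(String[] args) {
    ReplyCorrelatorSketch correlator = new ReplyCorrelatorSketch();
    CompletableFuture<String> reply = correlator.expectReply("my-message-id");
    reply.thenAccept(body -> System.out.println("Got the reply! " + body));
    correlator.onReply("my-message-id", "my response to your request");
  }
}
```

Removing the entry when the reply arrives keeps the map from growing; a real application would probably also expire requests that never receive a reply.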
Closing the client
Once you are done with a connection, receiver or sender, you should close it using the close method. Closing a connection closes all the receivers and senders created from it.
Once the client is no longer used, you must also close it. This closes all open connections and, as a consequence, their receivers and senders.