added docs for AMQP::Reliable and AMQP::Throttle
This commit is contained in:
parent
110706771a
commit
9df895143f
145
README.md
145
README.md
|
|
@ -1064,38 +1064,159 @@ and consuming.
|
||||||
PUBLISHER CONFIRMS
|
PUBLISHER CONFIRMS
|
||||||
===================
|
===================
|
||||||
|
|
||||||
RabbitMQ supports lightweight method of confirming that broker received and processed
|
RabbitMQ supports a lightweight method of confirming that broker received
|
||||||
a message. For this method to work, the channel needs to be put in so-called _confirm mode_.
|
and processed a message. When you enable this, RabbitMQ sends back an
|
||||||
This is done using confirmSelect() method. When channel is successfully put in
|
'ack' for each publish-operation that has been handled. For this method
|
||||||
confirm mode, the server and client count messages (starting from 1) and server sends
|
to work, the channel needs to be put in _confirm mode_. This is done using
|
||||||
acknowledgments for every message it processed (it can also acknowledge multiple message at
|
confirmSelect() method. When the channel is successfully put in confirm mode,
|
||||||
once).
|
the server starts counting the received messages (starting from 1) and sends
|
||||||
|
acknowledgments for every message it processed (it can also acknowledge
|
||||||
|
multiple message at once).
|
||||||
|
|
||||||
If server is unable to process a message, it will send send negative acknowledgments. Both
|
If server is unable to process a message, it will send send negative
|
||||||
positive and negative acknowledgments handling are implemented as callbacks for Channel object.
|
acknowledgments. Both positive and negative acknowledgments handling are
|
||||||
|
implemented as callbacks for that should be installed on the object that
|
||||||
|
is returned by the confirmSelect() method:
|
||||||
|
|
||||||
````c++
|
````c++
|
||||||
// setup confirm mode and ack/nack callbacks
|
// setup confirm mode and ack/nack callbacks (from this moment onwards
|
||||||
|
// ack/nack confirmations are coming in)
|
||||||
channel.confirmSelect().onSuccess([&]() {
|
channel.confirmSelect().onSuccess([&]() {
|
||||||
// from this moment onwards ack/nack confirmations are coming in
|
|
||||||
|
|
||||||
|
// publish the first message (this will be acked/nacked with deliveryTag=1)
|
||||||
channel.publish("my-exchange", "my-key", "my first message");
|
channel.publish("my-exchange", "my-key", "my first message");
|
||||||
// message counter is now 1, will call onAck/onNack with deliveryTag=1
|
|
||||||
|
|
||||||
|
// publish the second message (this will be acked/nacked with deliveryTag=2)
|
||||||
channel.publish("my-exchange", "my-key", "my second message");
|
channel.publish("my-exchange", "my-key", "my second message");
|
||||||
// message counter is now 2, will call onAck/onNack with deliveryTag=2
|
|
||||||
|
|
||||||
}).onAck([&](uint64_t deliveryTag, bool multiple) {
|
}).onAck([&](uint64_t deliveryTag, bool multiple) {
|
||||||
|
|
||||||
// deliveryTag is message number
|
// deliveryTag is message number
|
||||||
// multiple is set to true, if all messages UP TO deliveryTag have been processed
|
// multiple is set to true, if all messages UP TO deliveryTag have been processed
|
||||||
|
|
||||||
}).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) {
|
}).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) {
|
||||||
|
|
||||||
// deliveryTag is message number
|
// deliveryTag is message number
|
||||||
// multiple is set to true, if all messages UP TO deliveryTag have not been processed
|
// multiple is set to true, if all messages UP TO deliveryTag have not been processed
|
||||||
// requeue is to be ignored
|
// requeue is to be ignored
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
````
|
````
|
||||||
|
|
||||||
|
If you use this feature, you will have to implement your own bookkeeping to
|
||||||
|
track which messages have already been acked/nacked, and which messages
|
||||||
|
are still being handled. For your convenience however, the AMQP-CPP library
|
||||||
|
comes with a number of helper classes that can take over this responsibility.
|
||||||
|
|
||||||
|
The AMQP::Reliable class is an optional wrapper around channels. When you use
|
||||||
|
it, your underlying channel is automatically put it _confirm method_, and all publish
|
||||||
|
operations are individually acknowledged:
|
||||||
|
|
||||||
|
````c++
|
||||||
|
// create a channel
|
||||||
|
AMQP::TcpChannel mychannel(connection);
|
||||||
|
|
||||||
|
// wrap the channel into a reliable-object so that publish-opertions are
|
||||||
|
// individually confirmed (after wrapping the channel, it is recommended
|
||||||
|
// to no longer make direct calls to the channel)
|
||||||
|
AMQP::Reliable reliable(mychannel);
|
||||||
|
|
||||||
|
// publish a message via the reliable-channel
|
||||||
|
reliable.publish("my-exchange", "my-key", "my first message").onAck([]() {
|
||||||
|
|
||||||
|
// the message has been acknowledged by RabbitMQ (in your application
|
||||||
|
// code you can now safely discard the message as it has been picked up)
|
||||||
|
|
||||||
|
}).onNack([]() {
|
||||||
|
|
||||||
|
// the message has _explicitly_ been nack'ed by RabbitMQ (in your application
|
||||||
|
// code you probably want to log handle this to avoid data-loss)
|
||||||
|
|
||||||
|
}).onError([](const char *message) {
|
||||||
|
|
||||||
|
// a channel-error occured before any ack or nack was received, and the
|
||||||
|
// message is probably lost too (which you want to handle)
|
||||||
|
|
||||||
|
}).onLost([]() {
|
||||||
|
|
||||||
|
// because the implementation for onNack() and onError() will be the same
|
||||||
|
// in many applications, you can also choose to install a onLost() handler,
|
||||||
|
// which is called when the message has either been nack'ed, or lost.
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
````
|
||||||
|
|
||||||
|
In the above example we have implemented four callback methods. In a real life
|
||||||
|
application, implementing the onAck() and onLost() is normally sufficient.
|
||||||
|
|
||||||
|
Publisher-confirms are often useful in situations where you need reliability.
|
||||||
|
If you want to have certainty about whether your message was handled by RabbitMQ
|
||||||
|
or not, you can enable this feature, either by explicitly calling channel.confirmSelect()
|
||||||
|
if you want to do your own bookkeeping, or using AMQP::Reliable for a simpler
|
||||||
|
API.
|
||||||
|
|
||||||
|
But it also is useful for flood prevention. RabbitMQ turns out not to be very
|
||||||
|
good at handling big loads of publish-operations. If you publish messages faster
|
||||||
|
than RabbitMQ can handle, a server-side buffer builds up, and RabbitMQ gets slow
|
||||||
|
(which causes the buffer to build up evenm further, et cetera). With publish-confirms
|
||||||
|
you can keep the messages in your own application, and only proceed with publishing
|
||||||
|
them when your previous messages have been handled. With this approach you
|
||||||
|
prevent that RabbitMQ gets overloaded. We call it throttling.
|
||||||
|
|
||||||
|
You can build your own throttling mechanism using the confirmSelect() approach
|
||||||
|
or the AMQP::Reliable class. But also here the AMQP-CPP library already has
|
||||||
|
a utility class for you that you can use instead: AMQP::Throttle:
|
||||||
|
|
||||||
|
````c++
|
||||||
|
// create a channel
|
||||||
|
AMQP::TcpChannel mychannel(connection);
|
||||||
|
|
||||||
|
// create a throttle (do not publish more than 20 messages at once) (after
|
||||||
|
// wrapping the channel in a throttle you should no longer call any of the
|
||||||
|
// channel-methods directly)
|
||||||
|
AMQP::Throttle throttle(connection, 20);
|
||||||
|
|
||||||
|
// publish way more messages than RabbitMQ can handle (the Throttle class
|
||||||
|
// will make sure that messages are buffered inside your application if
|
||||||
|
// there are more than 20 unacked messages)
|
||||||
|
for (size_t i = 0; i < 100000; ++i)
|
||||||
|
{
|
||||||
|
// publish a message
|
||||||
|
throttle.publish("my-exchange", "my-key", "my first message");
|
||||||
|
}
|
||||||
|
````
|
||||||
|
|
||||||
|
The AMQP::Reliable and AMQP::Throttle classes that you wrap around a channel.
|
||||||
|
But what if you want to use both? You want to throttle messages, but also like
|
||||||
|
to install your own callbacks for onAck and onLost? This is possible too:
|
||||||
|
|
||||||
|
````c++
|
||||||
|
// create a channel
|
||||||
|
AMQP::TcpChannel mychannel(connection);
|
||||||
|
|
||||||
|
// create a throttle that allows reliable-publishing
|
||||||
|
AMQP::Throttle<AMQP::Reliable> throttle(connection, 20);
|
||||||
|
|
||||||
|
// publish way more messages than RabbitMQ can handle (the Throttle class
|
||||||
|
// will make sure that messages are buffered inside your application if
|
||||||
|
// there are more than 20 unacked messages)
|
||||||
|
for (size_t i = 0; i < 100000; ++i)
|
||||||
|
{
|
||||||
|
// publish a message
|
||||||
|
throttle.publish("my-exchange", "my-key", "my first message").onAck([]() {
|
||||||
|
|
||||||
|
// @todo add your own code
|
||||||
|
|
||||||
|
}).onLost([]() {
|
||||||
|
|
||||||
|
// @todo add your own code
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
````
|
||||||
|
|
||||||
For more information, please see http://www.rabbitmq.com/confirms.html.
|
For more information, please see http://www.rabbitmq.com/confirms.html.
|
||||||
|
|
||||||
CONSUMING MESSAGES
|
CONSUMING MESSAGES
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue