diff --git a/README.md b/README.md index 88b54ce..9884cd7 100644 --- a/README.md +++ b/README.md @@ -1064,38 +1064,159 @@ and consuming. PUBLISHER CONFIRMS =================== -RabbitMQ supports lightweight method of confirming that broker received and processed -a message. For this method to work, the channel needs to be put in so-called _confirm mode_. -This is done using confirmSelect() method. When channel is successfully put in -confirm mode, the server and client count messages (starting from 1) and server sends -acknowledgments for every message it processed (it can also acknowledge multiple message at -once). +RabbitMQ supports a lightweight method of confirming that broker received +and processed a message. When you enable this, RabbitMQ sends back an +'ack' for each publish-operation that has been handled. For this method +to work, the channel needs to be put in _confirm mode_. This is done using +confirmSelect() method. When the channel is successfully put in confirm mode, +the server starts counting the received messages (starting from 1) and sends +acknowledgments for every message it processed (it can also acknowledge +multiple message at once). -If server is unable to process a message, it will send send negative acknowledgments. Both -positive and negative acknowledgments handling are implemented as callbacks for Channel object. +If server is unable to process a message, it will send send negative +acknowledgments. Both positive and negative acknowledgments handling are +implemented as callbacks for that should be installed on the object that +is returned by the confirmSelect() method: ````c++ -// setup confirm mode and ack/nack callbacks +// setup confirm mode and ack/nack callbacks (from this moment onwards +// ack/nack confirmations are coming in) channel.confirmSelect().onSuccess([&]() { - // from this moment onwards ack/nack confirmations are coming in + // publish the first message (this will be acked/nacked with deliveryTag=1) channel.publish("my-exchange", "my-key", "my first message"); - // message counter is now 1, will call onAck/onNack with deliveryTag=1 + // publish the second message (this will be acked/nacked with deliveryTag=2) channel.publish("my-exchange", "my-key", "my second message"); - // message counter is now 2, will call onAck/onNack with deliveryTag=2 }).onAck([&](uint64_t deliveryTag, bool multiple) { + // deliveryTag is message number // multiple is set to true, if all messages UP TO deliveryTag have been processed + }).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { + // deliveryTag is message number // multiple is set to true, if all messages UP TO deliveryTag have not been processed // requeue is to be ignored + }); ```` +If you use this feature, you will have to implement your own bookkeeping to +track which messages have already been acked/nacked, and which messages +are still being handled. For your convenience however, the AMQP-CPP library +comes with a number of helper classes that can take over this responsibility. + +The AMQP::Reliable class is an optional wrapper around channels. When you use +it, your underlying channel is automatically put it _confirm method_, and all publish +operations are individually acknowledged: + +````c++ +// create a channel +AMQP::TcpChannel mychannel(connection); + +// wrap the channel into a reliable-object so that publish-opertions are +// individually confirmed (after wrapping the channel, it is recommended +// to no longer make direct calls to the channel) +AMQP::Reliable reliable(mychannel); + +// publish a message via the reliable-channel +reliable.publish("my-exchange", "my-key", "my first message").onAck([]() { + + // the message has been acknowledged by RabbitMQ (in your application + // code you can now safely discard the message as it has been picked up) + +}).onNack([]() { + + // the message has _explicitly_ been nack'ed by RabbitMQ (in your application + // code you probably want to log handle this to avoid data-loss) + +}).onError([](const char *message) { + + // a channel-error occured before any ack or nack was received, and the + // message is probably lost too (which you want to handle) + +}).onLost([]() { + + // because the implementation for onNack() and onError() will be the same + // in many applications, you can also choose to install a onLost() handler, + // which is called when the message has either been nack'ed, or lost. + +}); + +```` + +In the above example we have implemented four callback methods. In a real life +application, implementing the onAck() and onLost() is normally sufficient. + +Publisher-confirms are often useful in situations where you need reliability. +If you want to have certainty about whether your message was handled by RabbitMQ +or not, you can enable this feature, either by explicitly calling channel.confirmSelect() +if you want to do your own bookkeeping, or using AMQP::Reliable for a simpler +API. + +But it also is useful for flood prevention. RabbitMQ turns out not to be very +good at handling big loads of publish-operations. If you publish messages faster +than RabbitMQ can handle, a server-side buffer builds up, and RabbitMQ gets slow +(which causes the buffer to build up evenm further, et cetera). With publish-confirms +you can keep the messages in your own application, and only proceed with publishing +them when your previous messages have been handled. With this approach you +prevent that RabbitMQ gets overloaded. We call it throttling. + +You can build your own throttling mechanism using the confirmSelect() approach +or the AMQP::Reliable class. But also here the AMQP-CPP library already has +a utility class for you that you can use instead: AMQP::Throttle: + +````c++ +// create a channel +AMQP::TcpChannel mychannel(connection); + +// create a throttle (do not publish more than 20 messages at once) (after +// wrapping the channel in a throttle you should no longer call any of the +// channel-methods directly) +AMQP::Throttle throttle(connection, 20); + +// publish way more messages than RabbitMQ can handle (the Throttle class +// will make sure that messages are buffered inside your application if +// there are more than 20 unacked messages) +for (size_t i = 0; i < 100000; ++i) +{ + // publish a message + throttle.publish("my-exchange", "my-key", "my first message"); +} +```` + +The AMQP::Reliable and AMQP::Throttle classes that you wrap around a channel. +But what if you want to use both? You want to throttle messages, but also like +to install your own callbacks for onAck and onLost? This is possible too: + +````c++ +// create a channel +AMQP::TcpChannel mychannel(connection); + +// create a throttle that allows reliable-publishing +AMQP::Throttle throttle(connection, 20); + +// publish way more messages than RabbitMQ can handle (the Throttle class +// will make sure that messages are buffered inside your application if +// there are more than 20 unacked messages) +for (size_t i = 0; i < 100000; ++i) +{ + // publish a message + throttle.publish("my-exchange", "my-key", "my first message").onAck([]() { + + // @todo add your own code + + }).onLost([]() { + + // @todo add your own code + + }); +} +```` + For more information, please see http://www.rabbitmq.com/confirms.html. CONSUMING MESSAGES