From c7a12d22c1e6d49ab2890827f8ee19e3345b11a3 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 22:00:28 +0200 Subject: [PATCH 01/19] Add confirm.select and confirm.select-ok frames --- include/receivedframe.h | 7 +++ src/CMakeLists.txt | 2 + src/confirmframe.h | 56 ++++++++++++++++++++++++ src/confirmselectframe.h | 88 +++++++++++++++++++++++++++++++++++++ src/confirmselectokframe.h | 89 ++++++++++++++++++++++++++++++++++++++ src/includes.h | 1 + src/receivedframe.cpp | 24 ++++++++++ 7 files changed, 267 insertions(+) create mode 100644 src/confirmframe.h create mode 100644 src/confirmselectframe.h create mode 100644 src/confirmselectokframe.h diff --git a/include/receivedframe.h b/include/receivedframe.h index 7c1e1d5..98b9272 100644 --- a/include/receivedframe.h +++ b/include/receivedframe.h @@ -111,6 +111,13 @@ private: */ bool processBasicFrame(ConnectionImpl *connection); + /** + * Process a confirm frame + * @param connection + * @return bool + */ + bool processConfirmFrame(ConnectionImpl *connection); + /** * Process a transaction frame * @param connection diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index eae0c22..e06b204 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -30,6 +30,8 @@ channelframe.h channelimpl.cpp channelopenframe.h channelopenokframe.h +confirmselectframe.h +confirmselectokframe.h connectioncloseframe.h connectioncloseokframe.h connectionframe.h diff --git a/src/confirmframe.h b/src/confirmframe.h new file mode 100644 index 0000000..02377f9 --- /dev/null +++ b/src/confirmframe.h @@ -0,0 +1,56 @@ +/** + * Class describing an AMQP confirm frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmFrame : public MethodFrame +{ +protected: + /** + * Constructor + * @param channel channel identifier + * @param size frame size + */ + ConfirmFrame(uint16_t channel, uint32_t size) : + MethodFrame(channel, size) + {} + + /** + * Constructor based on incoming frame + * @param frame + */ + ConfirmFrame(ReceivedFrame &frame) : + MethodFrame(frame) + {} + +public: + /** + * Destructor + */ + virtual ~ConfirmFrame() {} + + /** + * Class id + * @return uint16_t + */ + virtual uint16_t classID() const override + { + return 85; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/confirmselectframe.h b/src/confirmselectframe.h new file mode 100644 index 0000000..8a9f79d --- /dev/null +++ b/src/confirmselectframe.h @@ -0,0 +1,88 @@ +/** + * Class describing an AMQP confirm select frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmSelectFrame : public ConfirmFrame +{ +private: + + /** + * whether to wait for a response + * @var BooleanSet + */ + BooleanSet _noWait; + +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConfirmFrame::fill(buffer); + + // add boolean + _noWait.fill(buffer); + } + +public: + /** + * Decode a confirm select frame from a received frame + * + * @param frame received frame to decode + */ + ConfirmSelectFrame(ReceivedFrame& frame) : ConfirmFrame(frame), _noWait(frame) {} + + /** + * Construct a confirm select frame + * + * @param channel channel identifier + * @return newly created confirm select frame + */ + ConfirmSelectFrame(uint16_t channel, bool noWait = false) : + ConfirmFrame(channel, 0), + _noWait(noWait) + {} + + /** + * Destructor + */ + virtual ~ConfirmSelectFrame() {} + + /** + * return the method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 10; + } + + /** + * Return whether to wait for a response + * @return boolean + */ + bool noWait() const + { + return _noWait.get(0); + } +}; + +/** + * end namespace + */ +} + diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h new file mode 100644 index 0000000..4a20992 --- /dev/null +++ b/src/confirmselectokframe.h @@ -0,0 +1,89 @@ +/** + * Class describing an AMQP confirm select ok frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmSelectOKFrame : public ConfirmFrame +{ +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConfirmFrame::fill(buffer); + } + +public: + /** + * Constructor for an incoming frame + * + * @param frame received frame to decode + */ + ConfirmSelectOKFrame(ReceivedFrame& frame) : + ConfirmFrame(frame) + {} + + /** + * Construct a confirm select ok frame + * + * @param channel channel identifier + * @return newly created confirm select ok frame + */ + ConfirmSelectOKFrame(uint16_t channel) : + ConfirmFrame(channel, 0) + {} + + /** + * Destructor + */ + virtual ~ConfirmSelectOKFrame() {} + + /** + * return the method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 11; + } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + if (channel->reportSuccess()) channel->onSynchronized(); + + // done + return true; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/includes.h b/src/includes.h index 2a10061..b105932 100644 --- a/src/includes.h +++ b/src/includes.h @@ -90,6 +90,7 @@ #include "exchangeframe.h" #include "queueframe.h" #include "basicframe.h" +#include "confirmframe.h" #include "transactionframe.h" #include "addressinfo.h" diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index 31406f4..83b83aa 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -7,6 +7,8 @@ */ #include "includes.h" #include "heartbeatframe.h" +#include "confirmselectframe.h" +#include "confirmselectokframe.h" #include "connectionstartokframe.h" #include "connectionstartframe.h" #include "connectionsecureframe.h" @@ -336,6 +338,7 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection) case 40: return processExchangeFrame(connection); case 50: return processQueueFrame(connection); case 60: return processBasicFrame(connection); + case 85: return processConfirmFrame(connection); case 90: return processTransactionFrame(connection); } @@ -493,6 +496,27 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection) throw ProtocolException("unrecognized basic frame method " + std::to_string(methodID)); } +/** + * Process a confirm frame + * @param connection + * @return bool + */ +bool ReceivedFrame::processConfirmFrame(ConnectionImpl *connection) +{ + // read the method id + uint16_t methodID = nextUint16(); + + // construct frame based on method id + switch (methodID) + { + case 10: return ConfirmSelectFrame(*this).process(connection); + case 11: return ConfirmSelectOKFrame(*this).process(connection); + } + + // this is a problem + throw ProtocolException("unrecognized confirm frame method " + std::to_string(methodID)); +} + /** * Process a transaction frame * @param connection From 30e652c669683d90c875a21eb1577a6851ccf66a Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 22:19:55 +0200 Subject: [PATCH 02/19] Add Channel::setConfirmMode() and Channel::messageCounter() methods --- include/channel.h | 22 ++++++++++++++++++++++ include/channelimpl.h | 27 +++++++++++++++++++++++++++ src/channelimpl.cpp | 17 +++++++++++++++++ src/confirmselectokframe.h | 3 +++ 4 files changed, 69 insertions(+) diff --git a/include/channel.h b/include/channel.h index af01f02..37baadb 100644 --- a/include/channel.h +++ b/include/channel.h @@ -123,6 +123,28 @@ public: return _implementation->connected(); } + /** + * Put channel in a confirm mode (RabbitMQ specific) + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ + Deferred &setConfirmMode() + { + return _implementation->setConfirmMode(); + } + + /** + * Return number of messages sent. + * + * This functions returns number of messages sent. It's incremented only + * when channel is in confirm mode. + */ + uint64_t messageCounter() const + { + return _implementation->messageCounter(); + } + /** * Start a transaction * diff --git a/include/channelimpl.h b/include/channelimpl.h index 7ddf1e3..c95b1f1 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -133,6 +133,12 @@ private: */ std::shared_ptr _consumer; + /** + * Number of messages sent. Used in confirm mode + * @var uint64_t + */ + uint64_t _messageCounter = 0; + /** * Attach the connection * @param connection @@ -244,6 +250,19 @@ public: return _state == state_connected || _state == state_ready; } + /** + * Put channel in a confirm mode (RabbitMQ specific) + */ + Deferred &setConfirmMode(); + + /** + * Return number of messages sent. + */ + uint64_t messageCounter() const + { + return _messageCounter; + } + /** * Start a transaction */ @@ -696,6 +715,14 @@ public: */ void complete(); + /** + * Start message counter + */ + void startMessageCounter() + { + _messageCounter = 1; + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 1ddf54f..dde08af 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -15,6 +15,7 @@ #include "channelflowframe.h" #include "channelcloseokframe.h" #include "channelcloseframe.h" +#include "confirmselectframe.h" #include "transactionselectframe.h" #include "transactioncommitframe.h" #include "transactionrollbackframe.h" @@ -182,6 +183,18 @@ Deferred &ChannelImpl::resume() return push(ChannelFlowFrame(_id, true)); } +/** + * Put channel in a confirm mode + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ +Deferred &ChannelImpl::setConfirmMode() +{ + // send a transaction frame + return push(ConfirmSelectFrame(_id)); +} + /** * Start a transaction * @@ -492,6 +505,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin bytesleft -= chunksize; } + // increment message counter if we're in confirm mode + if (_messageCounter) + _messageCounter++; + // done return true; } diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h index 4a20992..83ee8b8 100644 --- a/src/confirmselectokframe.h +++ b/src/confirmselectokframe.h @@ -74,6 +74,9 @@ public: // channel does not exist if(!channel) return false; + // start message counter + channel->startMessageCounter(); + // report that the channel is open if (channel->reportSuccess()) channel->onSynchronized(); From f0712cfdc25b76403fa712aa19162aced2499b16 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 22:50:45 +0200 Subject: [PATCH 03/19] Add ACK and NACK callbacks to Channel --- include/callbacks.h | 2 ++ include/channel.h | 30 +++++++++++++++++++++++++++ include/channelimpl.h | 48 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) diff --git a/include/callbacks.h b/include/callbacks.h index a7c85b9..8037a88 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -48,6 +48,8 @@ using DeleteCallback = std::function; using SizeCallback = std::function; using ConsumeCallback = std::function; using CancelCallback = std::function; +using AckCallback = std::function; +using NackCallback = std::function; /** * End namespace diff --git a/include/channel.h b/include/channel.h index 37baadb..bb51b8e 100644 --- a/include/channel.h +++ b/include/channel.h @@ -145,6 +145,36 @@ public: return _implementation->messageCounter(); } + /** + * Callback that is called when the broker confirmed message publication + * + * Only one callback can be registered. Calling this function multiple + * times will remove the old callback. + * + * For this callback to be called, the channel needs to be in confirm mode. + * + * @param callback the callback to execute + */ + void onAck(const AckCallback &callback) + { + _implementation->onAck(callback); + } + + /** + * Callback that is called when the broker denied message publication + * + * Only one callback can be registered. Calling this function multiple + * times will remove the old callback. + * + * For this callback to be called, the channel needs to be in confirm mode. + * + * @param callback the callback to execute + */ + void onNack(const NackCallback &callback) + { + _implementation->onNack(callback); + } + /** * Start a transaction * diff --git a/include/channelimpl.h b/include/channelimpl.h index c95b1f1..9267bc0 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -139,6 +139,18 @@ private: */ uint64_t _messageCounter = 0; + /** + * Callback when broker confirmed message publication + * @var SuccessCallback + */ + AckCallback _ackCallback; + + /** + * Callback when broker denied message publication + * @var ErrorCallback + */ + NackCallback _nackCallback; + /** * Attach the connection * @param connection @@ -221,6 +233,26 @@ public: */ void onError(const ErrorCallback &callback); + /** + * Callback that is called when the broker confirmed message publication + * @param callback the callback to execute + */ + void onAck(const AckCallback &callback) + { + // store callback + _ackCallback = callback; + } + + /** + * Callback that is called when the broker denied message publication + * @param callback the callback to execute + */ + void onNack(const NackCallback &callback) + { + // store callback + _nackCallback = callback; + } + /** * Pause deliveries on a channel * @@ -576,6 +608,22 @@ public: */ void onSynchronized(); + /** + * Report to the handler that message has been published + */ + void reportAck(uint64_t deliveryTag, bool multiple) + { + if (_ackCallback) _ackCallback(deliveryTag, multiple); + } + + /** + * Report to the handler that message has not been published + */ + void reportNack(uint64_t deliveryTag, bool multiple, bool requeue) + { + if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue); + } + /** * Report to the handler that the channel is opened */ From a28ea01f0e8925a14af69b7579f76ab250d3e104 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 22:57:29 +0200 Subject: [PATCH 04/19] Call ACK and NACK callbacks --- src/basicackframe.h | 20 ++++++++++++++++++++ src/basicnackframe.h | 20 ++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/basicackframe.h b/src/basicackframe.h index 1f3c577..9d10e13 100644 --- a/src/basicackframe.h +++ b/src/basicackframe.h @@ -110,6 +110,26 @@ public: { return _multiple.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // start message counter + channel->reportAck(deliveryTag(), multiple()); + + // done + return true; + } }; /** diff --git a/src/basicnackframe.h b/src/basicnackframe.h index 89344fe..f6e5fc1 100644 --- a/src/basicnackframe.h +++ b/src/basicnackframe.h @@ -108,6 +108,26 @@ public: { return _bits.get(1); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // start message counter + channel->reportNack(deliveryTag(), multiple(), requeue()); + + // done + return true; + } }; /** From 061c462ee0a5ed42cbe47d2657dc4655bb436d76 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 23:18:40 +0200 Subject: [PATCH 05/19] Set correct message length --- src/confirmselectframe.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confirmselectframe.h b/src/confirmselectframe.h index 8a9f79d..26ef3ba 100644 --- a/src/confirmselectframe.h +++ b/src/confirmselectframe.h @@ -53,7 +53,7 @@ public: * @return newly created confirm select frame */ ConfirmSelectFrame(uint16_t channel, bool noWait = false) : - ConfirmFrame(channel, 0), + ConfirmFrame(channel, 1), //sizeof bool _noWait(noWait) {} From 3ab9eed268e8c8311909b70edb294d8567b92db1 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 23:41:55 +0200 Subject: [PATCH 06/19] Simplify message counter logic --- include/channel.h | 4 ++-- include/channelimpl.h | 6 +++--- src/channelimpl.cpp | 5 ++--- src/confirmselectokframe.h | 4 ++-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/include/channel.h b/include/channel.h index bb51b8e..839ca02 100644 --- a/include/channel.h +++ b/include/channel.h @@ -137,8 +137,8 @@ public: /** * Return number of messages sent. * - * This functions returns number of messages sent. It's incremented only - * when channel is in confirm mode. + * This functions returns number of messages sent. It's reset to zero when channel is + * put into confirm mode. */ uint64_t messageCounter() const { diff --git a/include/channelimpl.h b/include/channelimpl.h index 9267bc0..6770022 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -764,11 +764,11 @@ public: void complete(); /** - * Start message counter + * Reset message counter */ - void startMessageCounter() + void resetMessageCounter() { - _messageCounter = 1; + _messageCounter = 0; } /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index dde08af..11ec906 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -505,9 +505,8 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin bytesleft -= chunksize; } - // increment message counter if we're in confirm mode - if (_messageCounter) - _messageCounter++; + // increment message counter + _messageCounter++; // done return true; diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h index 83ee8b8..098168b 100644 --- a/src/confirmselectokframe.h +++ b/src/confirmselectokframe.h @@ -74,8 +74,8 @@ public: // channel does not exist if(!channel) return false; - // start message counter - channel->startMessageCounter(); + // reset message counter + channel->resetMessageCounter(); // report that the channel is open if (channel->reportSuccess()) channel->onSynchronized(); From dcabf722b4ba809ee9ded8dbf89b139f21648657 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Fri, 9 Jun 2017 00:07:29 +0200 Subject: [PATCH 07/19] Add documentation --- README.md | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b324354..3d36a76 100644 --- a/README.md +++ b/README.md @@ -779,6 +779,42 @@ knows in the database world. It is not possible to wrap all sort of operations in a transaction, they are only meaningful for publishing and consuming. +PUBLISHER CONFIRMS +=================== + +RabbitMQ supports lightweight method of confirming that broker received and processed +a message. For this method to work, the channel needs to be put in so-called _confirm mode_. +This is done using setConfirmMode() method. When channel is successfully put in +confirm mode, the server and client count messages (starting from 1) and server sends +acknowledgments for every message it processed (it can also acknowledge multiple message at +once). + +If server is unable to process a message, it will send send negative acknowledgments. Both +positive and negative acknowledgments handling are implemented as callbacks for Channel object. +There is also helper method messageCounter() that returns number of messages send so far +(note that this value is reset when channel is put in confirm mode). + +````c++ +// put channel in confirm mode +channel.setConfirmMode().onSuccess([&]() { + channel.publish("my-exchange", "my-key", "my first message"); + // channel.messageCounter() is now 1 + + channel.publish("my-exchange", "my-key", "my second message"); + // channel.messageCounter() is now 2 +}) + +channel.onAck([&](uint64_t deliverTag, bool multiple) { + // called with deliverTag 1 and 2 if message is processed by broker +}); + +channel.onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { + // called with deliveryTag 1 and 2 if message is not processed by broker +}); +```` + + +For more information, see http://www.rabbitmq.com/confirms.html. CONSUMING MESSAGES ================== @@ -912,7 +948,6 @@ need additional attention: - ability to set up secure connections (or is this fully done on the IO level) - login with other protocols than login/password - - publish confirms - returned messages We also need to add more safety checks so that strange or invalid data from From 389a1d1e7480a63c7e2c4d7f72d54819c76d7d9b Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Fri, 9 Jun 2017 00:18:00 +0200 Subject: [PATCH 08/19] Small documentation fixes --- README.md | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 3d36a76..9b0165a 100644 --- a/README.md +++ b/README.md @@ -795,26 +795,29 @@ There is also helper method messageCounter() that returns number of messages sen (note that this value is reset when channel is put in confirm mode). ````c++ -// put channel in confirm mode -channel.setConfirmMode().onSuccess([&]() { - channel.publish("my-exchange", "my-key", "my first message"); - // channel.messageCounter() is now 1 - - channel.publish("my-exchange", "my-key", "my second message"); - // channel.messageCounter() is now 2 -}) - +// setup ack and nack callbacks channel.onAck([&](uint64_t deliverTag, bool multiple) { - // called with deliverTag 1 and 2 if message is processed by broker + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have been processed }); channel.onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { - // called with deliveryTag 1 and 2 if message is not processed by broker + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have not been processed + // requeue is to be ignored +}); + +// put channel in confirm mode +channel.setConfirmMode().onSuccess([&]() { + channel.publish("my-exchange", "my-key", "my first message"); + // channel.messageCounter() is now 1, will call onAck/onNack with deliverTag=1 + + channel.publish("my-exchange", "my-key", "my second message"); + // channel.messageCounter() is now 2, will call onAck/onNack with deliverTag=2 }); ```` - -For more information, see http://www.rabbitmq.com/confirms.html. +For more information, please see http://www.rabbitmq.com/confirms.html. CONSUMING MESSAGES ================== From e0b04ad7e0f0d3422eac09a11ec0faf2619e926a Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 12:55:15 +0200 Subject: [PATCH 09/19] Remove messageCounter() --- README.md | 6 ++---- include/amqpcpp/channel.h | 11 ----------- include/amqpcpp/channelimpl.h | 22 ---------------------- src/channelimpl.cpp | 3 --- src/confirmselectokframe.h | 3 --- 5 files changed, 2 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 1ee1bd3..b4b03c9 100644 --- a/README.md +++ b/README.md @@ -986,8 +986,6 @@ once). If server is unable to process a message, it will send send negative acknowledgments. Both positive and negative acknowledgments handling are implemented as callbacks for Channel object. -There is also helper method messageCounter() that returns number of messages send so far -(note that this value is reset when channel is put in confirm mode). ````c++ // setup ack and nack callbacks @@ -1005,10 +1003,10 @@ channel.onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { // put channel in confirm mode channel.setConfirmMode().onSuccess([&]() { channel.publish("my-exchange", "my-key", "my first message"); - // channel.messageCounter() is now 1, will call onAck/onNack with deliverTag=1 + // message counter is now 1, will call onAck/onNack with deliverTag=1 channel.publish("my-exchange", "my-key", "my second message"); - // channel.messageCounter() is now 2, will call onAck/onNack with deliverTag=2 + // message counter is now 2, will call onAck/onNack with deliverTag=2 }); ```` diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 3f70b40..29b3097 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -138,17 +138,6 @@ public: return _implementation->setConfirmMode(); } - /** - * Return number of messages sent. - * - * This functions returns number of messages sent. It's reset to zero when channel is - * put into confirm mode. - */ - uint64_t messageCounter() const - { - return _implementation->messageCounter(); - } - /** * Callback that is called when the broker confirmed message publication * diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 4229914..e9b680d 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -141,12 +141,6 @@ private: */ std::shared_ptr _receiver; - /** - * Number of messages sent. Used in confirm mode - * @var uint64_t - */ - uint64_t _messageCounter = 0; - /** * Callback when broker confirmed message publication * @var SuccessCallback @@ -295,14 +289,6 @@ public: */ Deferred &setConfirmMode(); - /** - * Return number of messages sent. - */ - uint64_t messageCounter() const - { - return _messageCounter; - } - /** * Start a transaction */ @@ -782,14 +768,6 @@ public: */ DeferredPublisher *publisher() const { return _publisher.get(); } - /** - * Reset message counter - */ - void resetMessageCounter() - { - _messageCounter = 0; - } - /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 21d191c..f52f330 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -515,9 +515,6 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: bytesleft -= chunksize; } - // increment message counter - _messageCounter++; - // done return *_publisher; } diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h index 098168b..4a20992 100644 --- a/src/confirmselectokframe.h +++ b/src/confirmselectokframe.h @@ -74,9 +74,6 @@ public: // channel does not exist if(!channel) return false; - // reset message counter - channel->resetMessageCounter(); - // report that the channel is open if (channel->reportSuccess()) channel->onSynchronized(); From eeee1c3c5faf54aceb9b8f0884c37d1cfc899912 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 12:59:20 +0200 Subject: [PATCH 10/19] Rename setConfirmMode() to confirmSelect() --- README.md | 4 ++-- include/amqpcpp/channel.h | 4 ++-- include/amqpcpp/channelimpl.h | 2 +- src/channelimpl.cpp | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index b4b03c9..4573dfa 100644 --- a/README.md +++ b/README.md @@ -979,7 +979,7 @@ PUBLISHER CONFIRMS RabbitMQ supports lightweight method of confirming that broker received and processed a message. For this method to work, the channel needs to be put in so-called _confirm mode_. -This is done using setConfirmMode() method. When channel is successfully put in +This is done using confirmSelect() method. When channel is successfully put in confirm mode, the server and client count messages (starting from 1) and server sends acknowledgments for every message it processed (it can also acknowledge multiple message at once). @@ -1001,7 +1001,7 @@ channel.onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { }); // put channel in confirm mode -channel.setConfirmMode().onSuccess([&]() { +channel.confirmSelect().onSuccess([&]() { channel.publish("my-exchange", "my-key", "my first message"); // message counter is now 1, will call onAck/onNack with deliverTag=1 diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 29b3097..898bdf0 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -133,9 +133,9 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &setConfirmMode() + Deferred &confirmSelect() { - return _implementation->setConfirmMode(); + return _implementation->confirmSelect(); } /** diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index e9b680d..f1f4534 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -287,7 +287,7 @@ public: /** * Put channel in a confirm mode (RabbitMQ specific) */ - Deferred &setConfirmMode(); + Deferred &confirmSelect(); /** * Start a transaction diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index f52f330..ecdef51 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -187,7 +187,7 @@ Deferred &ChannelImpl::resume() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::setConfirmMode() +Deferred &ChannelImpl::confirmSelect() { // send a transaction frame return push(ConfirmSelectFrame(_id)); From a90c6207094ec53ca0bcd45ace7bba3b7720df41 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:05:07 +0200 Subject: [PATCH 11/19] Add BasicAckFrame and BasicNackFrame to class list --- include/amqpcpp/classes.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/amqpcpp/classes.h b/include/amqpcpp/classes.h index 493cc12..0cbaafb 100644 --- a/include/amqpcpp/classes.h +++ b/include/amqpcpp/classes.h @@ -24,6 +24,8 @@ class BasicDeliverFrame; class BasicGetOKFrame; class BasicHeaderFrame; class BasicReturnFrame; +class BasicAckFrame; +class BasicNackFrame; class BodyFrame; class Channel; class Connection; From bf1caa1eae45cbffcc75908690514adb8f18b1ce Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:05:49 +0200 Subject: [PATCH 12/19] Add DeferredConfirm class --- include/amqpcpp/deferredconfirm.h | 93 +++++++++++++++++++++++++++++++ src/deferredconfirm.cpp | 40 +++++++++++++ src/includes.h | 1 + 3 files changed, 134 insertions(+) create mode 100644 include/amqpcpp/deferredconfirm.h create mode 100644 src/deferredconfirm.cpp diff --git a/include/amqpcpp/deferredconfirm.h b/include/amqpcpp/deferredconfirm.h new file mode 100644 index 0000000..724e7f8 --- /dev/null +++ b/include/amqpcpp/deferredconfirm.h @@ -0,0 +1,93 @@ +/** + * DeferredDelete.h + * + * Deferred callback for RabbitMQ-specific publisher confirms mechanism. + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredConfirm : public Deferred +{ +private: + /** + * Callback to execute when server confirms that message is processed + * @var AckCallback + */ + AckCallback _ackCallback; + + /** + * Callback to execute when server sends negative acknowledgement + * @var NackCallback + */ + NackCallback _nackCallback; + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicAckFrame &frame); + + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicNackFrame &frame); + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class BasicAckFrame; + friend class BasicNackFrame; + +public: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * + * @param boolean are we already failed? + */ + DeferredConfirm(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register the function that is called when channel is put in publisher + * confirmed mode + * @param callback + */ + DeferredConfirm &onSuccess(const SuccessCallback &callback) + { + // call base + Deferred::onSuccess(callback); + + // allow chaining + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/src/deferredconfirm.cpp b/src/deferredconfirm.cpp new file mode 100644 index 0000000..4a473f5 --- /dev/null +++ b/src/deferredconfirm.cpp @@ -0,0 +1,40 @@ +/** + * DeferredConfirm.cpp + * + * Implementation file for the DeferredConfirm class + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ +#include "includes.h" + +/** + * Namespace + */ +namespace AMQP { + +/** + * Process an ACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicAckFrame &frame) +{ + if (_ackCallback) _ackCallback(frame.deliveryTag(), frame.multiple()); +} + +/** + * Process a NACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicNackFrame &frame) +{ + if (_nackCallback) _nackCallback(frame.deliveryTag(), frame.multiple(), frame.requeue()); +} + +/** + * End namespace + */ +} + diff --git a/src/includes.h b/src/includes.h index 795d501..2cc31bd 100644 --- a/src/includes.h +++ b/src/includes.h @@ -72,6 +72,7 @@ #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" From 2aa55b83c7ffed0b92b5005cbd85dc6030068c2e Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:12:34 +0200 Subject: [PATCH 13/19] Replace old API with DeferredConfirm class --- include/amqpcpp/channel.h | 32 +----------------- include/amqpcpp/channelimpl.h | 63 ++++++++--------------------------- src/basicackframe.h | 10 ++++-- src/basicnackframe.h | 10 ++++-- src/channelimpl.cpp | 15 +++++++-- 5 files changed, 43 insertions(+), 87 deletions(-) diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 898bdf0..18f9f07 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -133,41 +133,11 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &confirmSelect() + DeferredConfirm &confirmSelect() { return _implementation->confirmSelect(); } - /** - * Callback that is called when the broker confirmed message publication - * - * Only one callback can be registered. Calling this function multiple - * times will remove the old callback. - * - * For this callback to be called, the channel needs to be in confirm mode. - * - * @param callback the callback to execute - */ - void onAck(const AckCallback &callback) - { - _implementation->onAck(callback); - } - - /** - * Callback that is called when the broker denied message publication - * - * Only one callback can be registered. Calling this function multiple - * times will remove the old callback. - * - * For this callback to be called, the channel needs to be in confirm mode. - * - * @param callback the callback to execute - */ - void onNack(const NackCallback &callback) - { - _implementation->onNack(callback); - } - /** * Start a transaction * diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index f1f4534..d3ef475 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -42,6 +42,7 @@ class ConsumedMessage; class ConnectionImpl; class DeferredDelete; class DeferredCancel; +class DeferredConfirm; class DeferredQueue; class DeferredGet; class DeferredPublisher; @@ -80,6 +81,12 @@ private: */ std::shared_ptr _publisher; + /** + * Handler that deals with publisher confirms frames + * @var std::shared_ptr + */ + std::shared_ptr _confirm; + /** * Handlers for all consumers that are active * @var std::map @@ -141,18 +148,6 @@ private: */ std::shared_ptr _receiver; - /** - * Callback when broker confirmed message publication - * @var SuccessCallback - */ - AckCallback _ackCallback; - - /** - * Callback when broker denied message publication - * @var ErrorCallback - */ - NackCallback _nackCallback; - /** * Attach the connection * @param connection @@ -235,26 +230,6 @@ public: */ void onError(const ErrorCallback &callback); - /** - * Callback that is called when the broker confirmed message publication - * @param callback the callback to execute - */ - void onAck(const AckCallback &callback) - { - // store callback - _ackCallback = callback; - } - - /** - * Callback that is called when the broker denied message publication - * @param callback the callback to execute - */ - void onNack(const NackCallback &callback) - { - // store callback - _nackCallback = callback; - } - /** * Pause deliveries on a channel * @@ -287,7 +262,7 @@ public: /** * Put channel in a confirm mode (RabbitMQ specific) */ - Deferred &confirmSelect(); + DeferredConfirm &confirmSelect(); /** * Start a transaction @@ -603,22 +578,6 @@ public: */ void onSynchronized(); - /** - * Report to the handler that message has been published - */ - void reportAck(uint64_t deliveryTag, bool multiple) - { - if (_ackCallback) _ackCallback(deliveryTag, multiple); - } - - /** - * Report to the handler that message has not been published - */ - void reportNack(uint64_t deliveryTag, bool multiple, bool requeue) - { - if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue); - } - /** * Report to the handler that the channel is opened */ @@ -768,6 +727,12 @@ public: */ DeferredPublisher *publisher() const { return _publisher.get(); } + /** + * Retrieve the deferred confirm that handles publisher confirms + * @return The deferred confirm object + */ + DeferredConfirm *confirm() const { return _confirm.get(); } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/src/basicackframe.h b/src/basicackframe.h index 9d10e13..514d693 100644 --- a/src/basicackframe.h +++ b/src/basicackframe.h @@ -124,8 +124,14 @@ public: // channel does not exist if(!channel) return false; - // start message counter - channel->reportAck(deliveryTag(), multiple()); + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); // done return true; diff --git a/src/basicnackframe.h b/src/basicnackframe.h index f6e5fc1..589fb15 100644 --- a/src/basicnackframe.h +++ b/src/basicnackframe.h @@ -122,8 +122,14 @@ public: // channel does not exist if(!channel) return false; - // start message counter - channel->reportNack(deliveryTag(), multiple(), requeue()); + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index ecdef51..7e79149 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -187,10 +187,19 @@ Deferred &ChannelImpl::resume() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::confirmSelect() +DeferredConfirm &ChannelImpl::confirmSelect() { - // send a transaction frame - return push(ConfirmSelectFrame(_id)); + // the frame to send + ConfirmSelectFrame frame(_id); + + // send the frame, and create deferred object + auto deferred = std::make_shared(!send(frame)); + + // push to list + push(deferred); + + // done + return *deferred; } /** From 76cf06d0ba90bf8dfb4a41cfd4c8c30019b8ef64 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:34:43 +0200 Subject: [PATCH 14/19] Comment fix --- include/amqpcpp/deferredconfirm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/amqpcpp/deferredconfirm.h b/include/amqpcpp/deferredconfirm.h index 724e7f8..f9f24a4 100644 --- a/include/amqpcpp/deferredconfirm.h +++ b/include/amqpcpp/deferredconfirm.h @@ -1,5 +1,5 @@ /** - * DeferredDelete.h + * DeferredConfirm.h * * Deferred callback for RabbitMQ-specific publisher confirms mechanism. * From 1a955b39af6d2a4fc631b732442a5590534bd026 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:35:08 +0200 Subject: [PATCH 15/19] Add deferredconfirm.h to amqpcpp.h --- include/amqpcpp.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 8079b1f..3c5de03 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -70,6 +70,7 @@ #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" #include "amqpcpp/deferredpublisher.h" #include "amqpcpp/channelimpl.h" From ddee073278c940d49d82d0cfbcc98a6fad0ec034 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:40:30 +0200 Subject: [PATCH 16/19] Add onAck and onNack methods --- include/amqpcpp/deferredconfirm.h | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/include/amqpcpp/deferredconfirm.h b/include/amqpcpp/deferredconfirm.h index f9f24a4..fc89a06 100644 --- a/include/amqpcpp/deferredconfirm.h +++ b/include/amqpcpp/deferredconfirm.h @@ -42,7 +42,6 @@ private: */ void process(BasicAckFrame &frame); - /** * Process an ACK frame * @@ -85,6 +84,32 @@ public: // allow chaining return *this; } + + /** + * Callback that is called when the broker confirmed message publication + * @param callback the callback to execute + */ + DeferredConfirm &onAck(const AckCallback &callback) + { + // store callback + _ackCallback = callback; + + // allow chaining + return *this; + } + + /** + * Callback that is called when the broker denied message publication + * @param callback the callback to execute + */ + DeferredConfirm &onNack(const NackCallback &callback) + { + // store callback + _nackCallback = callback; + + // allow chaining + return *this; + } }; /** From 578abbf9738aa9670b136d83a1b1d3ab6ada4d13 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:44:01 +0200 Subject: [PATCH 17/19] Update src/CMakeLists.txt --- src/CMakeLists.txt | 1 + src/deferredconfirm.cpp | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b900c46..403993b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,6 +45,7 @@ add_sources( connectiontuneokframe.h consumedmessage.h deferredcancel.cpp + deferredconfirm.cpp deferredconsumer.cpp deferredreceiver.cpp deferredextreceiver.cpp diff --git a/src/deferredconfirm.cpp b/src/deferredconfirm.cpp index 4a473f5..7a1ce22 100644 --- a/src/deferredconfirm.cpp +++ b/src/deferredconfirm.cpp @@ -7,6 +7,8 @@ * @copyright 2018 Copernica BV */ #include "includes.h" +#include "basicackframe.h" +#include "basicnackframe.h" /** * Namespace From 26313cc7757bbb397b5fb6d75e50cf83026df130 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 22:04:36 +0200 Subject: [PATCH 18/19] Update documentation --- README.md | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 4573dfa..fb48b31 100644 --- a/README.md +++ b/README.md @@ -988,26 +988,25 @@ If server is unable to process a message, it will send send negative acknowledgm positive and negative acknowledgments handling are implemented as callbacks for Channel object. ````c++ -// setup ack and nack callbacks -channel.onAck([&](uint64_t deliverTag, bool multiple) { - // deliverTag is message number - // multiple is set to true, if all messages UP TO deliverTag have been processed -}); - -channel.onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { - // deliverTag is message number - // multiple is set to true, if all messages UP TO deliverTag have not been processed - // requeue is to be ignored -}); - -// put channel in confirm mode +// setup confirm mode and ack/nack callbacks channel.confirmSelect().onSuccess([&]() { + // from this moment onwards ack/nack confirmations are comming in + channel.publish("my-exchange", "my-key", "my first message"); // message counter is now 1, will call onAck/onNack with deliverTag=1 channel.publish("my-exchange", "my-key", "my second message"); // message counter is now 2, will call onAck/onNack with deliverTag=2 + +}).onAck([&](uint64_t deliverTag, bool multiple) { + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have been processed +}).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have not been processed + // requeue is to be ignored }); + ```` For more information, please see http://www.rabbitmq.com/confirms.html. From cab0602dbf4750a1705d2eb7d5dac87cfbc42609 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 22:36:49 +0200 Subject: [PATCH 19/19] Forgot to set ChannelImpl::_confirm --- src/channelimpl.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 7e79149..c024bc7 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -193,13 +193,13 @@ DeferredConfirm &ChannelImpl::confirmSelect() ConfirmSelectFrame frame(_id); // send the frame, and create deferred object - auto deferred = std::make_shared(!send(frame)); + _confirm = std::make_shared(!send(frame)); // push to list - push(deferred); + push(_confirm); // done - return *deferred; + return *_confirm; } /**