diff --git a/README.md b/README.md index 114ad50..18e832a 100644 --- a/README.md +++ b/README.md @@ -974,6 +974,42 @@ knows in the database world. It is not possible to wrap all sort of operations in a transaction, they are only meaningful for publishing and consuming. +PUBLISHER CONFIRMS +=================== + +RabbitMQ supports lightweight method of confirming that broker received and processed +a message. For this method to work, the channel needs to be put in so-called _confirm mode_. +This is done using confirmSelect() method. When channel is successfully put in +confirm mode, the server and client count messages (starting from 1) and server sends +acknowledgments for every message it processed (it can also acknowledge multiple message at +once). + +If server is unable to process a message, it will send send negative acknowledgments. Both +positive and negative acknowledgments handling are implemented as callbacks for Channel object. + +````c++ +// setup confirm mode and ack/nack callbacks +channel.confirmSelect().onSuccess([&]() { + // from this moment onwards ack/nack confirmations are comming in + + channel.publish("my-exchange", "my-key", "my first message"); + // message counter is now 1, will call onAck/onNack with deliverTag=1 + + channel.publish("my-exchange", "my-key", "my second message"); + // message counter is now 2, will call onAck/onNack with deliverTag=2 + +}).onAck([&](uint64_t deliverTag, bool multiple) { + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have been processed +}).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) { + // deliverTag is message number + // multiple is set to true, if all messages UP TO deliverTag have not been processed + // requeue is to be ignored +}); + +```` + +For more information, please see http://www.rabbitmq.com/confirms.html. CONSUMING MESSAGES ================== @@ -1107,7 +1143,6 @@ need additional attention: - ability to set up secure connections (or is this fully done on the IO level) - login with other protocols than login/password - - publish confirms We also need to add more safety checks so that strange or invalid data from RabbitMQ does not break the library (although in reality RabbitMQ only sends diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 8079b1f..3c5de03 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -70,6 +70,7 @@ #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" #include "amqpcpp/deferredpublisher.h" #include "amqpcpp/channelimpl.h" diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index a2d5ec4..76ac090 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -77,6 +77,13 @@ using ReturnedCallback = std::function; using MessageCallback = std::function; using BounceCallback = std::function; +/** + * When using publisher confirms, AckCallback is called when server confirms that message is received + * and processed. NackCallback is called otherwise. + */ +using AckCallback = std::function; +using NackCallback = std::function; + /** * End namespace */ diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 2d484ca..253b68c 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -127,6 +127,17 @@ public: return _implementation->connected(); } + /** + * Put channel in a confirm mode (RabbitMQ specific) + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ + DeferredConfirm &confirmSelect() + { + return _implementation->confirmSelect(); + } + /** * Start a transaction * diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 40d0603..d3ef475 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -42,6 +42,7 @@ class ConsumedMessage; class ConnectionImpl; class DeferredDelete; class DeferredCancel; +class DeferredConfirm; class DeferredQueue; class DeferredGet; class DeferredPublisher; @@ -80,6 +81,12 @@ private: */ std::shared_ptr _publisher; + /** + * Handler that deals with publisher confirms frames + * @var std::shared_ptr + */ + std::shared_ptr _confirm; + /** * Handlers for all consumers that are active * @var std::map @@ -252,6 +259,11 @@ public: return _state == state_connected || _state == state_ready; } + /** + * Put channel in a confirm mode (RabbitMQ specific) + */ + DeferredConfirm &confirmSelect(); + /** * Start a transaction */ @@ -715,6 +727,12 @@ public: */ DeferredPublisher *publisher() const { return _publisher.get(); } + /** + * Retrieve the deferred confirm that handles publisher confirms + * @return The deferred confirm object + */ + DeferredConfirm *confirm() const { return _confirm.get(); } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/amqpcpp/classes.h b/include/amqpcpp/classes.h index 493cc12..0cbaafb 100644 --- a/include/amqpcpp/classes.h +++ b/include/amqpcpp/classes.h @@ -24,6 +24,8 @@ class BasicDeliverFrame; class BasicGetOKFrame; class BasicHeaderFrame; class BasicReturnFrame; +class BasicAckFrame; +class BasicNackFrame; class BodyFrame; class Channel; class Connection; diff --git a/include/amqpcpp/deferredconfirm.h b/include/amqpcpp/deferredconfirm.h new file mode 100644 index 0000000..fc89a06 --- /dev/null +++ b/include/amqpcpp/deferredconfirm.h @@ -0,0 +1,118 @@ +/** + * DeferredConfirm.h + * + * Deferred callback for RabbitMQ-specific publisher confirms mechanism. + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredConfirm : public Deferred +{ +private: + /** + * Callback to execute when server confirms that message is processed + * @var AckCallback + */ + AckCallback _ackCallback; + + /** + * Callback to execute when server sends negative acknowledgement + * @var NackCallback + */ + NackCallback _nackCallback; + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicAckFrame &frame); + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicNackFrame &frame); + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class BasicAckFrame; + friend class BasicNackFrame; + +public: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * + * @param boolean are we already failed? + */ + DeferredConfirm(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register the function that is called when channel is put in publisher + * confirmed mode + * @param callback + */ + DeferredConfirm &onSuccess(const SuccessCallback &callback) + { + // call base + Deferred::onSuccess(callback); + + // allow chaining + return *this; + } + + /** + * Callback that is called when the broker confirmed message publication + * @param callback the callback to execute + */ + DeferredConfirm &onAck(const AckCallback &callback) + { + // store callback + _ackCallback = callback; + + // allow chaining + return *this; + } + + /** + * Callback that is called when the broker denied message publication + * @param callback the callback to execute + */ + DeferredConfirm &onNack(const NackCallback &callback) + { + // store callback + _nackCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/include/amqpcpp/receivedframe.h b/include/amqpcpp/receivedframe.h index 7c1e1d5..98b9272 100644 --- a/include/amqpcpp/receivedframe.h +++ b/include/amqpcpp/receivedframe.h @@ -111,6 +111,13 @@ private: */ bool processBasicFrame(ConnectionImpl *connection); + /** + * Process a confirm frame + * @param connection + * @return bool + */ + bool processConfirmFrame(ConnectionImpl *connection); + /** * Process a transaction frame * @param connection diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a16912f..403993b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,6 +29,8 @@ add_sources( channelimpl.cpp channelopenframe.h channelopenokframe.h + confirmselectframe.h + confirmselectokframe.h connectioncloseframe.h connectioncloseokframe.h connectionframe.h @@ -43,6 +45,7 @@ add_sources( connectiontuneokframe.h consumedmessage.h deferredcancel.cpp + deferredconfirm.cpp deferredconsumer.cpp deferredreceiver.cpp deferredextreceiver.cpp diff --git a/src/basicackframe.h b/src/basicackframe.h index 1f3c577..514d693 100644 --- a/src/basicackframe.h +++ b/src/basicackframe.h @@ -110,6 +110,32 @@ public: { return _multiple.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); + + // done + return true; + } }; /** diff --git a/src/basicnackframe.h b/src/basicnackframe.h index 89344fe..589fb15 100644 --- a/src/basicnackframe.h +++ b/src/basicnackframe.h @@ -108,6 +108,32 @@ public: { return _bits.get(1); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); + + // done + return true; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 053e479..c024bc7 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -14,6 +14,7 @@ #include "channelflowframe.h" #include "channelcloseokframe.h" #include "channelcloseframe.h" +#include "confirmselectframe.h" #include "transactionselectframe.h" #include "transactioncommitframe.h" #include "transactionrollbackframe.h" @@ -180,6 +181,27 @@ Deferred &ChannelImpl::resume() return push(ChannelFlowFrame(_id, true)); } +/** + * Put channel in a confirm mode + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ +DeferredConfirm &ChannelImpl::confirmSelect() +{ + // the frame to send + ConfirmSelectFrame frame(_id); + + // send the frame, and create deferred object + _confirm = std::make_shared(!send(frame)); + + // push to list + push(_confirm); + + // done + return *_confirm; +} + /** * Start a transaction * diff --git a/src/confirmframe.h b/src/confirmframe.h new file mode 100644 index 0000000..02377f9 --- /dev/null +++ b/src/confirmframe.h @@ -0,0 +1,56 @@ +/** + * Class describing an AMQP confirm frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmFrame : public MethodFrame +{ +protected: + /** + * Constructor + * @param channel channel identifier + * @param size frame size + */ + ConfirmFrame(uint16_t channel, uint32_t size) : + MethodFrame(channel, size) + {} + + /** + * Constructor based on incoming frame + * @param frame + */ + ConfirmFrame(ReceivedFrame &frame) : + MethodFrame(frame) + {} + +public: + /** + * Destructor + */ + virtual ~ConfirmFrame() {} + + /** + * Class id + * @return uint16_t + */ + virtual uint16_t classID() const override + { + return 85; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/confirmselectframe.h b/src/confirmselectframe.h new file mode 100644 index 0000000..26ef3ba --- /dev/null +++ b/src/confirmselectframe.h @@ -0,0 +1,88 @@ +/** + * Class describing an AMQP confirm select frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmSelectFrame : public ConfirmFrame +{ +private: + + /** + * whether to wait for a response + * @var BooleanSet + */ + BooleanSet _noWait; + +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConfirmFrame::fill(buffer); + + // add boolean + _noWait.fill(buffer); + } + +public: + /** + * Decode a confirm select frame from a received frame + * + * @param frame received frame to decode + */ + ConfirmSelectFrame(ReceivedFrame& frame) : ConfirmFrame(frame), _noWait(frame) {} + + /** + * Construct a confirm select frame + * + * @param channel channel identifier + * @return newly created confirm select frame + */ + ConfirmSelectFrame(uint16_t channel, bool noWait = false) : + ConfirmFrame(channel, 1), //sizeof bool + _noWait(noWait) + {} + + /** + * Destructor + */ + virtual ~ConfirmSelectFrame() {} + + /** + * return the method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 10; + } + + /** + * Return whether to wait for a response + * @return boolean + */ + bool noWait() const + { + return _noWait.get(0); + } +}; + +/** + * end namespace + */ +} + diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h new file mode 100644 index 0000000..4a20992 --- /dev/null +++ b/src/confirmselectokframe.h @@ -0,0 +1,89 @@ +/** + * Class describing an AMQP confirm select ok frame + * + * @author Marcin Gibula + * @copyright 2017 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConfirmSelectOKFrame : public ConfirmFrame +{ +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConfirmFrame::fill(buffer); + } + +public: + /** + * Constructor for an incoming frame + * + * @param frame received frame to decode + */ + ConfirmSelectOKFrame(ReceivedFrame& frame) : + ConfirmFrame(frame) + {} + + /** + * Construct a confirm select ok frame + * + * @param channel channel identifier + * @return newly created confirm select ok frame + */ + ConfirmSelectOKFrame(uint16_t channel) : + ConfirmFrame(channel, 0) + {} + + /** + * Destructor + */ + virtual ~ConfirmSelectOKFrame() {} + + /** + * return the method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 11; + } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + if (channel->reportSuccess()) channel->onSynchronized(); + + // done + return true; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/deferredconfirm.cpp b/src/deferredconfirm.cpp new file mode 100644 index 0000000..7a1ce22 --- /dev/null +++ b/src/deferredconfirm.cpp @@ -0,0 +1,42 @@ +/** + * DeferredConfirm.cpp + * + * Implementation file for the DeferredConfirm class + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ +#include "includes.h" +#include "basicackframe.h" +#include "basicnackframe.h" + +/** + * Namespace + */ +namespace AMQP { + +/** + * Process an ACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicAckFrame &frame) +{ + if (_ackCallback) _ackCallback(frame.deliveryTag(), frame.multiple()); +} + +/** + * Process a NACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicNackFrame &frame) +{ + if (_nackCallback) _nackCallback(frame.deliveryTag(), frame.multiple(), frame.requeue()); +} + +/** + * End namespace + */ +} + diff --git a/src/includes.h b/src/includes.h index 89ae392..2cc31bd 100644 --- a/src/includes.h +++ b/src/includes.h @@ -72,6 +72,7 @@ #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" @@ -93,6 +94,7 @@ #include "exchangeframe.h" #include "queueframe.h" #include "basicframe.h" +#include "confirmframe.h" #include "transactionframe.h" diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index 31406f4..83b83aa 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -7,6 +7,8 @@ */ #include "includes.h" #include "heartbeatframe.h" +#include "confirmselectframe.h" +#include "confirmselectokframe.h" #include "connectionstartokframe.h" #include "connectionstartframe.h" #include "connectionsecureframe.h" @@ -336,6 +338,7 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection) case 40: return processExchangeFrame(connection); case 50: return processQueueFrame(connection); case 60: return processBasicFrame(connection); + case 85: return processConfirmFrame(connection); case 90: return processTransactionFrame(connection); } @@ -493,6 +496,27 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection) throw ProtocolException("unrecognized basic frame method " + std::to_string(methodID)); } +/** + * Process a confirm frame + * @param connection + * @return bool + */ +bool ReceivedFrame::processConfirmFrame(ConnectionImpl *connection) +{ + // read the method id + uint16_t methodID = nextUint16(); + + // construct frame based on method id + switch (methodID) + { + case 10: return ConfirmSelectFrame(*this).process(connection); + case 11: return ConfirmSelectOKFrame(*this).process(connection); + } + + // this is a problem + throw ProtocolException("unrecognized confirm frame method " + std::to_string(methodID)); +} + /** * Process a transaction frame * @param connection