From 2262d9293890bcd1578663632614a022483e2f1b Mon Sep 17 00:00:00 2001 From: Michael van der Werve Date: Wed, 21 Oct 2020 10:36:39 +0200 Subject: [PATCH] {auto} disconnected explicit requirement of reliable and throttle --- include/amqpcpp.h | 3 +- include/amqpcpp/channel.h | 2 +- include/amqpcpp/confirmed.h | 91 +++++++--- include/amqpcpp/deferred.h | 2 +- include/amqpcpp/deferredpublish.h | 3 +- include/amqpcpp/reliable.h | 272 ++++++++++++++++++++++++++++++ include/amqpcpp/throttle.h | 77 ++------- src/Makefile | 2 +- src/confirmed.cpp | 264 ++++++++++++++--------------- src/includes.h | 1 + src/throttle.cpp | 155 ++++------------- 11 files changed, 514 insertions(+), 358 deletions(-) create mode 100644 include/amqpcpp/reliable.h diff --git a/include/amqpcpp.h b/include/amqpcpp.h index a3ae8b6..feb9d7a 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -76,8 +76,9 @@ #include "amqpcpp/deferredrecall.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" -#include "amqpcpp/throttle.h" #include "amqpcpp/confirmed.h" +#include "amqpcpp/throttle.h" +#include "amqpcpp/reliable.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index ebc0313..295c6e4 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -604,7 +604,7 @@ public: /** * Some internal classes may touch our implementation */ - friend class Throttle; + friend class Confirmed; }; /** diff --git a/include/amqpcpp/confirmed.h b/include/amqpcpp/confirmed.h index 47b02e6..974bbef 100644 --- a/include/amqpcpp/confirmed.h +++ b/include/amqpcpp/confirmed.h @@ -1,8 +1,7 @@ /** * Confirmed.h * - * A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed - * on the publishes, to be called when they are confirmed by the message broker. + * Base class that enables publisher confirms and keeps track of the sent messages. * * @author Michael van der Werve * @copyright 2020 Copernica BV @@ -27,37 +26,63 @@ namespace AMQP { /** * Class definition */ -class Confirmed : public Throttle, private Watchable +class Confirmed : public Watchable { -private: +protected: /** - * Set of open deliverytags. We want a normal set (not unordered_set) because - * removal will be cheaper for whole ranges. - * @var size_t + * The implementation for the channel + * @var std::shared_ptr */ - std::map> _handlers; + std::shared_ptr _implementation; /** - * Called when the deliverytag(s) are acked/nacked + * Current id, always starts at 1. + * @var uint64_t + */ + uint64_t _current = 1; + + /** + * Deferred to set up on the close + * @var std::shared_ptr + */ + std::shared_ptr _close; + + /** + * Callback to call when an error occurred + * @var ErrorCallback + */ + ErrorCallback _errorCallback; + + +protected: + /** + * Send method for a frame + * @param id + * @param frame + */ + virtual bool send(uint64_t id, const Frame &frame); + + /** + * Method that is called to report an error. + * @param message + */ + virtual void reportError(const char *message); + + /** + * Method that gets called on ack/nack. If these methods are overridden, make sure + * to also call the base class methods. * @param deliveryTag * @param multiple */ - virtual void onAck(uint64_t deliveryTag, bool multiple) override; - virtual void onNack(uint64_t deliveryTag, bool multiple) override; - - /** - * Method that is called to report an error - * @param message - */ - virtual void reportError(const char *message) override; + virtual void onAck(uint64_t deliveryTag, bool multiple); + virtual void onNack(uint64_t deliveryTag, bool multiple); public: /** * Constructor * @param channel - * @param throttle */ - Confirmed(AMQP::Channel &channel, size_t throttle) : Throttle(channel, throttle) {} + Confirmed(AMQP::Channel &channel); /** * Deleted copy constructor, deleted move constructor @@ -78,6 +103,12 @@ public: */ virtual ~Confirmed() = default; + /** + * Method to check if there is anything still waiting + * @return bool + */ + virtual bool waiting() const { return false; } + /** * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. @@ -88,12 +119,24 @@ public: * @param message the message to send * @param size size of the message * @param flags optional flags - * @return bool + * @return uint64_t */ - DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); - DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } - DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } - DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + uint64_t publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); + uint64_t publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } + uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + + /** + * Close underlying channel + * @return Deferred& + */ + Deferred &close(); + + /** + * Install an error callback + * @param callback + */ + void onError(const ErrorCallback &callback); }; /** diff --git a/include/amqpcpp/deferred.h b/include/amqpcpp/deferred.h index 05eb530..2e4142c 100644 --- a/include/amqpcpp/deferred.h +++ b/include/amqpcpp/deferred.h @@ -193,7 +193,7 @@ protected: * private members and construct us */ friend class ChannelImpl; - friend class Throttle; + friend class Confirmed; public: /** diff --git a/include/amqpcpp/deferredpublish.h b/include/amqpcpp/deferredpublish.h index 3775c72..28a8fea 100644 --- a/include/amqpcpp/deferredpublish.h +++ b/include/amqpcpp/deferredpublish.h @@ -82,7 +82,8 @@ private: * The wrapped confirmed channel implementation may call our * private members and construct us */ - friend class Confirmed; + template + friend class Reliable; public: diff --git a/include/amqpcpp/reliable.h b/include/amqpcpp/reliable.h new file mode 100644 index 0000000..ecd0100 --- /dev/null +++ b/include/amqpcpp/reliable.h @@ -0,0 +1,272 @@ +/** + * Reliable.h + * + * A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed + * on the publishes, to be called when they are confirmed by the message broker. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Header guard + */ +#pragma once + +/** + * Includes + */ +#include "deferredpublish.h" +#include "confirmed.h" +#include + +/** + * Begin of namespaces + */ +namespace AMQP { + +/** + * Class definition + */ +template +class Reliable : public BASE +{ +private: + // make sure it is a proper channel + static_assert(std::is_base_of::value, "base should be derived from a confirmed channel."); + + /** + * Set of open deliverytags. We want a normal set (not unordered_set) because + * removal will be cheaper for whole ranges. + * @var size_t + */ + std::map> _handlers; + + /** + * Called when the deliverytag(s) are acked + * @param deliveryTag + * @param multiple + */ + void onAck(uint64_t deliveryTag, bool multiple) override + { + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // single element is simple + if (!multiple) + { + // find the element + auto iter = _handlers.find(deliveryTag); + + // we did not find it (this should not be possible, unless somebody explicitly called) + // the base-class publish methods for some reason. + if (iter == _handlers.end()) return BASE::onAck(deliveryTag, multiple); + + // call the ack handler + iter->second->reportAck(); + + // if the monitor is no longer valid, we stop (we're done) + if (!monitor) return; + + // erase it from the map + _handlers.erase(iter); + } + + // do multiple at once + else + { + // find the last element, inclusive + auto upper = _handlers.upper_bound(deliveryTag); + + // call the handlers + for (auto iter = _handlers.begin(); iter != upper; iter++) + { + // call the handler + iter->second->reportAck(); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // erase all acknowledged items + _handlers.erase(_handlers.begin(), upper); + } + + // make sure the object is still valid + if (!monitor) return; + + // call base handler as well + BASE::onAck(deliveryTag, multiple); + } + + /** + * Called when the deliverytag(s) are nacked + * @param deliveryTag + * @param multiple + */ + void onNack(uint64_t deliveryTag, bool multiple) override + { + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // single element is simple + if (!multiple) + { + // find the element + auto iter = _handlers.find(deliveryTag); + + // we did not find it (this should not be possible, unless somebody explicitly called) + // the base-class publish methods for some reason. + if (iter == _handlers.end()) return BASE::onNack(deliveryTag, multiple); + + // call the ack handler + iter->second->reportNack(); + + // if the monitor is no longer valid, we stop (we're done) + if (!monitor) return; + + // erase it from the map + _handlers.erase(iter); + } + + // nack multiple elements + else + { + // find the last element, inclusive + auto upper = _handlers.upper_bound(deliveryTag); + + // call the handlers + for (auto iter = _handlers.begin(); iter != upper; iter++) + { + // call the handler + iter->second->reportNack(); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // erase all acknowledged items + _handlers.erase(_handlers.begin(), upper); + } + + // if the object is no longer valid, return + if (!monitor) return; + + // call the base handler + BASE::onNack(deliveryTag, multiple); + } + + /** + * Method that is called to report an error + * @param message + */ + void reportError(const char *message) override + { + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // move the handlers out + auto handlers = std::move(_handlers); + + // iterate over all the messages + // call the handlers + for (const auto &iter : handlers) + { + // call the handler + iter.second->reportError(message); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // if the monitor is no longer valid, leap out + if (!monitor) return; + + // call the base handler + BASE::reportError(message); + } + +public: + /** + * Constructor + * @param channel + * @param throttle + */ + template + Reliable(Args &&...args) : BASE(std::forward(args)...) {} + + /** + * Deleted copy constructor, deleted move constructor + * @param other + */ + Reliable(const Reliable &other) = delete; + Reliable(Reliable &&other) = delete; + + /** + * Deleted copy assignment, deleted move assignment + * @param other + */ + Reliable &operator=(const Reliable &other) = delete; + Reliable &operator=(Reliable &&other) = delete; + + /** + * Virtual destructor + */ + virtual ~Reliable() = default; + + /** + * Method that can be accessed to check if there are still buffered messages. + * @return bool + */ + virtual bool waiting() const override { return _handlers.size() > 0 || BASE::waiting(); } + + /** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + * @return bool + */ + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + + /** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + */ + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) + { + // publish the entire thing, and remember if it failed at any point + uint64_t tag = BASE::publish(exchange, routingKey, envelope, flags); + + // create the publish deferred object, if we got no tag we failed + auto handler = std::make_shared(tag == 0); + + // add it to the open handlers + _handlers[tag] = handler; + + // return the dereferenced handler + return *handler; + } +}; + +/** + * End of namespaces + */ +} diff --git a/include/amqpcpp/throttle.h b/include/amqpcpp/throttle.h index b2bc4ee..e48fe93 100644 --- a/include/amqpcpp/throttle.h +++ b/include/amqpcpp/throttle.h @@ -20,6 +20,7 @@ #include #include "copiedbuffer.h" #include "channelimpl.h" +#include "confirmed.h" /** * Begin of namespaces @@ -34,21 +35,9 @@ class Channel; /** * Class definition */ -class Throttle +class Throttle : public Confirmed { protected: - /** - * The implementation for the channel - * @var std::shared_ptr - */ - std::shared_ptr _implementation; - - /** - * Current id, always starts at 1. - * @var uint64_t - */ - uint64_t _current = 1; - /** * Last sent ID * @var uint64_t @@ -74,39 +63,28 @@ protected: */ std::set _open; - /** - * Deferred to set up on the close - * @var std::shared_ptr - */ - std::shared_ptr _close; - - /** - * Callback to call when an error occurred - * @var ErrorCallback - */ - ErrorCallback _errorCallback; +protected: /** * Send method for a frame * @param id * @param frame */ - bool send(uint64_t id, const Frame &frame); + virtual bool send(uint64_t id, const Frame &frame) override; /** * Method that is called to report an error * @param message */ - virtual void reportError(const char *message); + virtual void reportError(const char *message) override; -protected: /** - * Called when the deliverytag(s) are acked/nacked + * Method that is called to report an ack/nack * @param deliveryTag * @param multiple */ - virtual void onAck(uint64_t deliveryTag, bool multiple); - virtual void onNack(uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); } + virtual void onAck(uint64_t deliveryTag, bool multiple) override; + virtual void onNack(uint64_t deliveryTag, bool multiple) override; public: /** @@ -139,33 +117,10 @@ public: virtual ~Throttle() = default; /** - * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. - * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. - * - * @param exchange the exchange to publish to - * @param routingkey the routing key - * @param envelope the full envelope to send - * @param message the message to send - * @param size size of the message - * @param flags optional flags + * Method that can be accessed to check if there are still buffered messages. * @return bool */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } - - /** - * Get the number of messages that are waiting to be published - * @return uint64_t - */ - size_t waiting() const { return _current - _last - 1; } - - /** - * Number of messages already sent but unacknowledged by rabbit - * @return size_t - */ - size_t unacknowledged() const { return _open.size(); } + virtual bool waiting() const override { return _queue.size() > 0; } /** * Get the throttle @@ -186,18 +141,6 @@ public: * @param max optional maximum, 0 is flush all */ size_t flush(size_t max = 0); - - /** - * Close the throttle channel (closes the underlying channel when all messages have been sent) - * @return Deferred& - */ - Deferred &close(); - - /** - * Install an error callback - * @param callback - */ - void onError(const ErrorCallback &callback); }; /** diff --git a/src/Makefile b/src/Makefile index 3471fd5..aeabb51 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,6 +1,6 @@ CPP = g++ RM = rm -f -CPPFLAGS = -Wall -c -I../include -std=c++11 -MD +CPPFLAGS = -Wall -c -I../include -std=c++11 -MD -Wno-class-conversion LD = g++ LD_FLAGS = -Wall -shared SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION) diff --git a/src/confirmed.cpp b/src/confirmed.cpp index 062e36e..5ace838 100644 --- a/src/confirmed.cpp +++ b/src/confirmed.cpp @@ -11,71 +11,58 @@ * Includes */ #include "includes.h" +#include "basicpublishframe.h" +#include "basicheaderframe.h" +#include "bodyframe.h" /** * Begin of namespaces */ namespace AMQP { +/** + * Constructor + * @param channel + */ +Confirmed::Confirmed(Channel &channel) : _implementation(channel._implementation) +{ + // activate confirm-select mode + auto &deferred = channel.confirmSelect() + .onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }) + .onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); }); + + // we might have failed, in which case we throw + if (!deferred) throw std::runtime_error("could not enable publisher confirms"); + + // we wrap a handling error callback that calls our member function + _implementation->onError([this](const char *message) { reportError(message); }); +} + +/** + * Send method for a frame + * @param id + * @param frame + */ +bool Confirmed::send(uint64_t id, const Frame &frame) +{ + // we're simply going to send it over the channel directly + return _implementation->send(frame); +} + /** * Called when the deliverytag(s) are acked * @param deliveryTag * @param multiple */ -void Confirmed::onAck(uint64_t deliveryTag, bool multiple) +void Confirmed::onAck(uint64_t deliveryTag, bool multiple) { - // monitor the object, watching for destruction since these ack/nack handlers - // could destruct the object - Monitor monitor(this); + // leap out if there are still messages or we shouldn't close yet + if (!_close || waiting()) return; - // single element is simple - if (!multiple) - { - // find the element - auto iter = _handlers.find(deliveryTag); - - // we did not find it (this should not be possible, unless somebody explicitly called) - // the base-class publish methods for some reason. - if (iter == _handlers.end()) return Throttle::onAck(deliveryTag, multiple); - - // call the ack handler - iter->second->reportAck(); - - // if the monitor is no longer valid, we stop (we're done) - if (!monitor) return; - - // erase it from the map - _handlers.erase(iter); - } - - // do multiple at once - else - { - // find the last element, inclusive - auto upper = _handlers.upper_bound(deliveryTag); - - // call the handlers - for (auto iter = _handlers.begin(); iter != upper; iter++) - { - // call the handler - iter->second->reportAck(); - - // if we were destructed in the meantime, we leap out - if (!monitor) return; - } - - // erase all acknowledged items - _handlers.erase(_handlers.begin(), upper); - } - - // make sure the object is still valid - if (!monitor) return; - - // call base handler, will advance on the throttle if needed. we call this _after_ we're - // done processing the callbacks, since one of the callbacks might close the channel, or publish - // more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing - // for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed - Throttle::onAck(deliveryTag, multiple); + // close the channel, and forward the callbacks to the installed handler + _implementation->close() + .onSuccess([this]() { _close->reportSuccess(); }) + .onError([this](const char *message) { _close->reportError(message); }); } /** @@ -85,58 +72,13 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple) */ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) { - // monitor the object, watching for destruction since these ack/nack handlers - // could destruct the object - Monitor monitor(this); + // leap out if there are still messages or we shouldn't close yet + if (!_close || waiting()) return; - // single element is simple - if (!multiple) - { - // find the element - auto iter = _handlers.find(deliveryTag); - - // we did not find it (this should not be possible, unless somebody explicitly called) - // the base-class publish methods for some reason. - if (iter == _handlers.end()) return Throttle::onNack(deliveryTag, multiple); - - // call the ack handler - iter->second->reportNack(); - - // if the monitor is no longer valid, we stop (we're done) - if (!monitor) return; - - // erase it from the map - _handlers.erase(iter); - } - - // nack multiple elements - else - { - // find the last element, inclusive - auto upper = _handlers.upper_bound(deliveryTag); - - // call the handlers - for (auto iter = _handlers.begin(); iter != upper; iter++) - { - // call the handler - iter->second->reportNack(); - - // if we were destructed in the meantime, we leap out - if (!monitor) return; - } - - // erase all acknowledged items - _handlers.erase(_handlers.begin(), upper); - } - - // make sure the object is still valid - if (!monitor) return; - - // call base handler, will advance on the throttle if needed. we call this _after_ we're - // done processing the callbacks, since one of the callbacks might close the channel, or publish - // more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing - // for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed - Throttle::onNack(deliveryTag, multiple); + // close the channel, and forward the callbacks to the installed handler + _implementation->close() + .onSuccess([this]() { _close->reportSuccess(); }) + .onError([this](const char *message) { _close->reportError(message); }); } /** @@ -145,29 +87,11 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) */ void Confirmed::reportError(const char *message) { - // monitor the object, watching for destruction since these ack/nack handlers - // could destruct the object - Monitor monitor(this); + // reset tracking, since channel is fully broken + _current = 1; - // move the handlers out - auto handlers = std::move(_handlers); - - // iterate over all the messages - // call the handlers - for (const auto &iter : handlers) - { - // call the handler - iter.second->reportError(message); - - // if we were destructed in the meantime, we leap out - if (!monitor) return; - } - - // if the monitor is no longer valid, leap out - if (!monitor) return; - - // call base class to let it handle the errors - Throttle::reportError(message); + // if a callback is set, call the handler with the message + if (_errorCallback) _errorCallback(message); } /** @@ -180,23 +104,91 @@ void Confirmed::reportError(const char *message) * @param message the message to send * @param size size of the message * @param flags optional flags + * @return uint64_t */ -DeferredPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +uint64_t Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { - // copy the current identifier, this will be the ID that will come back - auto current = _current; + // @todo do not copy the entire buffer to individual frames - // publish the entire thing, and remember if it failed at any point - bool failed = !Throttle::publish(exchange, routingKey, envelope, flags); + // fail if we're closing the channel, no more publishes allowed + if (_close) return false; - // create the open - auto handler = std::make_shared(failed); + // send the publish frame + if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false; - // add it to the open handlers - _handlers[current] = handler; + // send header + if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false; - // return the dereferenced handler - return *handler; + // connection and channel still usable? + if (!_implementation->usable()) return false; + + // the max payload size is the max frame size minus the bytes for headers and trailer + uint32_t maxpayload = _implementation->maxPayload(); + uint64_t bytessent = 0; + + // the buffer + const char *data = envelope.body(); + uint64_t bytesleft = envelope.bodySize(); + + // split up the body in multiple frames depending on the max frame size + while (bytesleft > 0) + { + // size of this chunk + uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); + + // send out a body frame + if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false; + + // update counters + bytessent += chunksize; + bytesleft -= chunksize; + } + + // we succeeded + return _current++; +} + +/** + * Close the throttle channel (closes the underlying channel) + * @return Deferred& + */ +Deferred &Confirmed::close() +{ + // if this was already set to be closed, return that + if (_close) return *_close; + + // create the deferred + _close = std::make_shared(_implementation->usable()); + + // if there are open messages or there is a queue, they will still get acked and we will then forward it + if (waiting()) return *_close; + + // there are no open messages, we can close the channel directly. + _implementation->close() + .onSuccess([this]() { _close->reportSuccess(); }) + .onError([this](const char *message) { _close->reportError(message); }); + + // return the created deferred + return *_close; +} + +/** + * Install an error callback + * @param callback + */ +void Confirmed::onError(const ErrorCallback &callback) +{ + // we store the callback + _errorCallback = callback; + + // check the callback + if (!callback) return; + + // if the channel is no longer usable, report that + if (!_implementation->usable()) return callback("Channel is no longer usable"); + + // specify that we're already closing + if (_close) callback("Wrapped channel is closing down"); } /** diff --git a/src/includes.h b/src/includes.h index f3251db..8b139e9 100644 --- a/src/includes.h +++ b/src/includes.h @@ -79,6 +79,7 @@ #include "amqpcpp/channel.h" #include "amqpcpp/throttle.h" #include "amqpcpp/confirmed.h" +#include "amqpcpp/reliable.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/src/throttle.cpp b/src/throttle.cpp index a0f2599..fc87f87 100644 --- a/src/throttle.cpp +++ b/src/throttle.cpp @@ -26,22 +26,10 @@ namespace AMQP { * @param channel * @param throttle */ -Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel._implementation), _throttle(throttle) -{ - // activate confirm-select mode - auto &deferred = channel.confirmSelect() - .onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }) - .onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); }); - - // we might have failed, in which case we throw - if (!deferred) throw std::runtime_error("could not enable publisher confirms"); - - // we wrap a handling error callback that calls our member function - _implementation->onError([this](const char *message) { reportError(message); }); -} +Throttle::Throttle(Channel &channel, size_t throttle) : Confirmed(channel), _throttle(throttle) {} /** - * Called when the deliverytag(s) are acked/nacked + * Called when the deliverytag(s) are acked * @param deliveryTag * @param multiple */ @@ -53,16 +41,31 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple) // otherwise, we remove the single element else _open.erase(deliveryTag); - // if there is more room now, we can flush some items + // if there is room, flush part of the queue if (_open.size() < _throttle) flush(_throttle - _open.size()); - // leap out if there are still messages or we shouldn't close yet - if (!_open.empty() || !_close) return; + // call base handler + Confirmed::onAck(deliveryTag, multiple); +} - // close the channel, and forward the callbacks to the installed handler - _implementation->close() - .onSuccess([this]() { _close->reportSuccess(); }) - .onError([this](const char *message) { _close->reportError(message); }); +/** + * Called when the deliverytag(s) are nacked + * @param deliveryTag + * @param multiple + */ +void Throttle::onNack(uint64_t deliveryTag, bool multiple) +{ + // number of messages exposed + if (multiple) _open.erase(_open.begin(), _open.upper_bound(deliveryTag)); + + // otherwise, we remove the single element + else _open.erase(deliveryTag); + + // if there is room, flush part of the queue + if (_open.size() < _throttle) flush(_throttle - _open.size()); + + // call base handler + Confirmed::onNack(deliveryTag, multiple); } /** @@ -88,8 +91,8 @@ bool Throttle::send(uint64_t id, const Frame &frame) // we have now send this id _open.insert(id); - // and we're going to send it over the channel directly - return _implementation->send(frame); + // we can finally actually send it + return Confirmed::send(id, frame); } /** @@ -104,68 +107,11 @@ void Throttle::reportError(const char *message) // we can also forget all open messages, won't hear from them any more _open.clear(); - // reset tracking, since channel is fully broken + // we have no last seen message any more _last = 0; - _current = 1; - // if a callback is set, call the handler with the message - if (_errorCallback) _errorCallback(message); -} - -/** - * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. - * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. - * - * @param exchange the exchange to publish to - * @param routingkey the routing key - * @param envelope the full envelope to send - * @param message the message to send - * @param size size of the message - * @param flags optional flags - */ -bool Throttle::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) -{ - // @todo do not copy the entire buffer to individual frames - - // fail if we're closing the channel, no more publishes allowed - if (_close) return false; - - // send the publish frame - if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false; - - // send header - if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false; - - // connection and channel still usable? - if (!_implementation->usable()) return false; - - // the max payload size is the max frame size minus the bytes for headers and trailer - uint32_t maxpayload = _implementation->maxPayload(); - uint64_t bytessent = 0; - - // the buffer - const char *data = envelope.body(); - uint64_t bytesleft = envelope.bodySize(); - - // split up the body in multiple frames depending on the max frame size - while (bytesleft > 0) - { - // size of this chunk - uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); - - // send out a body frame - if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false; - - // update counters - bytessent += chunksize; - bytesleft -= chunksize; - } - - // we're done, we move to the next deliverytag - ++_current; - - // we succeeded - return true; + // call base method + Confirmed::reportError(message); } /** @@ -211,49 +157,6 @@ size_t Throttle::flush(size_t max) return published; } -/** - * Close the throttle channel (closes the underlying channel) - * @return Deferred& - */ -Deferred &Throttle::close() -{ - // if this was already set to be closed, return that - if (_close) return *_close; - - // create the deferred - _close = std::make_shared(_implementation->usable()); - - // if there are open messages or there is a queue, they will still get acked and we will then forward it - if (_open.size() > 0 || !_queue.empty()) return *_close; - - // there are no open messages, we can close the channel directly. - _implementation->close() - .onSuccess([this]() { _close->reportSuccess(); }) - .onError([this](const char *message) { _close->reportError(message); }); - - // return the created deferred - return *_close; -} - -/** - * Install an error callback - * @param callback - */ -void Throttle::onError(const ErrorCallback &callback) -{ - // we store the callback - _errorCallback = callback; - - // check the callback - if (!callback) return; - - // if the channel is no longer usable, report that - if (!_implementation->usable()) return callback("Channel is no longer usable"); - - // specify that we're already closing - if (_close) callback("Wrapped channel is closing down"); -} - /** * End of namespaces */