diff --git a/include/amqpcpp.h b/include/amqpcpp.h index feb9d7a..edfb92d 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -76,7 +76,7 @@ #include "amqpcpp/deferredrecall.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" -#include "amqpcpp/confirmed.h" +#include "amqpcpp/tagger.h" #include "amqpcpp/throttle.h" #include "amqpcpp/reliable.h" #include "amqpcpp/login.h" diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 295c6e4..0bfd2b9 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -604,7 +604,7 @@ public: /** * Some internal classes may touch our implementation */ - friend class Confirmed; + friend class Tagger; }; /** diff --git a/include/amqpcpp/deferred.h b/include/amqpcpp/deferred.h index 2e4142c..a5323df 100644 --- a/include/amqpcpp/deferred.h +++ b/include/amqpcpp/deferred.h @@ -193,7 +193,7 @@ protected: * private members and construct us */ friend class ChannelImpl; - friend class Confirmed; + friend class Tagger; public: /** diff --git a/include/amqpcpp/reliable.h b/include/amqpcpp/reliable.h index ecd0100..5b4b9e7 100644 --- a/include/amqpcpp/reliable.h +++ b/include/amqpcpp/reliable.h @@ -17,7 +17,7 @@ * Includes */ #include "deferredpublish.h" -#include "confirmed.h" +#include "tagger.h" #include /** @@ -28,12 +28,12 @@ namespace AMQP { /** * Class definition */ -template +template class Reliable : public BASE { private: // make sure it is a proper channel - static_assert(std::is_base_of::value, "base should be derived from a confirmed channel."); + static_assert(std::is_base_of::value, "base should be derived from a confirmed channel."); /** * Set of open deliverytags. We want a normal set (not unordered_set) because @@ -218,10 +218,10 @@ public: virtual ~Reliable() = default; /** - * Method that can be accessed to check if there are still buffered messages. - * @return bool + * Method to check how many messages are still unacked. + * @return size_t */ - virtual bool waiting() const override { return _handlers.size() > 0 || BASE::waiting(); } + virtual size_t unacknowledged() const override { return _handlers.size(); } /** * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. diff --git a/include/amqpcpp/confirmed.h b/include/amqpcpp/tagger.h similarity index 88% rename from include/amqpcpp/confirmed.h rename to include/amqpcpp/tagger.h index 974bbef..6ddb733 100644 --- a/include/amqpcpp/confirmed.h +++ b/include/amqpcpp/tagger.h @@ -1,5 +1,5 @@ /** - * Confirmed.h + * Tagger.h * * Base class that enables publisher confirms and keeps track of the sent messages. * @@ -26,7 +26,7 @@ namespace AMQP { /** * Class definition */ -class Confirmed : public Watchable +class Tagger : public Watchable { protected: /** @@ -82,32 +82,32 @@ public: * Constructor * @param channel */ - Confirmed(AMQP::Channel &channel); + Tagger(AMQP::Channel &channel); /** * Deleted copy constructor, deleted move constructor * @param other */ - Confirmed(const Confirmed &other) = delete; - Confirmed(Confirmed &&other) = delete; + Tagger(const Tagger &other) = delete; + Tagger(Tagger &&other) = delete; /** * Deleted copy assignment, deleted move assignment * @param other */ - Confirmed &operator=(const Confirmed &other) = delete; - Confirmed &operator=(Confirmed &&other) = delete; + Tagger &operator=(const Tagger &other) = delete; + Tagger &operator=(Tagger &&other) = delete; /** * Virtual destructor */ - virtual ~Confirmed() = default; + virtual ~Tagger() = default; /** - * Method to check if there is anything still waiting - * @return bool + * Method to check how many messages are still unacked. + * @return size_t */ - virtual bool waiting() const { return false; } + virtual size_t unacknowledged() const { return 0; } /** * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. diff --git a/include/amqpcpp/throttle.h b/include/amqpcpp/throttle.h index e48fe93..80d1b49 100644 --- a/include/amqpcpp/throttle.h +++ b/include/amqpcpp/throttle.h @@ -20,7 +20,7 @@ #include #include "copiedbuffer.h" #include "channelimpl.h" -#include "confirmed.h" +#include "tagger.h" /** * Begin of namespaces @@ -35,7 +35,7 @@ class Channel; /** * Class definition */ -class Throttle : public Confirmed +class Throttle : public Tagger { protected: /** @@ -117,10 +117,10 @@ public: virtual ~Throttle() = default; /** - * Method that can be accessed to check if there are still buffered messages. - * @return bool + * Method to check how many messages are still unacked. + * @return size_t */ - virtual bool waiting() const override { return _queue.size() > 0; } + virtual size_t unacknowledged() const override { return _open.size() + (_current - _last - 1); } /** * Get the throttle diff --git a/src/includes.h b/src/includes.h index 8b139e9..449b5ba 100644 --- a/src/includes.h +++ b/src/includes.h @@ -78,7 +78,7 @@ #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/throttle.h" -#include "amqpcpp/confirmed.h" +#include "amqpcpp/tagger.h" #include "amqpcpp/reliable.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" diff --git a/src/confirmed.cpp b/src/tagger.cpp similarity index 88% rename from src/confirmed.cpp rename to src/tagger.cpp index 5ace838..00bc637 100644 --- a/src/confirmed.cpp +++ b/src/tagger.cpp @@ -1,7 +1,7 @@ /** - * Confirmed.cpp + * Tagger.cpp * - * Implementation for Confirmed class. + * Implementation for Tagger class. * * @author Michael van der Werve * @copyright 2020 Copernica BV @@ -24,7 +24,7 @@ namespace AMQP { * Constructor * @param channel */ -Confirmed::Confirmed(Channel &channel) : _implementation(channel._implementation) +Tagger::Tagger(Channel &channel) : _implementation(channel._implementation) { // activate confirm-select mode auto &deferred = channel.confirmSelect() @@ -43,7 +43,7 @@ Confirmed::Confirmed(Channel &channel) : _implementation(channel._implementation * @param id * @param frame */ -bool Confirmed::send(uint64_t id, const Frame &frame) +bool Tagger::send(uint64_t id, const Frame &frame) { // we're simply going to send it over the channel directly return _implementation->send(frame); @@ -54,10 +54,10 @@ bool Confirmed::send(uint64_t id, const Frame &frame) * @param deliveryTag * @param multiple */ -void Confirmed::onAck(uint64_t deliveryTag, bool multiple) +void Tagger::onAck(uint64_t deliveryTag, bool multiple) { // leap out if there are still messages or we shouldn't close yet - if (!_close || waiting()) return; + if (!_close || unacknowledged()) return; // close the channel, and forward the callbacks to the installed handler _implementation->close() @@ -70,10 +70,10 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple) * @param deliveryTag * @param multiple */ -void Confirmed::onNack(uint64_t deliveryTag, bool multiple) +void Tagger::onNack(uint64_t deliveryTag, bool multiple) { // leap out if there are still messages or we shouldn't close yet - if (!_close || waiting()) return; + if (!_close || unacknowledged()) return; // close the channel, and forward the callbacks to the installed handler _implementation->close() @@ -85,7 +85,7 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) * Method that is called to report an error * @param message */ -void Confirmed::reportError(const char *message) +void Tagger::reportError(const char *message) { // reset tracking, since channel is fully broken _current = 1; @@ -106,7 +106,7 @@ void Confirmed::reportError(const char *message) * @param flags optional flags * @return uint64_t */ -uint64_t Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +uint64_t Tagger::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // @todo do not copy the entire buffer to individual frames @@ -152,7 +152,7 @@ uint64_t Confirmed::publish(const std::string &exchange, const std::string &rout * Close the throttle channel (closes the underlying channel) * @return Deferred& */ -Deferred &Confirmed::close() +Deferred &Tagger::close() { // if this was already set to be closed, return that if (_close) return *_close; @@ -161,7 +161,7 @@ Deferred &Confirmed::close() _close = std::make_shared(_implementation->usable()); // if there are open messages or there is a queue, they will still get acked and we will then forward it - if (waiting()) return *_close; + if (unacknowledged()) return *_close; // there are no open messages, we can close the channel directly. _implementation->close() @@ -176,7 +176,7 @@ Deferred &Confirmed::close() * Install an error callback * @param callback */ -void Confirmed::onError(const ErrorCallback &callback) +void Tagger::onError(const ErrorCallback &callback) { // we store the callback _errorCallback = callback; diff --git a/src/throttle.cpp b/src/throttle.cpp index fc87f87..11c82a9 100644 --- a/src/throttle.cpp +++ b/src/throttle.cpp @@ -26,7 +26,7 @@ namespace AMQP { * @param channel * @param throttle */ -Throttle::Throttle(Channel &channel, size_t throttle) : Confirmed(channel), _throttle(throttle) {} +Throttle::Throttle(Channel &channel, size_t throttle) : Tagger(channel), _throttle(throttle) {} /** * Called when the deliverytag(s) are acked @@ -45,7 +45,7 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple) if (_open.size() < _throttle) flush(_throttle - _open.size()); // call base handler - Confirmed::onAck(deliveryTag, multiple); + Tagger::onAck(deliveryTag, multiple); } /** @@ -65,7 +65,7 @@ void Throttle::onNack(uint64_t deliveryTag, bool multiple) if (_open.size() < _throttle) flush(_throttle - _open.size()); // call base handler - Confirmed::onNack(deliveryTag, multiple); + Tagger::onNack(deliveryTag, multiple); } /** @@ -92,7 +92,7 @@ bool Throttle::send(uint64_t id, const Frame &frame) _open.insert(id); // we can finally actually send it - return Confirmed::send(id, frame); + return Tagger::send(id, frame); } /** @@ -111,7 +111,7 @@ void Throttle::reportError(const char *message) _last = 0; // call base method - Confirmed::reportError(message); + Tagger::reportError(message); } /**