From 0b9f4e4af5905e56f44283e9d671db140f1cd4c0 Mon Sep 17 00:00:00 2001 From: Michael van der Werve Date: Wed, 7 Oct 2020 11:49:38 +0200 Subject: [PATCH] {auto} initial implementation of the confirmed wrapper --- include/amqpcpp.h | 1 + include/amqpcpp/callbacks.h | 6 + include/amqpcpp/confirmed.h | 96 +++++++++++++ include/amqpcpp/deferredconfirmedpublish.h | 107 ++++++++++++++ include/amqpcpp/throttle.h | 18 +-- src/confirmed.cpp | 160 +++++++++++++++++++++ src/includes.h | 1 + src/throttle.cpp | 2 +- 8 files changed, 382 insertions(+), 9 deletions(-) create mode 100644 include/amqpcpp/confirmed.h create mode 100644 include/amqpcpp/deferredconfirmedpublish.h create mode 100644 src/confirmed.cpp diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 8376725..b4645b0 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -76,6 +76,7 @@ #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/throttle.h" +#include "amqpcpp/confirmed.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index 7b52653..6b708f0 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -85,6 +85,12 @@ using BounceCallback = std::function; using NackCallback = std::function; +/** + * When using a confirm wrapped channel, these callbacks are called when a message is acknowledged/nacked. + */ +using PublishAckCallback = std::function; +using PublishNackCallback = std::function; + /** * End namespace */ diff --git a/include/amqpcpp/confirmed.h b/include/amqpcpp/confirmed.h new file mode 100644 index 0000000..b0f1c54 --- /dev/null +++ b/include/amqpcpp/confirmed.h @@ -0,0 +1,96 @@ +/** + * Confirmed.h + * + * A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed + * on the publishes, to be called when they are confirmed by the message broker. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Header guard + */ +#pragma once + +/** + * Includes + */ +#include "deferredconfirmedpublish.h" +#include + +/** + * Begin of namespaces + */ +namespace AMQP { + +/** + * Class definition + */ +class Confirmed : public Throttle, private Watchable +{ +private: + /** + * Set of open deliverytags. We want a normal set (not unordered_set) because + * removal will be cheaper for whole ranges. + * @var size_t + */ + std::map> _handlers; + + /** + * Called when the deliverytag(s) are acked/nacked + * @param deliveryTag + * @param multiple + */ + virtual void onAck(uint64_t deliveryTag, bool multiple) override; + virtual void onNack(uint64_t deliveryTag, bool multiple) override; + +public: + /** + * Constructor + * @param channel + * @param throttle + */ + Confirmed(AMQP::Channel &channel, size_t throttle) : Throttle(channel, throttle) {} + + /** + * Deleted copy constructor, deleted move constructor + * @param other + */ + Confirmed(const Confirmed &other) = delete; + Confirmed(Confirmed &&other) = delete; + + /** + * Deleted copy assignment, deleted move assignment + * @param other + */ + Confirmed &operator=(const Confirmed &other) = delete; + Confirmed &operator=(Confirmed &&other) = delete; + + /** + * Virtual destructor + */ + virtual ~Confirmed() = default; + + /** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + * @return bool + */ + DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); + DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } + DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } +}; + +/** + * End of namespaces + */ +} diff --git a/include/amqpcpp/deferredconfirmedpublish.h b/include/amqpcpp/deferredconfirmedpublish.h new file mode 100644 index 0000000..56193c6 --- /dev/null +++ b/include/amqpcpp/deferredconfirmedpublish.h @@ -0,0 +1,107 @@ +/** + * DeferredConfirmedPublish.h + * + * Deferred callback for RabbitMQ-specific publisher confirms mechanism per-message. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredConfirmedPublish : public Deferred +{ +private: + /** + * Callback to execute when server confirms that message is processed + * @var AckCallback + */ + PublishAckCallback _ackCallback; + + /** + * Callback to execute when server sends negative acknowledgement + * @var NackCallback + */ + PublishNackCallback _nackCallback; + + /** + * Report an ack, calls the callback. + */ + void reportAck() + { + // check if the callback is set + if (_ackCallback) _ackCallback(); + } + + /** + * Report an nack, calls the callback if set. + */ + void reportNack() + { + // check if the callback is set + if (_nackCallback) _nackCallback(); + } + + /** + * The wrapped confirmed channel implementation may call our + * private members and construct us + */ + friend class Confirmed; + + +public: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * + * @param boolean are we already failed? + */ + DeferredConfirmedPublish(bool failed = false) : Deferred(failed) {} + +public: + /** + * Callback that is called when the broker confirmed message publication + * @param callback the callback to execute + */ + DeferredConfirmedPublish &onAck(const PublishAckCallback &callback) + { + // store callback + _ackCallback = callback; + + // allow chaining + return *this; + } + + /** + * Callback that is called when the broker denied message publication + * @param callback the callback to execute + */ + DeferredConfirmedPublish &onNack(const PublishNackCallback &callback) + { + // store callback + _nackCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/include/amqpcpp/throttle.h b/include/amqpcpp/throttle.h index 32089f7..ceda5b1 100644 --- a/include/amqpcpp/throttle.h +++ b/include/amqpcpp/throttle.h @@ -36,7 +36,7 @@ class Channel; */ class Throttle { -private: +protected: /** * The implementation for the channel * @var std::shared_ptr @@ -74,13 +74,6 @@ private: */ std::set _open; - /** - * Called when the deliverytag(s) are acked/nacked - * @param deliveryTag - * @param multiple - */ - void onAck(uint64_t deliveryTag, bool multiple); - /** * Send method for a frame * @param id @@ -88,6 +81,15 @@ private: */ bool send(uint64_t id, const Frame &frame); +protected: + /** + * Called when the deliverytag(s) are acked/nacked + * @param deliveryTag + * @param multiple + */ + virtual void onAck(uint64_t deliveryTag, bool multiple); + virtual void onNack(uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); } + public: /** * Constructor diff --git a/src/confirmed.cpp b/src/confirmed.cpp new file mode 100644 index 0000000..c893722 --- /dev/null +++ b/src/confirmed.cpp @@ -0,0 +1,160 @@ +/** + * Confirmed.cpp + * + * Implementation for Confirmed class. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Includes + */ +#include "includes.h" + +/** + * Begin of namespaces + */ +namespace AMQP { + +/** + * Called when the deliverytag(s) are acked + * @param deliveryTag + * @param multiple + */ +void Confirmed::onAck(uint64_t deliveryTag, bool multiple) +{ + // call base handler, will advance on the throttle if needed + Throttle::onAck(deliveryTag, multiple); + + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // single element is simple + if (!multiple) + { + // find the element + auto iter = _handlers.find(deliveryTag); + + // we did not find it (this should not be possible, unless somebody explicitly called) + // the base-class publish methods for some reason. + if (iter == _handlers.end()) return; + + // call the ack handler + iter->second->reportAck(); + + // if the monitor is no longer valid, we stop (we're done) + if (!monitor) return; + + // erase it from the map + _handlers.erase(iter); + + // we are done + return; + } + + // find the last element, inclusive + auto upper = _handlers.upper_bound(deliveryTag); + + // call the handlers + for (auto iter = _handlers.begin(); iter != upper; iter++) + { + // call the handler + iter->second->reportAck(); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // erase all acknowledged items + _handlers.erase(_handlers.begin(), upper); +} + +/** + * Called when the deliverytag(s) are nacked + * @param deliveryTag + * @param multiple + */ +void Confirmed::onNack(uint64_t deliveryTag, bool multiple) +{ + // call base handler, will advance on the throttle if needed + Throttle::onNack(deliveryTag, multiple); + + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // single element is simple + if (!multiple) + { + // find the element + auto iter = _handlers.find(deliveryTag); + + // we did not find it (this should not be possible, unless somebody explicitly called) + // the base-class publish methods for some reason. + if (iter == _handlers.end()) return; + + // call the ack handler + iter->second->reportNack(); + + // if the monitor is no longer valid, we stop (we're done) + if (!monitor) return; + + // erase it from the map + _handlers.erase(iter); + + // we are done + return; + } + + // find the last element, inclusive + auto upper = _handlers.upper_bound(deliveryTag); + + // call the handlers + for (auto iter = _handlers.begin(); iter != upper; iter++) + { + // call the handler + iter->second->reportNack(); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // erase all acknowledged items + _handlers.erase(_handlers.begin(), upper); +} + +/** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + */ +DeferredConfirmedPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +{ + // copy the current identifier, this will be the ID that will come back + auto current = _current; + + // publish the entire thing, and remember if it failed at any point + bool failed = !Throttle::publish(exchange, routingKey, envelope, flags); + + // create the open + auto handler = std::make_shared(failed); + + // add it to the open handlers + _handlers[current] = handler; + + // return the dereferenced handler + return *handler; +} + +/** + * End of namespaces + */ +} diff --git a/src/includes.h b/src/includes.h index 3bf28fe..739fdb1 100644 --- a/src/includes.h +++ b/src/includes.h @@ -77,6 +77,7 @@ #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/throttle.h" +#include "amqpcpp/confirmed.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/src/throttle.cpp b/src/throttle.cpp index 93428b3..c6083fb 100644 --- a/src/throttle.cpp +++ b/src/throttle.cpp @@ -31,7 +31,7 @@ Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel. // activate confirm-select mode auto &deferred = channel.confirmSelect() .onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }) - .onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onAck(deliveryTag, multiple); }); + .onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); }); // we might have failed, in which case we throw if (!deferred) throw std::runtime_error("could not enable publisher confirms");