From c58cb6748dd8ed711631c9f78ab4007dea541ae4 Mon Sep 17 00:00:00 2001 From: Michael van der Werve Date: Mon, 12 Oct 2020 12:02:04 +0200 Subject: [PATCH] onError handler can be set on throttle, and onError & onLost can also be set on messages sent via confirmed. --- include/amqpcpp/callbacks.h | 1 + include/amqpcpp/confirmed.h | 6 ++++ include/amqpcpp/deferredconfirmedpublish.h | 39 ++++++++++++++++++++++ include/amqpcpp/throttle.h | 23 ++++++++++++- src/confirmed.cpp | 31 +++++++++++++++++ src/throttle.cpp | 32 ++++++++++++++++++ 6 files changed, 131 insertions(+), 1 deletion(-) diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index 6b708f0..edeb1b3 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -90,6 +90,7 @@ using NackCallback = std::function; using PublishNackCallback = std::function; +using PublishLostCallback = std::function; /** * End namespace diff --git a/include/amqpcpp/confirmed.h b/include/amqpcpp/confirmed.h index b0f1c54..77296cc 100644 --- a/include/amqpcpp/confirmed.h +++ b/include/amqpcpp/confirmed.h @@ -45,6 +45,12 @@ private: virtual void onAck(uint64_t deliveryTag, bool multiple) override; virtual void onNack(uint64_t deliveryTag, bool multiple) override; + /** + * Method that is called to report an error + * @param message + */ + virtual void reportError(const char *message) override; + public: /** * Constructor diff --git a/include/amqpcpp/deferredconfirmedpublish.h b/include/amqpcpp/deferredconfirmedpublish.h index 56193c6..9d36c3f 100644 --- a/include/amqpcpp/deferredconfirmedpublish.h +++ b/include/amqpcpp/deferredconfirmedpublish.h @@ -35,6 +35,12 @@ private: */ PublishNackCallback _nackCallback; + /** + * Callback to execute when message is lost (nack / error) + * @var LostCallback + */ + PublishLostCallback _lostCallback; + /** * Report an ack, calls the callback. */ @@ -51,6 +57,25 @@ private: { // check if the callback is set if (_nackCallback) _nackCallback(); + + // message is 'lost' + if (_lostCallback) _lostCallback(); + } + + /** + * Indicate failure + * @param error Description of the error that occured + */ + void reportError(const char *error) + { + // from this moment on the object should be listed as failed + _failed = true; + + // message is lost + if (_lostCallback) _lostCallback(); + + // execute callbacks if registered + if (_errorCallback) _errorCallback(error); } /** @@ -99,6 +124,20 @@ public: // allow chaining return *this; } + + /** + * Callback that is called when a message is lost, either through RabbitMQ + * rejecting it or because of a channel error + * @param callback the callback to execute + */ + DeferredConfirmedPublish &onLost(const PublishLostCallback &callback) + { + // store callback + _lostCallback = callback; + + // allow chaining + return *this; + } }; /** diff --git a/include/amqpcpp/throttle.h b/include/amqpcpp/throttle.h index c0f972e..b2bc4ee 100644 --- a/include/amqpcpp/throttle.h +++ b/include/amqpcpp/throttle.h @@ -80,6 +80,12 @@ protected: */ std::shared_ptr _close; + /** + * Callback to call when an error occurred + * @var ErrorCallback + */ + ErrorCallback _errorCallback; + /** * Send method for a frame * @param id @@ -87,6 +93,12 @@ protected: */ bool send(uint64_t id, const Frame &frame); + /** + * Method that is called to report an error + * @param message + */ + virtual void reportError(const char *message); + protected: /** * Called when the deliverytag(s) are acked/nacked @@ -98,7 +110,10 @@ protected: public: /** - * Constructor + * Constructor. Warning: this takes control of the channel, there should be no extra + * handlers set on the channel (onError) and no further publishes should be done on the + * raw channel either. Doing this will cause the throttle to work incorrectly, as the + * counters are not properly updated. * @param channel * @param throttle */ @@ -177,6 +192,12 @@ public: * @return Deferred& */ Deferred &close(); + + /** + * Install an error callback + * @param callback + */ + void onError(const ErrorCallback &callback); }; /** diff --git a/src/confirmed.cpp b/src/confirmed.cpp index 70e3ae9..83a6571 100644 --- a/src/confirmed.cpp +++ b/src/confirmed.cpp @@ -131,6 +131,37 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) Throttle::onNack(deliveryTag, multiple); } +/** + * Method that is called to report an error + * @param message + */ +void Confirmed::reportError(const char *message) +{ + // monitor the object, watching for destruction since these ack/nack handlers + // could destruct the object + Monitor monitor(this); + + // move the handlers out + auto handlers = std::move(_handlers); + + // iterate over all the messages + // call the handlers + for (const auto &iter : handlers) + { + // call the handler + iter.second->reportError(message); + + // if we were destructed in the meantime, we leap out + if (!monitor) return; + } + + // if the monitor is no longer valid, leap out + if (!monitor) return; + + // call base class to let it handle the errors + Throttle::reportError(message); +} + /** * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. diff --git a/src/throttle.cpp b/src/throttle.cpp index d8d58a5..ad09258 100644 --- a/src/throttle.cpp +++ b/src/throttle.cpp @@ -35,6 +35,9 @@ Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel. // we might have failed, in which case we throw if (!deferred) throw std::runtime_error("could not enable publisher confirms"); + + // we wrap a handling error callback that calls our member function + _implementation->onError([this](const char *message) { reportError(message); }); } /** @@ -89,6 +92,16 @@ bool Throttle::send(uint64_t id, const Frame &frame) return _implementation->send(frame); } +/** + * Method that is called to report an error + * @param message + */ +void Throttle::reportError(const char *message) +{ + // if a callback is set, call the handler with the message + if (_errorCallback) _errorCallback(message); +} + /** * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. @@ -212,6 +225,25 @@ Deferred &Throttle::close() return *_close; } +/** + * Install an error callback + * @param callback + */ +void Throttle::onError(const ErrorCallback &callback) +{ + // we store the callback + _errorCallback = callback; + + // check the callback + if (!callback) return; + + // if the channel is no longer usable, report that + if (!_implementation->usable()) return callback("Channel is no longer usable"); + + // specify that we're already closing + if (_close) callback("Wrapped channel is closing down"); +} + /** * End of namespaces */