From 921f24ae06b61bfbfd2a5045cd28759ed465dd8e Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 15 Apr 2014 10:43:33 +0200 Subject: [PATCH] de-templified the deferred objects, to make them easier to understand for other programmers --- amqpcpp.h | 5 +- include/callbacks.h | 149 +++----------------------------- include/channel.h | 74 ++++++++-------- include/channelimpl.h | 102 ++++++++++++++-------- include/deferred.h | 143 +++++++++++++++++++++++-------- include/deferredcancel.h | 91 ++++++++++++++++++++ include/deferredconsumer.h | 17 ++-- include/deferreddelete.h | 91 ++++++++++++++++++++ include/deferredqueue.h | 91 ++++++++++++++++++++ src/channelimpl.cpp | 168 +++++++++++++++++++++++-------------- src/queuedeclareokframe.h | 4 +- 11 files changed, 623 insertions(+), 312 deletions(-) create mode 100644 include/deferredcancel.h create mode 100644 include/deferreddelete.h create mode 100644 include/deferredqueue.h diff --git a/amqpcpp.h b/amqpcpp.h index 9f84823..06f19f8 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -53,9 +53,12 @@ // mid level includes #include #include +#include #include #include -#include +#include +#include +#include #include #include #include diff --git a/include/callbacks.h b/include/callbacks.h index 9d378df..50239a0 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -12,145 +12,18 @@ namespace AMQP { /** - * Class for managing deferred callbacks + * All the callbacks that are supported + * + * When someone registers a callback function for certain events, it should + * match one of the following signatures. */ -class Callbacks -{ -private: - /** - * Different callback types supported - */ - std::tuple< - std::deque>, - std::deque>, - std::deque>, - std::deque> - > _callbacks; - - /** - * If all else fails, we have gotten the wrong - * type, which is not present in the arguments. - * - * This should result in a compile error. - */ - template - struct getIndex - { - // if this structure is used, we went past the last argument - // and this static_assert should trigger a compile failure. - static_assert(N < sizeof...(Arguments), "Type T not found in Arguments"); - - // we still have to provide this member though - static constexpr std::size_t value = N; - }; - - /** - * This structure has one static member that represents - * the index of T in Arguments. This variant is used where U - * does equal T, so a match is found, meaning the current - * index given is the right one. - */ - template - struct getIndex - { - // element is same type as we are looking for - static constexpr std::size_t value = N; - }; - - /** - * This structure has one static member that represents - * the index of T in Arguments. This variant is used where U - * does not equal T, so we need to look at the next member. - */ - template - struct getIndex - { - // current N is not correct, unroll to next element - static constexpr std::size_t value = getIndex::value; - }; - - /** - * Retrieve the list of callbacks matching the type - * - * @param tuple tuple with callbacks - */ - template - T& get(std::tuple& tuple) - { - // retrieve the index at which the requested callbacks can be found - constexpr std::size_t index = getIndex::value; - - // retrieve the callbacks - return std::get(tuple); - } -public: - /** - * Add a deferred to the available callbacks - * - * @param deferred the deferred to add - * @return reference to the inserted deferred - */ - template - Deferred& push_back(Deferred&& item) - { - // retrieve the container - auto &container = get>>(_callbacks); - - // add the element - container.push_back(std::move(item)); - - // return reference to the new item - return container.back(); - } - - /** - * Report success to the relevant callback - * - * @param mixed... additional parameters - */ - template - void reportSuccess(Arguments ...parameters) - { - // retrieve the container and element - auto &container = get>>(_callbacks); - auto &callback = container.front(); - - // execute the callback - callback.success(parameters...); - - // remove the executed callback - container.pop_front(); - } - - /** - * Report a failure - * - * @param error a description of the error - */ - template - typename std::enable_if::value>::type - reportError(const std::string& message) - {} - - /** - * Report a failure - * - * @param error a description of the error - */ - template - typename std::enable_if::value>::type - reportError(const std::string& message) - { - // retrieve the callbacks at current index - auto &callbacks = std::get(_callbacks); - - // report errors to all callbacks of the current type - for (auto &callback : callbacks) callback.error(message); - - // execute the next type - reportError(message); - } -}; +using SuccessCallback = std::function; +using ErrorCallback = std::function; +using FinalizeCallback = std::function; +using ConsumeCallback = std::function; +using QueueCallback = std::function; +using DeleteCallback = std::function; +using CancelCallback = std::function; /** * End namespace diff --git a/include/channel.h b/include/channel.h index e5df3d3..0f315c4 100644 --- a/include/channel.h +++ b/include/channel.h @@ -42,7 +42,7 @@ public: * * @param callback the callback to execute */ - void onReady(const std::function& callback) + void onReady(const SuccessCallback &callback) { // store callback in implementation _implementation._readyCallback = callback; @@ -56,7 +56,7 @@ public: * * @param callback the callback to execute */ - void onError(const std::function& callback) + void onError(const ErrorCallback &callback) { // store callback in implementation _implementation._errorCallback = callback; @@ -70,7 +70,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& pause() + Deferred &pause() { return _implementation.pause(); } @@ -83,7 +83,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& resume() + Deferred &resume() { return _implementation.resume(); } @@ -103,7 +103,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& startTransaction() + Deferred &startTransaction() { return _implementation.startTransaction(); } @@ -114,7 +114,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& commitTransaction() + Deferred &commitTransaction() { return _implementation.commitTransaction(); } @@ -125,7 +125,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& rollbackTransaction() + Deferred &rollbackTransaction() { return _implementation.rollbackTransaction(); } @@ -149,12 +149,12 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } - Deferred<>& declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } - Deferred<>& declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } - Deferred<>& declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } - Deferred<>& declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } - Deferred<>& declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } + Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } + Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } + Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } + Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } + Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } + Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } /** * Remove an exchange @@ -169,7 +169,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } + Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } /** * Bind two exchanges to each other @@ -187,9 +187,9 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } - Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } - Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } /** * Unbind two exchanges from one another @@ -207,9 +207,9 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } - Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } - Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } /** * Declare a queue @@ -240,12 +240,12 @@ public: * * }); */ - Deferred& declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } - Deferred& declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } - Deferred& declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } - Deferred& declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } - Deferred& declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } - Deferred& declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } + DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } + DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } + DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } + DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } + DeferredQueue &declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } + DeferredQueue &declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } /** * Bind a queue to an exchange @@ -263,9 +263,9 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } - Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } - Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } /** * Unbind a queue from an exchange @@ -277,8 +277,8 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } - Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } + Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } + Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } /** * Purge a queue @@ -303,7 +303,7 @@ public: * * }); */ - Deferred& purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } + DeferredDelete &purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } /** * Remove a queue @@ -329,7 +329,7 @@ public: * * }); */ - Deferred& removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } + DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } /** * Publish a message to an exchange @@ -361,7 +361,7 @@ public: * @param prefetchCount maximum number of messages to prefetch * @return bool whether the Qos frame is sent. */ - Deferred<>& setQos(uint16_t prefetchCount) + Deferred &setQos(uint16_t prefetchCount) { return _implementation.setQos(prefetchCount); } @@ -440,7 +440,7 @@ public: * * }); */ - Deferred& cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } + DeferredCancel &cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } /** * Acknoldge a received message @@ -493,7 +493,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& recover(int flags = 0) { return _implementation.recover(flags); } + Deferred &recover(int flags = 0) { return _implementation.recover(flags); } /** * Close the current channel @@ -501,7 +501,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& close() { return _implementation.close(); } + Deferred &close() { return _implementation.close(); } /** * Get the channel we're working on diff --git a/include/channelimpl.h b/include/channelimpl.h index 37e8e07..43a87ba 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -34,24 +34,38 @@ private: /** * Callback when the channel is ready + * @var SuccessCallback */ - std::function _readyCallback; + SuccessCallback _readyCallback; /** * Callback when the channel errors out + * @var ErrorCallback */ - std::function _errorCallback; + ErrorCallback _errorCallback; /** * Callback to execute when a message arrives + * + * @todo do this different?? */ std::unique_ptr _consumer; /** - * The callbacks waiting to be called + * Pointer to the oldest deferred result (the first one that is going + * to be executed) + * + * @var Deferred */ - Callbacks _callbacks; - + Deferred *_oldestCallback = nullptr; + + /** + * Pointer to the newest deferred result (the last one to be added). + * + * @var Deferred + */ + Deferred *_newestCallback = nullptr; + /** * The channel number * @var uint16_t @@ -86,6 +100,14 @@ private: */ ChannelImpl(Channel *parent, Connection *connection); + /** + * Push a deferred result + * @param result The deferred result + * @param error Error message in case the result is not ok + */ + void push(Deferred *deferred, const char *error); + + public: /** * Destructor @@ -109,7 +131,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& pause(); + Deferred &pause(); /** * Resume a paused channel @@ -119,7 +141,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& resume(); + Deferred &resume(); /** * Is the channel connected? @@ -133,7 +155,7 @@ public: /** * Start a transaction */ - Deferred<>& startTransaction(); + Deferred &startTransaction(); /** * Commit the current transaction @@ -141,7 +163,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& commitTransaction(); + Deferred &commitTransaction(); /** * Rollback the current transaction @@ -149,7 +171,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& rollbackTransaction(); + Deferred &rollbackTransaction(); /** * declare an exchange @@ -162,7 +184,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments); + Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments); /** * bind two exchanges @@ -176,7 +198,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); /** * unbind two exchanges @@ -190,7 +212,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); /** * remove an exchange @@ -201,7 +223,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& removeExchange(const std::string &name, int flags); + Deferred &removeExchange(const std::string &name, int flags); /** * declare a queue @@ -212,7 +234,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred& declareQueue(const std::string &name, int flags, const Table &arguments); + DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments); /** * Bind a queue to an exchange @@ -226,7 +248,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); + Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); /** * Unbind a queue from an exchange @@ -239,7 +261,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); + Deferred &unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); /** * Purge a queue @@ -259,7 +281,7 @@ public: * * }); */ - Deferred& purgeQueue(const std::string &name, int flags); + DeferredDelete &purgeQueue(const std::string &name, int flags); /** * Remove a queue @@ -279,7 +301,7 @@ public: * * }); */ - Deferred& removeQueue(const std::string &name, int flags); + DeferredDelete &removeQueue(const std::string &name, int flags); /** * Publish a message to an exchange @@ -303,7 +325,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& setQos(uint16_t prefetchCount); + Deferred &setQos(uint16_t prefetchCount); /** * Tell the RabbitMQ server that we're ready to consume messages @@ -345,7 +367,7 @@ public: * * }); */ - Deferred& cancel(const std::string &tag, int flags); + DeferredCancel &cancel(const std::string &tag, int flags); /** * Acknowledge a message @@ -370,7 +392,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& recover(int flags); + Deferred &recover(int flags); /** * Close the current channel @@ -378,7 +400,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred<>& close(); + Deferred &close(); /** * Get the channel we're working on @@ -402,9 +424,9 @@ public: * * @param frame frame to send * @param message the message to trigger if the frame cannot be send at all + * @return Deferred the deferred object */ - template - Deferred& send(const Frame &frame, const char *message); + Deferred &send(const Frame &frame, const char *message); /** * Report to the handler that the channel is opened @@ -412,7 +434,7 @@ public: void reportReady() { // inform handler - if (_readyCallback) _readyCallback(_parent); + if (_readyCallback) _readyCallback(); } /** @@ -436,8 +458,13 @@ public: template void reportSuccess(Arguments ...parameters) { - // report success to the relevant callback - _callbacks.reportSuccess(std::forward(parameters)...); + // skip if there is no oldest callback + if (!_oldestCallback) return; + + // report to the oldest callback, and install a new oldest callback + _oldestCallback = _oldestCallback->reportSuccess(std::forward(parameters)...); + + // @todo destruct oldest callback } /** @@ -449,11 +476,19 @@ public: // change state _state = state_closed; - // inform handler - if (_errorCallback) _errorCallback(_parent, message); + // @todo multiple callbacks are called, this could break + // @todo should this be a std::string parameter? - // report to all waiting callbacks too - _callbacks.reportError(message); + // inform handler + if (_errorCallback) _errorCallback(message.c_str()); + + // skip if there is no oldest callback + if (!_oldestCallback) return; + + // report to the oldest callback, and install a new oldest callback + _oldestCallback = _oldestCallback->reportError(message); + + // @todo destruct oldest callback } /** @@ -488,7 +523,8 @@ public: if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found"); // otherwise, we now report the consumer as started - else _consumer->success(consumerTag); + // @todo look at this implementation + //else _consumer->success(consumerTag); } /** diff --git a/include/deferred.h b/include/deferred.h index df4f9ed..777a370 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -20,47 +20,132 @@ class Callbacks; /** * Class definition */ -template class Deferred { -private: +protected: /** * Callback to execute on success + * @var SuccessCallback */ - std::function _successCallback; + SuccessCallback _successCallback; /** * Callback to execute on failure + * @var ErrorCallback */ - std::function _errorCallback; + ErrorCallback _errorCallback; /** * Callback to execute either way + * @var FinalizeCallback */ - std::function _finalizeCallback; + FinalizeCallback _finalizeCallback; + + /** + * Pointer to the next deferred object + * @var Deferred + */ + Deferred *_next = nullptr; + + /** + * Do we already know we failed? + * @var bool + */ + bool _failed; + /** * Indicate success - * - * @param parameters... the extra parameters relevant for this deferred handler + * @return Deferred Next deferred result */ - void success(Arguments ...parameters) const + Deferred *reportSuccess() const { // execute callbacks if registered - if (_successCallback) _successCallback(parameters...); - if (_finalizeCallback) _finalizeCallback(""); + if (_successCallback) _successCallback(); + if (_finalizeCallback) _finalizeCallback(); + + // return the next deferred result + return _next; + } + + /** + * Report success for queue declared messages + * @param name Name of the new queue + * @param messagecount Number of messages in the queue + * @param consumercount Number of consumers linked to the queue + * @return Deferred Next deferred result + */ + virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const + { + // this is the same as a regular success message + return reportSuccess(); + } + + /** + * Report success for frames that report delete operations + * @param messagecount Number of messages that were deleted + * @return Deferred + */ + virtual Deferred *reportSuccess(uint32_t messagecount) const + { + // this is the same as a regular success message + return reportSuccess(); + } + + /** + * Report success for frames that report cancel operations + * @param name Consumer tag that is cancelled + * @return Deferred + */ + virtual Deferred *reportSuccess(const std::string &name) const + { + // this is the same as a regular success message + return reportSuccess(); + } + + /** + * Indicate failure + * @param error Description of the error that occured + * @return Deferred Next deferred result + */ + Deferred *reportError(const std::string& error) + { + // from this moment on the object should be listed as failed + _failed = true; + + // execute callbacks if registered + if (_errorCallback) _errorCallback(error.c_str()); + if (_finalizeCallback) _finalizeCallback(); + + // return the next deferred result + return _next; } /** * Indicate failure - * * @param error description of the error that occured */ - void error(const std::string& error) const + Deferred *reportError(const char *error) { + // from this moment on the object should be listed as failed + _failed = true; + // execute callbacks if registered if (_errorCallback) _errorCallback(error); - if (_finalizeCallback) _finalizeCallback(error); + if (_finalizeCallback) _finalizeCallback(); + + // return the next deferred result + return _next; + } + + /** + * Add a pointer to the next deferred result + * @param deferred + */ + void add(Deferred *deferred) + { + // store pointer + _next = deferred; } /** @@ -71,12 +156,6 @@ private: friend class Callbacks; protected: - /** - * Do we already know we failed? - * @var bool - */ - bool _failed; - /** * Protected constructor that can only be called * from within the channel implementation @@ -87,18 +166,10 @@ protected: public: /** - * Deleted copy constructor + * Deleted copy and move constructors */ - Deferred(const Deferred& that) = delete; - - /** - * Move constructor - */ - Deferred(Deferred&& that) : - _successCallback(std::move(that._successCallback)), - _errorCallback(std::move(that._errorCallback)), - _finalizeCallback(std::move(that._finalizeCallback)) - {} + Deferred(const Deferred &that) = delete; + Deferred(Deferred &&that) = delete; /** * Cast to a boolean @@ -119,10 +190,12 @@ public: * * @param callback the callback to execute */ - Deferred& onSuccess(const std::function& callback) + Deferred &onSuccess(const SuccessCallback &callback) { // store callback _successCallback = callback; + + // allow chaining return *this; } @@ -136,10 +209,12 @@ public: * * @param callback the callback to execute */ - Deferred& onError(const std::function& callback) + Deferred &onError(const ErrorCallback &callback) { // store callback _errorCallback = callback; + + // allow chaining return *this; } @@ -158,10 +233,12 @@ public: * * @param callback the callback to execute */ - Deferred& onFinalize(const std::function& callback) + Deferred &onFinalize(const FinalizeCallback &callback) { // store callback _finalizeCallback = callback; + + // allow chaining return *this; } }; diff --git a/include/deferredcancel.h b/include/deferredcancel.h new file mode 100644 index 0000000..401716d --- /dev/null +++ b/include/deferredcancel.h @@ -0,0 +1,91 @@ +/** + * DeferredCancel.h + * + * Deferred callback for instructions that cancel a running consumer. This + * deferred object allows one to register a callback that also gets the + * consumer tag as one of its parameters. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredCancel : public Deferred +{ +private: + /** + * Callback to execute when the instruction is completed + * @var CancelCallback + */ + CancelCallback _cancelCallback; + + /** + * Report success for frames that report cancel operations + * @param name Consumer tag that is cancelled + * @return Deferred + */ + virtual Deferred *reportSuccess(const std::string &name) const override + { + // skip if no special callback was installed + if (!_cancelCallback) return Deferred::reportSuccess(); + + // call the callback + _cancelCallback(name); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; + } + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class ConsumedMessage; + +protected: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * @param boolean are we already failed? + */ + DeferredCancel(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register a function to be called when the cancel operation succeeded + * + * Only one callback can be registered. Successive calls + * to this function will clear callbacks registered before. + * + * @param callback the callback to execute + */ + DeferredCancel &onSuccess(const CancelCallback &callback) + { + // store callback + _cancelCallback = callback; + + // allow chaining + return *this; + } + + /** + * All the onSuccess() functions defined in the base class are accessible too + */ + using Deferred::onSuccess; +}; + +/** + * End namespace + */ +} diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index 8de39bd..c1555ad 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -14,13 +14,14 @@ namespace AMQP { /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public Deferred +class DeferredConsumer : public Deferred { private: /** * Callback to execute when a message arrives + * @var ConsumeCallbacl */ - std::function _messageCallback; + ConsumeCallback _consumeCallback; /** * Process a message @@ -33,7 +34,7 @@ private: void message(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) const { // do we have a valid callback - if (_messageCallback) _messageCallback(message, deliveryTag, consumerTag, redelivered); + if (_consumeCallback) _consumeCallback(message, deliveryTag, consumerTag, redelivered); } /** @@ -48,7 +49,6 @@ protected: * Protected constructor that can only be called * from within the channel implementation * - * @param channel the channel we operate under * @param boolea are we already failed? */ DeferredConsumer(bool failed = false) : Deferred(failed) {} @@ -62,12 +62,17 @@ public: * * @param callback the callback to execute */ - DeferredConsumer& onReceived(const std::function& callback) + DeferredConsumer& onReceived(const ConsumeCallback &callback) { // store callback - _messageCallback = callback; + _consumeCallback = callback; return *this; } + + /** + * All the onSuccess() functions defined in the base class are accessible too + */ + using Deferred::onSuccess; }; /** diff --git a/include/deferreddelete.h b/include/deferreddelete.h new file mode 100644 index 0000000..43ada61 --- /dev/null +++ b/include/deferreddelete.h @@ -0,0 +1,91 @@ +/** + * DeferredDelete.h + * + * Deferred callback for instructions that delete or purge queues, and that + * want to report the number of deleted messages. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredDelete : public Deferred +{ +private: + /** + * Callback to execute when the instruction is completed + * @var DeleteCallback + */ + DeleteCallback _deleteCallback; + + /** + * Report success for queue delete and queue purge messages + * @param messagecount Number of messages that were deleted + * @return Deferred Next deferred result + */ + virtual Deferred *reportSuccess(uint32_t messagecount) const override + { + // skip if no special callback was installed + if (!_deleteCallback) return Deferred::reportSuccess(); + + // call the callback + _deleteCallback(messagecount); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; + } + + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class ConsumedMessage; + +protected: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * @param boolean are we already failed? + */ + DeferredDelete(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register a function to be called when the queue is deleted or purged + * + * Only one callback can be registered. Successive calls + * to this function will clear callbacks registered before. + * + * @param callback the callback to execute + */ + DeferredDelete &onSuccess(const DeleteCallback &callback) + { + // store callback + _deleteCallback = callback; + + // allow chaining + return *this; + } + + /** + * All the onSuccess() functions defined in the base class are accessible too + */ + using Deferred::onSuccess; +}; + +/** + * End namespace + */ +} diff --git a/include/deferredqueue.h b/include/deferredqueue.h new file mode 100644 index 0000000..085f095 --- /dev/null +++ b/include/deferredqueue.h @@ -0,0 +1,91 @@ +/** + * DeferredQueue.h + * + * Deferred callback for "declare-queue" instructions. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredQueue : public Deferred +{ +private: + /** + * Callback to execute when the queue is declared + * @var QueueCallback + */ + QueueCallback _queueCallback; + + /** + * Report success for queue declared messages + * @param name Name of the new queue + * @param messagecount Number of messages in the queue + * @param consumercount Number of consumers linked to the queue + * @return Deferred Next deferred result + */ + virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override + { + // skip if no special callback was installed + if (!_queueCallback) return Deferred::reportSuccess(); + + // call the queue callback + _queueCallback(name, messagecount, consumercount); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; + } + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class ConsumedMessage; + +protected: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * @param boolea are we already failed? + */ + DeferredQueue(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register a function to be called when the queue is declared + * + * Only one callback can be registered. Successive calls + * to this function will clear callbacks registered before. + * + * @param callback the callback to execute + */ + DeferredQueue &onSuccess(const QueueCallback &callback) + { + // store callback + _queueCallback = callback; + + // allow chaining + return *this; + } + + /** + * All the onSuccess() functions defined in the base class are accessible too + */ + using Deferred::onSuccess; +}; + +/** + * End namespace + */ +} diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a478dff..53e7438 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -85,6 +85,28 @@ ChannelImpl::~ChannelImpl() // close the channel now close(); + + + // @todo destruct deferred resutls +} + +/** + * Push a deferred result + * @param result The deferred object to push + * @param error Error message in case of error + */ +void ChannelImpl::push(Deferred *deferred, const char *error) +{ + // do we already have an oldest? + if (!_oldestCallback) _oldestCallback = deferred; + + // do we already have a newest? + if (_newestCallback) _newestCallback->add(deferred); + + // store newest callback + _newestCallback = deferred; + + // @todo in case of error we have to report the error with a timeout } /** @@ -95,10 +117,10 @@ ChannelImpl::~ChannelImpl() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::pause() +Deferred &ChannelImpl::pause() { // send a channel flow frame - return send<>(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); + return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); } /** @@ -109,10 +131,10 @@ Deferred<>& ChannelImpl::pause() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::resume() +Deferred &ChannelImpl::resume() { // send a channel flow frame - return send<>(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); + return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); } /** @@ -121,10 +143,10 @@ Deferred<>& ChannelImpl::resume() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::startTransaction() +Deferred &ChannelImpl::startTransaction() { // send a transaction frame - return send<>(TransactionSelectFrame(_id), "Cannot send transaction start frame"); + return send(TransactionSelectFrame(_id), "Cannot send transaction start frame"); } /** @@ -133,10 +155,10 @@ Deferred<>& ChannelImpl::startTransaction() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::commitTransaction() +Deferred &ChannelImpl::commitTransaction() { // send a transaction frame - return send<>(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); + return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); } /** @@ -145,10 +167,10 @@ Deferred<>& ChannelImpl::commitTransaction() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::rollbackTransaction() +Deferred &ChannelImpl::rollbackTransaction() { // send a transaction frame - return send<>(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); + return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); } /** @@ -157,13 +179,13 @@ Deferred<>& ChannelImpl::rollbackTransaction() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::close() +Deferred &ChannelImpl::close() { // channel could be dead after send operation, we need to monitor that Monitor monitor(this); // send a channel close frame - auto &handler = send<>(ChannelCloseFrame(_id), "Cannot send channel close frame"); + auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame"); // was the frame sent and are we still alive? if (handler && monitor.valid()) _state = state_closing; @@ -183,7 +205,7 @@ Deferred<>& ChannelImpl::close() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) +Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { // convert exchange type std::string exchangeType; @@ -193,7 +215,7 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t if (type == ExchangeType::headers)exchangeType = "headers"; // send declare exchange frame - return send<>(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); + return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); } /** @@ -208,10 +230,10 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange bind frame - return send<>(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); + return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); } /** @@ -226,10 +248,10 @@ Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::stri * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange unbind frame - return send<>(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); + return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); } /** @@ -241,10 +263,10 @@ Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::st * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags) +Deferred &ChannelImpl::removeExchange(const std::string &name, int flags) { // send delete exchange frame - return send<>(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); + return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); } /** @@ -256,10 +278,19 @@ Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags) * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred& ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) +DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) { + // the frame to send + QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments); + // send the queuedeclareframe - return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame"); + auto *result = new DeferredQueue(send(frame)); + + // add the deferred result + push(result, "Cannot send queue declare frame"); + + // done + return *result; } /** @@ -274,10 +305,10 @@ Deferred& ChannelImpl::declareQueue(cons * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) { // send the bind queue frame - return send<>(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); + return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); } /** @@ -291,10 +322,10 @@ Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::s * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) +Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { // send the unbind queue frame - return send<>(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); + return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); } /** @@ -315,10 +346,19 @@ Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::str * * }); */ -Deferred& ChannelImpl::purgeQueue(const std::string &name, int flags) +DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags) { - // send the queue purge frame - return send(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame"); + // the frame to send + QueuePurgeFrame frame(_id, name, flags & nowait); + + // send the frame, and create deferred object + auto *deferred = new DeferredDelete(send(frame)); + + // push to list + push(deferred, "Cannot send queue purge frame"); + + // done + return *deferred; } /** @@ -339,10 +379,19 @@ Deferred& ChannelImpl::purgeQueue(const std::string &name, int flags) * * }); */ -Deferred& ChannelImpl::removeQueue(const std::string &name, int flags) +DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) { - // send the remove queue frame - return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame"); + // the frame to send + QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, flags & nowait); + + // send the frame, and create deferred object + auto *deferred = new DeferredDelete(send(frame)); + + // push to list + push(deferred, "Cannot send remove queue frame"); + + // done + return *deferred; } /** @@ -410,7 +459,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount) +Deferred &ChannelImpl::setQos(uint16_t prefetchCount) { // send a qos frame return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame"); @@ -473,10 +522,19 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri * * }); */ -Deferred& ChannelImpl::cancel(const std::string &tag, int flags) +DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags) { - // send a cancel frame - return send(BasicCancelFrame(_id, tag, flags & nowait), "Cannot send basic cancel frame"); + // the cancel frame to send + BasicCancelFrame frame(_id, tag, flags & nowait); + + // send the frame, and create deferred object + auto *deferred = new DeferredCancel(send(frame)); + + // push to list + push(deferred, "Cannot send basic cancel frame"); + + // done + return *deferred; } /** @@ -510,7 +568,7 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags) * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred<>& ChannelImpl::recover(int flags) +Deferred &ChannelImpl::recover(int flags) { // send a nack frame return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame"); @@ -536,32 +594,16 @@ bool ChannelImpl::send(const Frame &frame) * @param frame frame to send * @param message the message to trigger if the frame cannot be send at all */ -template -Deferred& ChannelImpl::send(const Frame &frame, const char *message) +Deferred &ChannelImpl::send(const Frame &frame, const char *message) { - // create a new deferred handler and get a pointer to it - // note: cannot use auto here or the lambda below chokes - // when compiling under gcc 4.8 - Deferred *handler = &_callbacks.push_back(Deferred()); - - // send the frame over the channel - if (!send(frame)) - { - // we can immediately put the handler in failed state - handler->_failed = true; - - // register an error on the deferred handler - // after a timeout, so it gets called only - // after a possible handler was installed. - _connection->_handler->setTimeout(_connection->_parent, 0, [handler, message]() { - - // emit an error on the handler - handler->error(message); - }); - } - - // return the new handler - return *handler; + // send the frame, and create deferred object + auto *deferred = new Deferred(send(frame)); + + // push to list + push(deferred, message); + + // done + return *deferred; } /** @@ -569,6 +611,8 @@ Deferred& ChannelImpl::send(const Frame &frame, const char *messag */ void ChannelImpl::reportMessage() { + // @todo what does this method do? + // skip if there is no message if (!_message) return; diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 875ff2b..eb660d8 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -132,8 +132,8 @@ public: // what if channel doesn't exist? if (!channel) return false; - // report to the handler, we need to specify template arguments otherwise a string will lose const and reference - channel->reportSuccess(this->name(), this->messageCount(), this->consumerCount()); + // report success + channel->reportSuccess(name(), messageCount(), consumerCount()); // done return true;