From 9653578d4ad74e61cb2ef480ba17da47389e8f35 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 2 Sep 2014 10:32:55 +0200 Subject: [PATCH] The Channel.get().onFinalize() function was called before the Channel.get().onSuccess() method, which is strange. This has been fixed. To fix this, we had to refactor the Deferred* into shared_ptrs --- include/channelimpl.h | 12 ++++++------ include/deferred.h | 26 ++++++++++++++------------ include/deferredcancel.h | 8 ++++++-- include/deferredconsumer.h | 8 ++++++-- include/deferreddelete.h | 8 ++++++-- include/deferredget.h | 17 ++++++++++++----- include/deferredqueue.h | 10 +++++++--- src/channelimpl.cpp | 31 ++++++++++++++----------------- src/deferredcancel.cpp | 2 +- src/deferredconsumer.cpp | 2 +- src/deferredget.cpp | 21 +++++++++------------ 11 files changed, 82 insertions(+), 63 deletions(-) diff --git a/include/channelimpl.h b/include/channelimpl.h index 284edff..56f1e75 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -55,14 +55,14 @@ private: * * @var Deferred */ - std::shared_ptr _oldestCallback = nullptr; + std::shared_ptr _oldestCallback; /** * Pointer to the newest deferred result (the last one to be added). * * @var Deferred */ - Deferred *_newestCallback = nullptr; + std::shared_ptr _newestCallback; /** * The channel number @@ -113,7 +113,7 @@ private: * @param result The deferred result * @return Deferred The object just pushed */ - Deferred &push(Deferred *deferred); + Deferred &push(const std::shared_ptr &deferred); /** * Send a framen and push a deferred result @@ -596,19 +596,19 @@ public: // we are going to call callbacks that could destruct the channel Monitor monitor(this); - + // copy the callback (so that it will not be destructed during // the "reportSuccess" call auto cb = _oldestCallback; // call the callback - auto *next = cb->reportSuccess(std::forward(parameters)...); + auto next = cb->reportSuccess(std::forward(parameters)...); // leap out if channel no longer exists if (!monitor.valid()) return false; // set the oldest callback - _oldestCallback.reset(next); + _oldestCallback = next; // if there was no next callback, the newest callback was just used if (!next) _newestCallback = nullptr; diff --git a/include/deferred.h b/include/deferred.h index 5edafec..34e8da3 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -15,7 +15,6 @@ namespace AMQP { // forward declaration class ChannelImpl; -class Callbacks; /** * Class definition @@ -46,7 +45,7 @@ protected: * Pointer to the next deferred object * @var Deferred */ - Deferred *_next = nullptr; + std::shared_ptr _next; /** * Do we already know we failed? @@ -59,7 +58,7 @@ protected: * The next deferred object * @return Deferred */ - Deferred *next() const + const std::shared_ptr &next() const { return _next; } @@ -68,7 +67,7 @@ protected: * Indicate success * @return Deferred Next deferred result */ - virtual Deferred *reportSuccess() const + virtual const std::shared_ptr &reportSuccess() const { // execute callbacks if registered if (_successCallback) _successCallback(); @@ -84,7 +83,7 @@ protected: * @param consumercount Number of consumers linked to the queue * @return Deferred Next deferred result */ - virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const + virtual const std::shared_ptr &reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const { // this is the same as a regular success message return reportSuccess(); @@ -95,7 +94,7 @@ protected: * @param messagecount Number of messages that were deleted * @return Deferred */ - virtual Deferred *reportSuccess(uint32_t messagecount) const + virtual const std::shared_ptr &reportSuccess(uint32_t messagecount) const { // this is the same as a regular success message return reportSuccess(); @@ -106,7 +105,7 @@ protected: * @param name Consumer tag that is cancelled * @return Deferred */ - virtual Deferred *reportSuccess(const std::string &name) const + virtual const std::shared_ptr &reportSuccess(const std::string &name) const { // this is the same as a regular success message return reportSuccess(); @@ -117,7 +116,7 @@ protected: * @param error Description of the error that occured * @return Deferred Next deferred result */ - Deferred *reportError(const char *error) + const std::shared_ptr &reportError(const char *error) { // from this moment on the object should be listed as failed _failed = true; @@ -133,7 +132,7 @@ protected: * Add a pointer to the next deferred result * @param deferred */ - void add(Deferred *deferred) + void add(const std::shared_ptr &deferred) { // store pointer _next = deferred; @@ -144,17 +143,20 @@ protected: * private members and construct us */ friend class ChannelImpl; - friend class Callbacks; -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. * * @param failed are we already failed? */ Deferred(bool failed = false) : _failed(failed) {} - + public: /** * Deleted copy and move constructors diff --git a/include/deferredcancel.h b/include/deferredcancel.h index 0c1f8e6..c114709 100644 --- a/include/deferredcancel.h +++ b/include/deferredcancel.h @@ -36,7 +36,7 @@ private: * @param name Consumer tag that is cancelled * @return Deferred */ - virtual Deferred *reportSuccess(const std::string &name) const override; + virtual const std::shared_ptr &reportSuccess(const std::string &name) const override; /** * The channel implementation may call our @@ -45,11 +45,15 @@ private: friend class ChannelImpl; friend class ConsumedMessage; -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * * @param channel Pointer to the channel * @param failed Are we already failed? */ diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index c656728..659120c 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -41,7 +41,7 @@ private: * @param name Consumer tag that is started * @return Deferred */ - virtual Deferred *reportSuccess(const std::string &name) const override; + virtual const std::shared_ptr &reportSuccess(const std::string &name) const override; /** * The channel implementation may call our @@ -50,10 +50,14 @@ private: friend class ChannelImpl; friend class ConsumedMessage; -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. * * @param channel the channel implementation * @param failed are we already failed? diff --git a/include/deferreddelete.h b/include/deferreddelete.h index d75614b..62c33cf 100644 --- a/include/deferreddelete.h +++ b/include/deferreddelete.h @@ -29,7 +29,7 @@ private: * @param messagecount Number of messages that were deleted * @return Deferred Next deferred result */ - virtual Deferred *reportSuccess(uint32_t messagecount) const override + virtual const std::shared_ptr &reportSuccess(uint32_t messagecount) const override { // skip if no special callback was installed if (!_deleteCallback) return Deferred::reportSuccess(); @@ -49,10 +49,14 @@ private: friend class ChannelImpl; friend class ConsumedMessage; -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. * * @param boolean are we already failed? */ diff --git a/include/deferredget.h b/include/deferredget.h index db74621..72322a6 100644 --- a/include/deferredget.h +++ b/include/deferredget.h @@ -12,8 +12,12 @@ namespace AMQP { /** * Class definition + * + * This class implements the 'shared_from_this' functionality, because + * it grabs a self-pointer when the callback is running, otherwise the onFinalize() + * is called before the actual message is consumed. */ -class DeferredGet : public Deferred +class DeferredGet : public Deferred, public std::enable_shared_from_this { private: /** @@ -45,13 +49,13 @@ private: * @param count number of messages in the queue * @return Deferred */ - virtual Deferred *reportSuccess(uint32_t messagecount) const override; + virtual const std::shared_ptr &reportSuccess(uint32_t messagecount) const override; /** * Report success when queue was empty * @return Deferred */ - virtual Deferred *reportSuccess() const override; + virtual const std::shared_ptr &reportSuccess() const override; /** * The channel implementation may call our @@ -60,12 +64,15 @@ private: friend class ChannelImpl; friend class ConsumedMessage; - -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * * @param channel the channel implementation * @param failed are we already failed? */ diff --git a/include/deferredqueue.h b/include/deferredqueue.h index 05878f0..9001655 100644 --- a/include/deferredqueue.h +++ b/include/deferredqueue.h @@ -30,7 +30,7 @@ private: * @param consumercount Number of consumers linked to the queue * @return Deferred Next deferred result */ - virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override + virtual const std::shared_ptr &reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override { // skip if no special callback was installed if (!_queueCallback) return Deferred::reportSuccess(); @@ -49,12 +49,16 @@ private: friend class ChannelImpl; friend class ConsumedMessage; -protected: +public: /** * Protected constructor that can only be called * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. * - * @param boolea are we already failed? + * @param bool are we already failed? */ DeferredQueue(bool failed = false) : Deferred(failed) {} diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 4490176..8ab04f8 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -57,9 +57,6 @@ ChannelImpl::~ChannelImpl() // remove this channel from the connection (but not if the connection is already destructed) if (_connection) _connection->remove(this); - - // destruct deferred results - while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next()); } /** @@ -97,10 +94,10 @@ void ChannelImpl::attach(Connection *connection) * Push a deferred result * @param result The deferred object to push */ -Deferred &ChannelImpl::push(Deferred *deferred) +Deferred &ChannelImpl::push(const std::shared_ptr &deferred) { // do we already have an oldest? - if (!_oldestCallback) _oldestCallback.reset(deferred); + if (!_oldestCallback) _oldestCallback = deferred; // do we already have a newest? if (_newestCallback) _newestCallback->add(deferred); @@ -119,7 +116,7 @@ Deferred &ChannelImpl::push(Deferred *deferred) Deferred &ChannelImpl::push(const Frame &frame) { // send the frame, and push the result - return push(new Deferred(!send(frame))); + return push(std::make_shared(!send(frame))); } /** @@ -195,7 +192,7 @@ Deferred &ChannelImpl::rollbackTransaction() Deferred &ChannelImpl::close() { // this is completely pointless if not connected - if (_state != state_connected) return push(new Deferred(_state == state_closing)); + if (_state != state_connected) return push(std::make_shared(_state == state_closing)); // send a channel close frame auto &handler = push(ChannelCloseFrame(_id)); @@ -295,7 +292,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments); // send the queuedeclareframe - auto *result = new DeferredQueue(!send(frame)); + auto result = std::make_shared(!send(frame)); // add the deferred result push(result); @@ -361,7 +358,7 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name) QueuePurgeFrame frame(_id, name, false); // send the frame, and create deferred object - auto *deferred = new DeferredDelete(!send(frame)); + auto deferred = std::make_shared(!send(frame)); // push to list push(deferred); @@ -394,7 +391,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false); // send the frame, and create deferred object - auto *deferred = new DeferredDelete(!send(frame)); + auto deferred = std::make_shared(!send(frame)); // push to list push(deferred); @@ -503,7 +500,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments); // send the frame, and create deferred object - auto *deferred = new DeferredConsumer(this, !send(frame)); + auto deferred = std::make_shared(this, !send(frame)); // push to list push(deferred); @@ -535,7 +532,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag) BasicCancelFrame frame(_id, tag, false); // send the frame, and create deferred object - auto *deferred = new DeferredCancel(this, !send(frame)); + auto deferred = std::make_shared(this, !send(frame)); // push to list push(deferred); @@ -582,7 +579,7 @@ DeferredGet &ChannelImpl::get(const std::string &queue, int flags) BasicGetFrame frame(_id, queue, flags & noack); // send the frame, and create deferred object - auto *deferred = new DeferredGet(this, !send(frame)); + auto deferred = std::make_shared(this, !send(frame)); // push to list push(deferred); @@ -765,13 +762,13 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler) auto cb = _oldestCallback; // call the callback - auto *next = cb->reportError(message); + auto next = cb->reportError(message); // leap out if channel no longer exists if (!monitor.valid()) return; // set the oldest callback - _oldestCallback.reset(next); + _oldestCallback = next; } // clean up all deferred other objects @@ -782,13 +779,13 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler) auto cb = _oldestCallback; // call the callback - auto *next = cb->reportError("Channel is in error state"); + auto next = cb->reportError("Channel is in error state"); // leap out if channel no longer exists if (!monitor.valid()) return; // set the oldest callback - _oldestCallback.reset(next); + _oldestCallback = next; } // all callbacks have been processed, so we also can reset the pointer to the newest diff --git a/src/deferredcancel.cpp b/src/deferredcancel.cpp index 384ce9b..e747e69 100644 --- a/src/deferredcancel.cpp +++ b/src/deferredcancel.cpp @@ -17,7 +17,7 @@ namespace AMQP { * @param name Consumer tag that is cancelled * @return Deferred */ -Deferred *DeferredCancel::reportSuccess(const std::string &name) const +const std::shared_ptr &DeferredCancel::reportSuccess(const std::string &name) const { // in the channel, we should uninstall the consumer _channel->uninstall(name); diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp index 63e3f1c..1661168 100644 --- a/src/deferredconsumer.cpp +++ b/src/deferredconsumer.cpp @@ -17,7 +17,7 @@ namespace AMQP { * @param name Consumer tag that is started * @return Deferred */ -Deferred *DeferredConsumer::reportSuccess(const std::string &name) const +const std::shared_ptr &DeferredConsumer::reportSuccess(const std::string &name) const { // we now know the name, so we can install the message callback on the channel _channel->install(name, _messageCallback); diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 0a551db..cbfe51b 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -22,29 +22,25 @@ namespace AMQP { * @param messageCount Message count * @return Deferred */ -Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const +const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messageCount) const { - // make copies of the callbacks - auto messageCallback = _messageCallback; - auto *channel = _channel; - - // install a monitor because the channel could be destructed - Monitor monitor(channel); + // we grab a self pointer to ensure that the deferred object stays alive + auto self = shared_from_this(); // report the size (technically, the channel object could be destructed now, but we ignore that case) if (_sizeCallback) _sizeCallback(messageCount); // we now know the name, so we can install the message callback on the channel - _channel->install("", [channel, messageCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { + _channel->install("", [self, this](const Message &message, uint64_t deliveryTag, bool redelivered) { // install a monitor to deal with the case that the channel is removed - Monitor monitor(channel); + Monitor monitor(_channel); // call the callbacks - if (messageCallback) messageCallback(message, deliveryTag, redelivered); + if (_messageCallback) _messageCallback(message, deliveryTag, redelivered); // we can remove the callback now from the channel - if (monitor.valid()) channel->uninstall(""); + if (monitor.valid()) _channel->uninstall(""); }); // return next object @@ -53,8 +49,9 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const /** * Report success, although no message could be get + * @return Deferred */ -Deferred *DeferredGet::reportSuccess() const +const std::shared_ptr &DeferredGet::reportSuccess() const { // report the size if (_sizeCallback) _sizeCallback(0);