From ae7a32a8bf7c602171c510fdd0871d6dd10dbd26 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 15 Apr 2014 12:36:11 +0200 Subject: [PATCH] when a consumer is cancelled, it is also removed from the map of active consumers in the the ChannelImpl object --- include/channelimpl.h | 12 ++++++++++-- include/deferredcancel.h | 27 +++++++++++--------------- src/channelimpl.cpp | 2 +- src/deferredcancel.cpp | 42 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 19 deletions(-) create mode 100644 src/deferredcancel.cpp diff --git a/include/channelimpl.h b/include/channelimpl.h index ba0d9e4..0f607bf 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -493,8 +493,6 @@ public: // we are going to call callbacks that could destruct the channel Monitor monitor(this); - // @todo should this be a std::string parameter? - // inform handler if (_errorCallback) _errorCallback(message); @@ -528,6 +526,16 @@ public: else _consumers.erase(consumertag); } + /** + * Uninstall a consumer callback + * @param consumertag The consumer tag + */ + void uninstall(const std::string &consumertag) + { + // erase the callback + _consumers.erase(consumertag); + } + /** * Report that a message was received */ diff --git a/include/deferredcancel.h b/include/deferredcancel.h index 401716d..244e145 100644 --- a/include/deferredcancel.h +++ b/include/deferredcancel.h @@ -19,6 +19,12 @@ namespace AMQP { class DeferredCancel : public Deferred { private: + /** + * Pointer to the channel + * @var ChannelImpl + */ + ChannelImpl *_channel; + /** * Callback to execute when the instruction is completed * @var CancelCallback @@ -30,20 +36,7 @@ private: * @param name Consumer tag that is cancelled * @return Deferred */ - virtual Deferred *reportSuccess(const std::string &name) const override - { - // skip if no special callback was installed - if (!_cancelCallback) return Deferred::reportSuccess(); - - // call the callback - _cancelCallback(name); - - // call finalize callback - if (_finalizeCallback) _finalizeCallback(); - - // return next object - return _next; - } + virtual Deferred *reportSuccess(const std::string &name) const override; /** * The channel implementation may call our @@ -57,9 +50,11 @@ protected: * Protected constructor that can only be called * from within the channel implementation * - * @param boolean are we already failed? + * @param channel Pointer to the channel + * @param failed Are we already failed? */ - DeferredCancel(bool failed = false) : Deferred(failed) {} + DeferredCancel(ChannelImpl *channel, bool failed = false) : + Deferred(failed), _channel(channel) {} public: /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 8d93563..aac9ce6 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -524,7 +524,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags) BasicCancelFrame frame(_id, tag, flags & nowait); // send the frame, and create deferred object - auto *deferred = new DeferredCancel(send(frame)); + auto *deferred = new DeferredCancel(this, send(frame)); // push to list push(deferred, "Cannot send basic cancel frame"); diff --git a/src/deferredcancel.cpp b/src/deferredcancel.cpp new file mode 100644 index 0000000..aaecebe --- /dev/null +++ b/src/deferredcancel.cpp @@ -0,0 +1,42 @@ +/** + * DeferredCancel.cpp + * + * Implementation file for the DeferredCancel class + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" + +/** + * Namespace + */ +namespace AMQP { + +/** + * Report success for frames that report cancel operations + * @param name Consumer tag that is cancelled + * @return Deferred + */ +Deferred *DeferredCancel::reportSuccess(const std::string &name) const +{ + // in the channel, we should uninstall the consumer + _channel->uninstall(name); + + // skip if no special callback was installed + if (!_cancelCallback) return Deferred::reportSuccess(); + + // call the callback + _cancelCallback(name); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; +} + +/** + * End namespace + */ +} +