diff --git a/amqpcpp.h b/amqpcpp.h index 06f19f8..0065da6 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -34,6 +34,7 @@ #include #include #include +#include // amqp types #include diff --git a/include/channelimpl.h b/include/channelimpl.h index 8666240..91d666c 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -61,7 +61,7 @@ private: * * @var Deferred */ - Deferred *_oldestCallback = nullptr; + std::unique_ptr _oldestCallback = nullptr; /** * Pointer to the newest deferred result (the last one to be added). @@ -464,11 +464,21 @@ public: { // skip if there is no oldest callback if (!_oldestCallback) return; + + // we are going to call callbacks that could destruct the channel + Monitor monitor(this); + + // call the callback + auto *next = _oldestCallback->reportSuccess(std::forward(parameters)...); - // report to the oldest callback, and install a new oldest callback - _oldestCallback = _oldestCallback->reportSuccess(std::forward(parameters)...); + // leap out if channel no longer exists + if (!monitor.valid()) return; - // @todo destruct oldest callback + // set the oldest callback + _oldestCallback.reset(next); + + // if there was no next callback, the newest callback was just used + if (!next) _newestCallback = nullptr; } /** @@ -480,19 +490,28 @@ public: // change state _state = state_closed; - // @todo multiple callbacks are called, this could break + // we are going to call callbacks that could destruct the channel + Monitor monitor(this); + // @todo should this be a std::string parameter? // inform handler if (_errorCallback) _errorCallback(message.c_str()); + + // leap out if channel is already destructed, or when there are no further callbacks + if (!monitor.valid() || !_oldestCallback) return; - // skip if there is no oldest callback - if (!_oldestCallback) return; - - // report to the oldest callback, and install a new oldest callback - _oldestCallback = _oldestCallback->reportError(message); - - // @todo destruct oldest callback + // call the callback + auto *next = _oldestCallback->reportError(message); + + // leap out if channel no longer exists + if (!monitor.valid()) return; + + // set the oldest callback + _oldestCallback.reset(next); + + // if there was no next callback, the newest callback was just used + if (!next) _newestCallback = nullptr; } /** diff --git a/include/deferred.h b/include/deferred.h index 777a370..f2e849c 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -54,6 +54,15 @@ protected: bool _failed; + /** + * The next deferred object + * @return Deferred + */ + Deferred *next() const + { + return _next; + } + /** * Indicate success * @return Deferred Next deferred result diff --git a/src/monitor.h b/include/monitor.h similarity index 100% rename from src/monitor.h rename to include/monitor.h diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a28bf77..8d93563 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -86,8 +86,8 @@ ChannelImpl::~ChannelImpl() // close the channel now close(); - - // @todo destruct deferred resutls + // destruct deferred results + while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next()); } /** @@ -98,7 +98,7 @@ ChannelImpl::~ChannelImpl() void ChannelImpl::push(Deferred *deferred, const char *error) { // do we already have an oldest? - if (!_oldestCallback) _oldestCallback = deferred; + if (!_oldestCallback) _oldestCallback.reset(deferred); // do we already have a newest? if (_newestCallback) _newestCallback->add(deferred); diff --git a/src/includes.h b/src/includes.h index 52b31d0..2a75192 100644 --- a/src/includes.h +++ b/src/includes.h @@ -11,7 +11,6 @@ #include "../amqpcpp.h" // classes that are very commonly used -#include "monitor.h" #include "exception.h" #include "protocolexception.h" #include "frame.h"