deferred objects are now correctly destructed + added extra checks so that no crashes occur when someone destructs a channel inside a callback function
This commit is contained in:
parent
745ab512a5
commit
301b8153e3
|
|
@ -34,6 +34,7 @@
|
||||||
#include <amqpcpp/receivedframe.h>
|
#include <amqpcpp/receivedframe.h>
|
||||||
#include <amqpcpp/outbuffer.h>
|
#include <amqpcpp/outbuffer.h>
|
||||||
#include <amqpcpp/watchable.h>
|
#include <amqpcpp/watchable.h>
|
||||||
|
#include <amqpcpp/monitor.h>
|
||||||
|
|
||||||
// amqp types
|
// amqp types
|
||||||
#include <amqpcpp/field.h>
|
#include <amqpcpp/field.h>
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ private:
|
||||||
*
|
*
|
||||||
* @var Deferred
|
* @var Deferred
|
||||||
*/
|
*/
|
||||||
Deferred *_oldestCallback = nullptr;
|
std::unique_ptr<Deferred> _oldestCallback = nullptr;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pointer to the newest deferred result (the last one to be added).
|
* Pointer to the newest deferred result (the last one to be added).
|
||||||
|
|
@ -464,11 +464,21 @@ public:
|
||||||
{
|
{
|
||||||
// skip if there is no oldest callback
|
// skip if there is no oldest callback
|
||||||
if (!_oldestCallback) return;
|
if (!_oldestCallback) return;
|
||||||
|
|
||||||
|
// we are going to call callbacks that could destruct the channel
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// call the callback
|
||||||
|
auto *next = _oldestCallback->reportSuccess(std::forward<Arguments>(parameters)...);
|
||||||
|
|
||||||
// report to the oldest callback, and install a new oldest callback
|
// leap out if channel no longer exists
|
||||||
_oldestCallback = _oldestCallback->reportSuccess(std::forward<Arguments>(parameters)...);
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
// @todo destruct oldest callback
|
// set the oldest callback
|
||||||
|
_oldestCallback.reset(next);
|
||||||
|
|
||||||
|
// if there was no next callback, the newest callback was just used
|
||||||
|
if (!next) _newestCallback = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -480,19 +490,28 @@ public:
|
||||||
// change state
|
// change state
|
||||||
_state = state_closed;
|
_state = state_closed;
|
||||||
|
|
||||||
// @todo multiple callbacks are called, this could break
|
// we are going to call callbacks that could destruct the channel
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
// @todo should this be a std::string parameter?
|
// @todo should this be a std::string parameter?
|
||||||
|
|
||||||
// inform handler
|
// inform handler
|
||||||
if (_errorCallback) _errorCallback(message.c_str());
|
if (_errorCallback) _errorCallback(message.c_str());
|
||||||
|
|
||||||
|
// leap out if channel is already destructed, or when there are no further callbacks
|
||||||
|
if (!monitor.valid() || !_oldestCallback) return;
|
||||||
|
|
||||||
// skip if there is no oldest callback
|
// call the callback
|
||||||
if (!_oldestCallback) return;
|
auto *next = _oldestCallback->reportError(message);
|
||||||
|
|
||||||
// report to the oldest callback, and install a new oldest callback
|
// leap out if channel no longer exists
|
||||||
_oldestCallback = _oldestCallback->reportError(message);
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
// @todo destruct oldest callback
|
// set the oldest callback
|
||||||
|
_oldestCallback.reset(next);
|
||||||
|
|
||||||
|
// if there was no next callback, the newest callback was just used
|
||||||
|
if (!next) _newestCallback = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,15 @@ protected:
|
||||||
bool _failed;
|
bool _failed;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The next deferred object
|
||||||
|
* @return Deferred
|
||||||
|
*/
|
||||||
|
Deferred *next() const
|
||||||
|
{
|
||||||
|
return _next;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indicate success
|
* Indicate success
|
||||||
* @return Deferred Next deferred result
|
* @return Deferred Next deferred result
|
||||||
|
|
|
||||||
|
|
@ -86,8 +86,8 @@ ChannelImpl::~ChannelImpl()
|
||||||
// close the channel now
|
// close the channel now
|
||||||
close();
|
close();
|
||||||
|
|
||||||
|
// destruct deferred results
|
||||||
// @todo destruct deferred resutls
|
while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -98,7 +98,7 @@ ChannelImpl::~ChannelImpl()
|
||||||
void ChannelImpl::push(Deferred *deferred, const char *error)
|
void ChannelImpl::push(Deferred *deferred, const char *error)
|
||||||
{
|
{
|
||||||
// do we already have an oldest?
|
// do we already have an oldest?
|
||||||
if (!_oldestCallback) _oldestCallback = deferred;
|
if (!_oldestCallback) _oldestCallback.reset(deferred);
|
||||||
|
|
||||||
// do we already have a newest?
|
// do we already have a newest?
|
||||||
if (_newestCallback) _newestCallback->add(deferred);
|
if (_newestCallback) _newestCallback->add(deferred);
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,6 @@
|
||||||
#include "../amqpcpp.h"
|
#include "../amqpcpp.h"
|
||||||
|
|
||||||
// classes that are very commonly used
|
// classes that are very commonly used
|
||||||
#include "monitor.h"
|
|
||||||
#include "exception.h"
|
#include "exception.h"
|
||||||
#include "protocolexception.h"
|
#include "protocolexception.h"
|
||||||
#include "frame.h"
|
#include "frame.h"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue