The Channel.get().onFinalize() function was called before the Channel.get().onSuccess() method, which is strange. This has been fixed. To fix this, we had to refactor the Deferred* into shared_ptrs

This commit is contained in:
Emiel Bruijntjes 2014-09-02 10:32:55 +02:00
parent 6cc25c4b27
commit 9653578d4a
11 changed files with 82 additions and 63 deletions

View File

@ -55,14 +55,14 @@ private:
* *
* @var Deferred * @var Deferred
*/ */
std::shared_ptr<Deferred> _oldestCallback = nullptr; std::shared_ptr<Deferred> _oldestCallback;
/** /**
* Pointer to the newest deferred result (the last one to be added). * Pointer to the newest deferred result (the last one to be added).
* *
* @var Deferred * @var Deferred
*/ */
Deferred *_newestCallback = nullptr; std::shared_ptr<Deferred> _newestCallback;
/** /**
* The channel number * The channel number
@ -113,7 +113,7 @@ private:
* @param result The deferred result * @param result The deferred result
* @return Deferred The object just pushed * @return Deferred The object just pushed
*/ */
Deferred &push(Deferred *deferred); Deferred &push(const std::shared_ptr<Deferred> &deferred);
/** /**
* Send a framen and push a deferred result * Send a framen and push a deferred result
@ -596,19 +596,19 @@ public:
// we are going to call callbacks that could destruct the channel // we are going to call callbacks that could destruct the channel
Monitor monitor(this); Monitor monitor(this);
// copy the callback (so that it will not be destructed during // copy the callback (so that it will not be destructed during
// the "reportSuccess" call // the "reportSuccess" call
auto cb = _oldestCallback; auto cb = _oldestCallback;
// call the callback // call the callback
auto *next = cb->reportSuccess(std::forward<Arguments>(parameters)...); auto next = cb->reportSuccess(std::forward<Arguments>(parameters)...);
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return false; if (!monitor.valid()) return false;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback = next;
// if there was no next callback, the newest callback was just used // if there was no next callback, the newest callback was just used
if (!next) _newestCallback = nullptr; if (!next) _newestCallback = nullptr;

View File

@ -15,7 +15,6 @@ namespace AMQP {
// forward declaration // forward declaration
class ChannelImpl; class ChannelImpl;
class Callbacks;
/** /**
* Class definition * Class definition
@ -46,7 +45,7 @@ protected:
* Pointer to the next deferred object * Pointer to the next deferred object
* @var Deferred * @var Deferred
*/ */
Deferred *_next = nullptr; std::shared_ptr<Deferred> _next;
/** /**
* Do we already know we failed? * Do we already know we failed?
@ -59,7 +58,7 @@ protected:
* The next deferred object * The next deferred object
* @return Deferred * @return Deferred
*/ */
Deferred *next() const const std::shared_ptr<Deferred> &next() const
{ {
return _next; return _next;
} }
@ -68,7 +67,7 @@ protected:
* Indicate success * Indicate success
* @return Deferred Next deferred result * @return Deferred Next deferred result
*/ */
virtual Deferred *reportSuccess() const virtual const std::shared_ptr<Deferred> &reportSuccess() const
{ {
// execute callbacks if registered // execute callbacks if registered
if (_successCallback) _successCallback(); if (_successCallback) _successCallback();
@ -84,7 +83,7 @@ protected:
* @param consumercount Number of consumers linked to the queue * @param consumercount Number of consumers linked to the queue
* @return Deferred Next deferred result * @return Deferred Next deferred result
*/ */
virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const
{ {
// this is the same as a regular success message // this is the same as a regular success message
return reportSuccess(); return reportSuccess();
@ -95,7 +94,7 @@ protected:
* @param messagecount Number of messages that were deleted * @param messagecount Number of messages that were deleted
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess(uint32_t messagecount) const virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount) const
{ {
// this is the same as a regular success message // this is the same as a regular success message
return reportSuccess(); return reportSuccess();
@ -106,7 +105,7 @@ protected:
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess(const std::string &name) const virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const
{ {
// this is the same as a regular success message // this is the same as a regular success message
return reportSuccess(); return reportSuccess();
@ -117,7 +116,7 @@ protected:
* @param error Description of the error that occured * @param error Description of the error that occured
* @return Deferred Next deferred result * @return Deferred Next deferred result
*/ */
Deferred *reportError(const char *error) const std::shared_ptr<Deferred> &reportError(const char *error)
{ {
// from this moment on the object should be listed as failed // from this moment on the object should be listed as failed
_failed = true; _failed = true;
@ -133,7 +132,7 @@ protected:
* Add a pointer to the next deferred result * Add a pointer to the next deferred result
* @param deferred * @param deferred
*/ */
void add(Deferred *deferred) void add(const std::shared_ptr<Deferred> &deferred)
{ {
// store pointer // store pointer
_next = deferred; _next = deferred;
@ -144,17 +143,20 @@ protected:
* private members and construct us * private members and construct us
*/ */
friend class ChannelImpl; friend class ChannelImpl;
friend class Callbacks;
protected: public:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
* *
* @param failed are we already failed? * @param failed are we already failed?
*/ */
Deferred(bool failed = false) : _failed(failed) {} Deferred(bool failed = false) : _failed(failed) {}
public: public:
/** /**
* Deleted copy and move constructors * Deleted copy and move constructors

View File

@ -36,7 +36,7 @@ private:
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess(const std::string &name) const override; virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const override;
/** /**
* The channel implementation may call our * The channel implementation may call our
@ -45,11 +45,15 @@ private:
friend class ChannelImpl; friend class ChannelImpl;
friend class ConsumedMessage; friend class ConsumedMessage;
protected: public:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
* *
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
*
* @param channel Pointer to the channel * @param channel Pointer to the channel
* @param failed Are we already failed? * @param failed Are we already failed?
*/ */

View File

@ -41,7 +41,7 @@ private:
* @param name Consumer tag that is started * @param name Consumer tag that is started
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess(const std::string &name) const override; virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const override;
/** /**
* The channel implementation may call our * The channel implementation may call our
@ -50,10 +50,14 @@ private:
friend class ChannelImpl; friend class ChannelImpl;
friend class ConsumedMessage; friend class ConsumedMessage;
protected: public:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
* *
* @param channel the channel implementation * @param channel the channel implementation
* @param failed are we already failed? * @param failed are we already failed?

View File

@ -29,7 +29,7 @@ private:
* @param messagecount Number of messages that were deleted * @param messagecount Number of messages that were deleted
* @return Deferred Next deferred result * @return Deferred Next deferred result
*/ */
virtual Deferred *reportSuccess(uint32_t messagecount) const override virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount) const override
{ {
// skip if no special callback was installed // skip if no special callback was installed
if (!_deleteCallback) return Deferred::reportSuccess(); if (!_deleteCallback) return Deferred::reportSuccess();
@ -49,10 +49,14 @@ private:
friend class ChannelImpl; friend class ChannelImpl;
friend class ConsumedMessage; friend class ConsumedMessage;
protected: public:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
* *
* @param boolean are we already failed? * @param boolean are we already failed?
*/ */

View File

@ -12,8 +12,12 @@ namespace AMQP {
/** /**
* Class definition * Class definition
*
* This class implements the 'shared_from_this' functionality, because
* it grabs a self-pointer when the callback is running, otherwise the onFinalize()
* is called before the actual message is consumed.
*/ */
class DeferredGet : public Deferred class DeferredGet : public Deferred, public std::enable_shared_from_this<DeferredGet>
{ {
private: private:
/** /**
@ -45,13 +49,13 @@ private:
* @param count number of messages in the queue * @param count number of messages in the queue
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess(uint32_t messagecount) const override; virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount) const override;
/** /**
* Report success when queue was empty * Report success when queue was empty
* @return Deferred * @return Deferred
*/ */
virtual Deferred *reportSuccess() const override; virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
/** /**
* The channel implementation may call our * The channel implementation may call our
@ -60,12 +64,15 @@ private:
friend class ChannelImpl; friend class ChannelImpl;
friend class ConsumedMessage; friend class ConsumedMessage;
public:
protected:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
* *
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
*
* @param channel the channel implementation * @param channel the channel implementation
* @param failed are we already failed? * @param failed are we already failed?
*/ */

View File

@ -30,7 +30,7 @@ private:
* @param consumercount Number of consumers linked to the queue * @param consumercount Number of consumers linked to the queue
* @return Deferred Next deferred result * @return Deferred Next deferred result
*/ */
virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override
{ {
// skip if no special callback was installed // skip if no special callback was installed
if (!_queueCallback) return Deferred::reportSuccess(); if (!_queueCallback) return Deferred::reportSuccess();
@ -49,12 +49,16 @@ private:
friend class ChannelImpl; friend class ChannelImpl;
friend class ConsumedMessage; friend class ConsumedMessage;
protected: public:
/** /**
* Protected constructor that can only be called * Protected constructor that can only be called
* from within the channel implementation * from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
* *
* @param boolea are we already failed? * @param bool are we already failed?
*/ */
DeferredQueue(bool failed = false) : Deferred(failed) {} DeferredQueue(bool failed = false) : Deferred(failed) {}

View File

@ -57,9 +57,6 @@ ChannelImpl::~ChannelImpl()
// remove this channel from the connection (but not if the connection is already destructed) // remove this channel from the connection (but not if the connection is already destructed)
if (_connection) _connection->remove(this); if (_connection) _connection->remove(this);
// destruct deferred results
while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next());
} }
/** /**
@ -97,10 +94,10 @@ void ChannelImpl::attach(Connection *connection)
* Push a deferred result * Push a deferred result
* @param result The deferred object to push * @param result The deferred object to push
*/ */
Deferred &ChannelImpl::push(Deferred *deferred) Deferred &ChannelImpl::push(const std::shared_ptr<Deferred> &deferred)
{ {
// do we already have an oldest? // do we already have an oldest?
if (!_oldestCallback) _oldestCallback.reset(deferred); if (!_oldestCallback) _oldestCallback = deferred;
// do we already have a newest? // do we already have a newest?
if (_newestCallback) _newestCallback->add(deferred); if (_newestCallback) _newestCallback->add(deferred);
@ -119,7 +116,7 @@ Deferred &ChannelImpl::push(Deferred *deferred)
Deferred &ChannelImpl::push(const Frame &frame) Deferred &ChannelImpl::push(const Frame &frame)
{ {
// send the frame, and push the result // send the frame, and push the result
return push(new Deferred(!send(frame))); return push(std::make_shared<Deferred>(!send(frame)));
} }
/** /**
@ -195,7 +192,7 @@ Deferred &ChannelImpl::rollbackTransaction()
Deferred &ChannelImpl::close() Deferred &ChannelImpl::close()
{ {
// this is completely pointless if not connected // this is completely pointless if not connected
if (_state != state_connected) return push(new Deferred(_state == state_closing)); if (_state != state_connected) return push(std::make_shared<Deferred>(_state == state_closing));
// send a channel close frame // send a channel close frame
auto &handler = push(ChannelCloseFrame(_id)); auto &handler = push(ChannelCloseFrame(_id));
@ -295,7 +292,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con
QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments); QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments);
// send the queuedeclareframe // send the queuedeclareframe
auto *result = new DeferredQueue(!send(frame)); auto result = std::make_shared<DeferredQueue>(!send(frame));
// add the deferred result // add the deferred result
push(result); push(result);
@ -361,7 +358,7 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name)
QueuePurgeFrame frame(_id, name, false); QueuePurgeFrame frame(_id, name, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredDelete(!send(frame)); auto deferred = std::make_shared<DeferredDelete>(!send(frame));
// push to list // push to list
push(deferred); push(deferred);
@ -394,7 +391,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false); QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredDelete(!send(frame)); auto deferred = std::make_shared<DeferredDelete>(!send(frame));
// push to list // push to list
push(deferred); push(deferred);
@ -503,7 +500,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments); BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredConsumer(this, !send(frame)); auto deferred = std::make_shared<DeferredConsumer>(this, !send(frame));
// push to list // push to list
push(deferred); push(deferred);
@ -535,7 +532,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag)
BasicCancelFrame frame(_id, tag, false); BasicCancelFrame frame(_id, tag, false);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredCancel(this, !send(frame)); auto deferred = std::make_shared<DeferredCancel>(this, !send(frame));
// push to list // push to list
push(deferred); push(deferred);
@ -582,7 +579,7 @@ DeferredGet &ChannelImpl::get(const std::string &queue, int flags)
BasicGetFrame frame(_id, queue, flags & noack); BasicGetFrame frame(_id, queue, flags & noack);
// send the frame, and create deferred object // send the frame, and create deferred object
auto *deferred = new DeferredGet(this, !send(frame)); auto deferred = std::make_shared<DeferredGet>(this, !send(frame));
// push to list // push to list
push(deferred); push(deferred);
@ -765,13 +762,13 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
auto cb = _oldestCallback; auto cb = _oldestCallback;
// call the callback // call the callback
auto *next = cb->reportError(message); auto next = cb->reportError(message);
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback = next;
} }
// clean up all deferred other objects // clean up all deferred other objects
@ -782,13 +779,13 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
auto cb = _oldestCallback; auto cb = _oldestCallback;
// call the callback // call the callback
auto *next = cb->reportError("Channel is in error state"); auto next = cb->reportError("Channel is in error state");
// leap out if channel no longer exists // leap out if channel no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return;
// set the oldest callback // set the oldest callback
_oldestCallback.reset(next); _oldestCallback = next;
} }
// all callbacks have been processed, so we also can reset the pointer to the newest // all callbacks have been processed, so we also can reset the pointer to the newest

View File

@ -17,7 +17,7 @@ namespace AMQP {
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
Deferred *DeferredCancel::reportSuccess(const std::string &name) const const std::shared_ptr<Deferred> &DeferredCancel::reportSuccess(const std::string &name) const
{ {
// in the channel, we should uninstall the consumer // in the channel, we should uninstall the consumer
_channel->uninstall(name); _channel->uninstall(name);

View File

@ -17,7 +17,7 @@ namespace AMQP {
* @param name Consumer tag that is started * @param name Consumer tag that is started
* @return Deferred * @return Deferred
*/ */
Deferred *DeferredConsumer::reportSuccess(const std::string &name) const const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::string &name) const
{ {
// we now know the name, so we can install the message callback on the channel // we now know the name, so we can install the message callback on the channel
_channel->install(name, _messageCallback); _channel->install(name, _messageCallback);

View File

@ -22,29 +22,25 @@ namespace AMQP {
* @param messageCount Message count * @param messageCount Message count
* @return Deferred * @return Deferred
*/ */
Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messageCount) const
{ {
// make copies of the callbacks // we grab a self pointer to ensure that the deferred object stays alive
auto messageCallback = _messageCallback; auto self = shared_from_this();
auto *channel = _channel;
// install a monitor because the channel could be destructed
Monitor monitor(channel);
// report the size (technically, the channel object could be destructed now, but we ignore that case) // report the size (technically, the channel object could be destructed now, but we ignore that case)
if (_sizeCallback) _sizeCallback(messageCount); if (_sizeCallback) _sizeCallback(messageCount);
// we now know the name, so we can install the message callback on the channel // we now know the name, so we can install the message callback on the channel
_channel->install("", [channel, messageCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { _channel->install("", [self, this](const Message &message, uint64_t deliveryTag, bool redelivered) {
// install a monitor to deal with the case that the channel is removed // install a monitor to deal with the case that the channel is removed
Monitor monitor(channel); Monitor monitor(_channel);
// call the callbacks // call the callbacks
if (messageCallback) messageCallback(message, deliveryTag, redelivered); if (_messageCallback) _messageCallback(message, deliveryTag, redelivered);
// we can remove the callback now from the channel // we can remove the callback now from the channel
if (monitor.valid()) channel->uninstall(""); if (monitor.valid()) _channel->uninstall("");
}); });
// return next object // return next object
@ -53,8 +49,9 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const
/** /**
* Report success, although no message could be get * Report success, although no message could be get
* @return Deferred
*/ */
Deferred *DeferredGet::reportSuccess() const const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
{ {
// report the size // report the size
if (_sizeCallback) _sizeCallback(0); if (_sizeCallback) _sizeCallback(0);