From 6997a70cf1ca904cca162493367a48291c59f03d Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 20 Aug 2014 11:47:16 +0200 Subject: [PATCH] when a connection was closed, the commands already given to the channels were sometimes by-passed by the closing channel. This has been fixed. Also solved the problem that calls executed on channel objects that fall out of scope were not always completed --- include/channel.h | 108 +++++++++++++------------- include/channelimpl.h | 126 +++++++++++++------------------ include/connectionimpl.h | 15 +++- src/basiccancelokframe.h | 4 +- src/basicconsumeokframe.h | 4 +- src/basicdeliverframe.h | 2 +- src/basicgetemptyframe.h | 4 +- src/basicgetokframe.h | 2 +- src/basicheaderframe.h | 2 +- src/basicqosokframe.h | 4 +- src/basicrecoverokframe.h | 2 +- src/bodyframe.h | 2 +- src/channelcloseframe.h | 2 +- src/channelcloseokframe.h | 4 +- src/channelflowokframe.h | 4 +- src/channelimpl.cpp | 125 +++++++++++++++++++++++++++--- src/channelopenokframe.h | 2 +- src/connectioncloseframe.h | 9 +++ src/connectionimpl.cpp | 64 ++++++++++++---- src/exchangebindokframe.h | 4 +- src/exchangedeclareokframe.h | 4 +- src/exchangedeleteokframe.h | 4 +- src/exchangeunbindokframe.h | 4 +- src/frame.h | 6 ++ src/queuebindokframe.h | 4 +- src/queuedeclareokframe.h | 4 +- src/queuedeleteokframe.h | 4 +- src/queuepurgeokframe.h | 4 +- src/queueunbindokframe.h | 4 +- src/transactioncommitokframe.h | 4 +- src/transactionrollbackokframe.h | 4 +- src/transactionselectokframe.h | 4 +- 32 files changed, 343 insertions(+), 196 deletions(-) diff --git a/include/channel.h b/include/channel.h index 0473c5e..83ecfcf 100644 --- a/include/channel.h +++ b/include/channel.h @@ -20,19 +20,23 @@ private: * The implementation for the channel * @var ChannelImpl */ - ChannelImpl _implementation; + std::shared_ptr _implementation; public: /** * Construct a channel object * @param connection */ - Channel(Connection *connection) : _implementation(this, connection) {} + Channel(Connection *connection) : _implementation(ChannelImpl::instantiate(this, connection)) {} /** * Destructor */ - virtual ~Channel() {} + virtual ~Channel() + { + // close the channel (this will eventually destruct the channel) + _implementation->close(); + } /** * Callback that is called when the channel was succesfully created. @@ -45,7 +49,7 @@ public: void onReady(const SuccessCallback &callback) { // store callback in implementation - _implementation._readyCallback = callback; + _implementation->_readyCallback = callback; } /** @@ -59,7 +63,7 @@ public: void onError(const ErrorCallback &callback) { // store callback in implementation - _implementation._errorCallback = callback; + _implementation->_errorCallback = callback; } /** @@ -72,7 +76,7 @@ public: */ Deferred &pause() { - return _implementation.pause(); + return _implementation->pause(); } /** @@ -85,7 +89,7 @@ public: */ Deferred &resume() { - return _implementation.resume(); + return _implementation->resume(); } /** @@ -94,7 +98,7 @@ public: */ bool connected() { - return _implementation.connected(); + return _implementation->connected(); } /** @@ -105,7 +109,7 @@ public: */ Deferred &startTransaction() { - return _implementation.startTransaction(); + return _implementation->startTransaction(); } /** @@ -116,7 +120,7 @@ public: */ Deferred &commitTransaction() { - return _implementation.commitTransaction(); + return _implementation->commitTransaction(); } /** @@ -127,7 +131,7 @@ public: */ Deferred &rollbackTransaction() { - return _implementation.rollbackTransaction(); + return _implementation->rollbackTransaction(); } /** @@ -149,12 +153,12 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } - Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } - Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } - Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } - Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } - Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } + Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(name, type, flags, arguments); } + Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation->declareExchange(name, type, 0, arguments); } + Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(name, type, flags, Table()); } + Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(std::string(), type, flags, arguments); } + Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation->declareExchange(std::string(), type, 0, arguments); } + Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(std::string(), type, flags, Table()); } /** * Remove an exchange @@ -169,7 +173,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } + Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation->removeExchange(name, flags); } /** * Bind two exchanges to each other @@ -182,8 +186,8 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, arguments); } - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation.bindExchange(source, target, routingkey, Table()); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation->bindExchange(source, target, routingkey, arguments); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation->bindExchange(source, target, routingkey, Table()); } /** * Unbind two exchanges from one another @@ -196,8 +200,8 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, arguments); } - Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation.unbindExchange(target, source, routingkey, Table()); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation->unbindExchange(target, source, routingkey, arguments); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation->unbindExchange(target, source, routingkey, Table()); } /** * Declare a queue @@ -228,12 +232,12 @@ public: * * }); */ - DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } - DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } - DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } - DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } - DeferredQueue &declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } - DeferredQueue &declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } + DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation->declareQueue(name, flags, arguments); } + DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation->declareQueue(name, 0, arguments); } + DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation->declareQueue(name, flags, Table()); } + DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation->declareQueue(std::string(), flags, arguments); } + DeferredQueue &declareQueue(const Table &arguments) { return _implementation->declareQueue(std::string(), 0, arguments); } + DeferredQueue &declareQueue(int flags = 0) { return _implementation->declareQueue(std::string(), flags, Table()); } /** * Bind a queue to an exchange @@ -246,8 +250,8 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, arguments); } - Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.bindQueue(exchange, queue, routingkey, Table()); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation->bindQueue(exchange, queue, routingkey, arguments); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation->bindQueue(exchange, queue, routingkey, Table()); } /** * Unbind a queue from an exchange @@ -259,8 +263,8 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } - Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } + Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation->unbindQueue(exchange, queue, routingkey, arguments); } + Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation->unbindQueue(exchange, queue, routingkey, Table()); } /** * Purge a queue @@ -280,7 +284,7 @@ public: * * }); */ - DeferredDelete &purgeQueue(const std::string &name){ return _implementation.purgeQueue(name); } + DeferredDelete &purgeQueue(const std::string &name){ return _implementation->purgeQueue(name); } /** * Remove a queue @@ -306,7 +310,7 @@ public: * * }); */ - DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } + DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation->removeQueue(name, flags); } /** * Publish a message to an exchange @@ -317,10 +321,10 @@ public: * @param message the message to send * @param size size of the message */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); } + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } + bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel @@ -337,7 +341,7 @@ public: */ Deferred &setQos(uint16_t prefetchCount, bool global = false) { - return _implementation.setQos(prefetchCount, global); + return _implementation->setQos(prefetchCount, global); } /** @@ -373,12 +377,12 @@ public: * * }); */ - DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); } - DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); } - DeferredConsumer &consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); } - DeferredConsumer &consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } - DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } - DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } + DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation->consume(queue, tag, flags, arguments); } + DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation->consume(queue, tag, flags, Table()); } + DeferredConsumer &consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation->consume(queue, tag, 0, arguments); } + DeferredConsumer &consume(const std::string &queue, int flags, const Table &arguments) { return _implementation->consume(queue, std::string(), flags, arguments); } + DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation->consume(queue, std::string(), flags, Table()); } + DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation->consume(queue, std::string(), 0, arguments); } /** * Cancel a running consume call @@ -400,7 +404,7 @@ public: * * }); */ - DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); } + DeferredCancel &cancel(const std::string &tag) { return _implementation->cancel(tag); } /** * Retrieve a single message from RabbitMQ @@ -434,7 +438,7 @@ public: * * }); */ - DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation.get(queue, flags); } + DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation->get(queue, flags); } /** * Acknoldge a received message @@ -452,7 +456,7 @@ public: * @param flags optional flags * @return bool */ - bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } + bool ack(uint64_t deliveryTag, int flags=0) { return _implementation->ack(deliveryTag, flags); } /** * Reject or nack a message @@ -470,7 +474,7 @@ public: * @param flags optional flags * @return bool */ - bool reject(uint64_t deliveryTag, int flags=0) { return _implementation.reject(deliveryTag, flags); } + bool reject(uint64_t deliveryTag, int flags=0) { return _implementation->reject(deliveryTag, flags); } /** * Recover all messages that were not yet acked @@ -487,7 +491,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &recover(int flags = 0) { return _implementation.recover(flags); } + Deferred &recover(int flags = 0) { return _implementation->recover(flags); } /** * Close the current channel @@ -495,7 +499,7 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &close() { return _implementation.close(); } + Deferred &close() { return _implementation->close(); } /** * Get the channel we're working on @@ -503,7 +507,7 @@ public: */ const uint16_t id() const { - return _implementation.id(); + return _implementation->id(); } }; diff --git a/include/channelimpl.h b/include/channelimpl.h index cf8bfd0..3153478 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -22,7 +22,7 @@ class ConsumedMessage; /** * Class definition */ -class ChannelImpl : public Watchable +class ChannelImpl : public Watchable, public std::enable_shared_from_this { private: /** @@ -91,11 +91,14 @@ private: * * We store the data as well as whether they * should be handled synchronously. + * + * @var std::queue */ std::queue> _queue; /** * Are we currently operating in synchronous mode? + * @var bool */ bool _synchronous = false; @@ -106,16 +109,11 @@ private: ConsumedMessage *_message = nullptr; /** - * Construct a channel object - * - * Note that the constructor is private, and that the Channel class is - * a friend. By doing this we ensure that nobody can instantiate this - * object, and that it can thus only be used inside the library. - * - * @param parent the public channel object + * Constructor to make a shared pointer + * @param parent the publis channel object * @param connection pointer to the connection */ - ChannelImpl(Channel *parent, Connection *connection); + static std::shared_ptr instantiate(Channel *parent, Connection *connection); /** * Push a deferred result @@ -131,6 +129,18 @@ private: */ Deferred &push(const Frame &frame); +protected: + /** + * Construct a channel object + * + * Note that the constructor is private, and that the Channel class is + * a friend. By doing this we ensure that nobody can instantiate this + * object, and that it can thus only be used inside the library. + * + * @param parent the public channel object + * @param connection pointer to the connection + */ + ChannelImpl(Channel *parent, Connection *connection); public: /** @@ -475,11 +485,19 @@ public: bool send(const Frame &frame); /** - * Signal the channel that a synchronous operation - * was completed. After this operation, waiting - * frames can be sent out. + * Is this channel waiting for an answer before it can send furher instructions + * @return bool */ - void synchronized(); + bool waiting() const + { + return _synchronous; + } + + /** + * Signal the channel that a synchronous operation was completed. + * After this operation, waiting frames can be sent out. + */ + void onSynchronized(); /** * Report to the handler that the channel is opened @@ -493,36 +511,48 @@ public: if (_readyCallback) _readyCallback(); // if the monitor is still valid, we exit synchronous mode now - if (monitor.valid()) synchronized(); + if (monitor.valid()) onSynchronized(); } /** * Report to the handler that the channel is closed * * Returns whether the channel object is still valid + * + * @return bool */ bool reportClosed() { // change state _state = state_closed; + _synchronous = false; + + // create a monitor, because the callbacks could destruct the current object + Monitor monitor(this); // and pass on to the reportSuccess() method which will call the // appropriate deferred object to report the successful operation - return reportSuccess(); - - // technically, we should exit synchronous method now - // since the synchronous channel close frame has been - // acknowledged by the server. - // - // but since the channel was just closed, there is no - // real point in doing this, as we cannot send frames - // out anymore. + bool result = reportSuccess(); + + // leap out if object no longer exists + if (!monitor.valid()) return result; + + // all later deferred objects should report an error, because it + // was not possible to complete the instruction as the channel is + // now closed + reportError("Channel has been closed", false); + + // done + return result; } /** * Report success * * Returns whether the channel object is still valid + * + * @param mixed + * @return bool */ template bool reportSuccess(Arguments ...parameters) @@ -558,54 +588,7 @@ public: * @param message the error message * @param notifyhandler should the channel-wide handler also be called? */ - void reportError(const char *message, bool notifyhandler = true) - { - // change state - _state = state_closed; - - // we are going to call callbacks that could destruct the channel - Monitor monitor(this); - - // call the oldest - if (_oldestCallback) - { - // copy the callback (so that it can not be destructed during - // the "reportError" call - auto cb = _oldestCallback; - - // call the callback - auto *next = cb->reportError(message); - - // leap out if channel no longer exists - if (!monitor.valid()) return; - - // set the oldest callback - _oldestCallback.reset(next); - } - - // clean up all deferred other objects - while (_oldestCallback) - { - // copy the callback (so that it can not be destructed during - // the "reportError" call - auto cb = _oldestCallback; - - // call the callback - auto *next = cb->reportError("Channel is in error state"); - - // leap out if channel no longer exists - if (!monitor.valid()) return; - - // set the oldest callback - _oldestCallback.reset(next); - } - - // all callbacks have been processed, so we also can reset the pointer to the newest - _newestCallback = nullptr; - - // inform handler - if (notifyhandler && _errorCallback) _errorCallback(message); - } + void reportError(const char *message, bool notifyhandler = true); /** * Install a consumer callback @@ -657,7 +640,6 @@ public: * The channel class is its friend, thus can it instantiate this object */ friend class Channel; - }; /** diff --git a/include/connectionimpl.h b/include/connectionimpl.h index b1e31c2..75a3ce4 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -55,7 +55,7 @@ protected: * All channels that are active * @var map */ - std::map _channels; + std::map> _channels; /** * The last unused channel ID @@ -100,6 +100,13 @@ protected: */ bool sendClose(); + /** + * Is any channel waiting for an answer on a synchronous call? + * @return bool + */ + bool waiting() const; + + private: /** * Construct an AMQP object based on full login data @@ -210,13 +217,13 @@ public: * @param channel * @return uint16_t */ - uint16_t add(ChannelImpl *channel); + uint16_t add(const std::shared_ptr &channel); /** * Remove a channel * @param channel */ - void remove(ChannelImpl *channel); + void remove(const ChannelImpl *channel); /** * Parse the buffer into a recognized frame @@ -269,7 +276,7 @@ public: * @param number channel identifier * @return channel the channel object, or nullptr if not yet created */ - ChannelImpl *channel(int number) + std::shared_ptr channel(int number) { auto iter = _channels.find(number); return iter == _channels.end() ? nullptr : iter->second; diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index 6592166..192417f 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -88,13 +88,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report - if (channel->reportSuccess(consumerTag())) channel->synchronized(); + if (channel->reportSuccess(consumerTag())) channel->onSynchronized(); // done return true; diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index e767f0f..927fa38 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -88,13 +88,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report - if (channel->reportSuccess(consumerTag())) channel->synchronized(); + if (channel->reportSuccess(consumerTag())) channel->onSynchronized(); // done return true; diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index ac6128d..ec6cd78 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -175,7 +175,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; diff --git a/src/basicgetemptyframe.h b/src/basicgetemptyframe.h index 6ba5ace..656c2df 100644 --- a/src/basicgetemptyframe.h +++ b/src/basicgetemptyframe.h @@ -78,13 +78,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index ebccacc..42d33c1 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -165,7 +165,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index f6cc056..6ed9518 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -109,7 +109,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 9c235e2..6698668 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -61,13 +61,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/basicrecoverokframe.h b/src/basicrecoverokframe.h index 1bcd771..b1a8397 100644 --- a/src/basicrecoverokframe.h +++ b/src/basicrecoverokframe.h @@ -66,7 +66,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; diff --git a/src/bodyframe.h b/src/bodyframe.h index 15500a6..c07edde 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -100,7 +100,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; diff --git a/src/channelcloseframe.h b/src/channelcloseframe.h index 0d9c1d6..d47d88d 100644 --- a/src/channelcloseframe.h +++ b/src/channelcloseframe.h @@ -147,7 +147,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // send back an ok frame connection->send(ChannelCloseOKFrame(this->channel())); diff --git a/src/channelcloseokframe.h b/src/channelcloseokframe.h index d06e845..1c8bd1e 100644 --- a/src/channelcloseokframe.h +++ b/src/channelcloseokframe.h @@ -66,13 +66,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report that the channel is closed - if (channel->reportClosed()) channel->synchronized(); + if (channel->reportClosed()) channel->onSynchronized(); // done return true; diff --git a/src/channelflowokframe.h b/src/channelflowokframe.h index c335564..2111253 100644 --- a/src/channelflowokframe.h +++ b/src/channelflowokframe.h @@ -87,13 +87,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if (!channel) return false; // report success for the call - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 8eb2e0f..1d74ff5 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -46,6 +46,37 @@ */ namespace AMQP { +/** + * Derived class with public constructor + * + * We need this because std::make_shared is not possible + */ +struct PublicChannelImpl : public ChannelImpl +{ + /** + * Constructor + * @param parent + * @param connection + */ + PublicChannelImpl(Channel *parent, Connection *connection) : ChannelImpl(parent, connection) {} + + /** + * Destructor + */ + virtual ~PublicChannelImpl() {} +}; + +/** + * Constructor to make a shared pointer + * @param parent the publis channel object + * @param connection pointer to the connection + */ +std::shared_ptr ChannelImpl::instantiate(Channel *parent, Connection *connection) +{ + // we can only use std::make_shared with a PublicChannelImpl + return std::make_shared(parent, connection); +} + /** * Construct a channel object * @param parent @@ -57,7 +88,7 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection) : _connection(&connection->_implementation) { // add the channel to the connection - _id = _connection->add(this); + _id = _connection->add(shared_from_this()); // check if the id is valid if (_id == 0) @@ -87,9 +118,6 @@ ChannelImpl::~ChannelImpl() // remove this channel from the connection (but not if the connection is already destructed) if (_connection) _connection->remove(this); - // close the channel now - close(); - // destruct deferred results while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next()); } @@ -195,6 +223,9 @@ Deferred &ChannelImpl::rollbackTransaction() */ Deferred &ChannelImpl::close() { + // this is completely pointless if not connected + if (_state != state_connected) return push(new Deferred(_state == state_closing)); + // send a channel close frame auto &handler = push(ChannelCloseFrame(_id)); @@ -643,7 +674,13 @@ Deferred &ChannelImpl::recover(int flags) bool ChannelImpl::send(const Frame &frame) { // skip if channel is not connected - if (_state != state_connected || !_connection) return false; + if (_state == state_closed || !_connection) return false; + + // if we're busy closing, we pretend that the send operation was a + // success. this causes the deferred object to be created, and to be + // added to the list of deferred objects. it will be notified about + // the error when the close operation succeeds + if (_state == state_closing) return true; // are we currently in synchronous mode or are there // other frames waiting for their turn to be sent? @@ -666,11 +703,10 @@ bool ChannelImpl::send(const Frame &frame) } /** - * Signal the channel that a synchronous operation - * was completed. After this operation, waiting - * frames can be sent out. + * Signal the channel that a synchronous operation was completed. After + * this operation, waiting frames can be sent out. */ -void ChannelImpl::synchronized() +void ChannelImpl::onSynchronized() { // we are no longer waiting for synchronous operations _synchronous = false; @@ -679,7 +715,7 @@ void ChannelImpl::synchronized() Monitor monitor(this); // send all frames while not in synchronous mode - while (monitor.valid() && !_synchronous && !_queue.empty()) + while (monitor.valid() && _connection && !_synchronous && !_queue.empty()) { // retrieve the first buffer and synchronous auto pair = std::move(_queue.front()); @@ -707,7 +743,7 @@ void ChannelImpl::reportMessage() Monitor monitor(this); // synchronize the channel if this comes from a basic.get frame - if (_message->consumer().empty()) synchronized(); + if (_message->consumer().empty()) onSynchronized(); // syncing the channel may destruct the channel if (!monitor.valid()) return; @@ -729,6 +765,73 @@ void ChannelImpl::reportMessage() delete _message; _message = nullptr; } +/** + * Report an error message on a channel + * @param message the error message + * @param notifyhandler should the channel-wide handler also be called? + */ +void ChannelImpl::reportError(const char *message, bool notifyhandler) +{ + // change state + _state = state_closed; + _synchronous = false; + + // the queue of messages that still have to sent can be emptied now + // (we do this by moving the current queue into an unused variable) + auto queue(std::move(_queue)); + + // we are going to call callbacks that could destruct the channel + Monitor monitor(this); + + // call the oldest + if (_oldestCallback) + { + // copy the callback (so that it can not be destructed during + // the "reportError" call + auto cb = _oldestCallback; + + // call the callback + auto *next = cb->reportError(message); + + // leap out if channel no longer exists + if (!monitor.valid()) return; + + // set the oldest callback + _oldestCallback.reset(next); + } + + // clean up all deferred other objects + while (_oldestCallback) + { + // copy the callback (so that it can not be destructed during + // the "reportError" call + auto cb = _oldestCallback; + + // call the callback + auto *next = cb->reportError("Channel is in error state"); + + // leap out if channel no longer exists + if (!monitor.valid()) return; + + // set the oldest callback + _oldestCallback.reset(next); + } + + // all callbacks have been processed, so we also can reset the pointer to the newest + _newestCallback = nullptr; + + // inform handler + if (notifyhandler && _errorCallback) _errorCallback(message); + + // leap out if object no longer exists + if (!monitor.valid()) return; + + // the connection now longer has to know that this channel exists, + // because the channel ID is no longer in use + if (_connection) _connection->remove(this); +} + + /** * Create an incoming message from a consume call * @param frame diff --git a/src/channelopenokframe.h b/src/channelopenokframe.h index 44cd57a..6c93c10 100644 --- a/src/channelopenokframe.h +++ b/src/channelopenokframe.h @@ -71,7 +71,7 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; diff --git a/src/connectioncloseframe.h b/src/connectioncloseframe.h index 1c5059f..cd3eb71 100644 --- a/src/connectioncloseframe.h +++ b/src/connectioncloseframe.h @@ -139,6 +139,15 @@ public: return _failingMethod; } + /** + * This frame is part of the shutdown operation + * @return bool + */ + virtual bool partOfShutdown() const override + { + return true; + } + /** * Process the frame * @param connection The connection over which it was received diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 6d91c04..b4b23c8 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -54,7 +54,7 @@ ConnectionImpl::~ConnectionImpl() * @param channel * @return uint16_t */ -uint16_t ConnectionImpl::add(ChannelImpl *channel) +uint16_t ConnectionImpl::add(const std::shared_ptr &channel) { // check if we have exceeded the limit already if (_maxChannels > 0 && _channels.size() >= _maxChannels) return 0; @@ -80,7 +80,7 @@ uint16_t ConnectionImpl::add(ChannelImpl *channel) * Remove a channel * @param channel */ -void ConnectionImpl::remove(ChannelImpl *channel) +void ConnectionImpl::remove(const ChannelImpl *channel) { // skip zero channel if (channel->id() == 0) return; @@ -144,6 +144,13 @@ size_t ConnectionImpl::parse(const Buffer &buffer) return processed; } } + + // leap out if the connection object no longer exists + if (!monitor.valid() || !_closed || _state == state_connected) return processed; + + // the close() function was called, but if the close frame was not yet sent + // if there are no waiting channels, we can do that right now + if (!waiting()) sendClose(); // done return processed; @@ -162,8 +169,27 @@ bool ConnectionImpl::close() // mark that the object is closed _closed = true; + // after the send operation the object could be dead + Monitor monitor(this); + + // number of channels that is waiting for an answer and that has further data + int waiters = 0; + + // loop over all channels, and close them + for (auto iter = _channels.begin(); iter != _channels.end(); iter++) + { + // close the channel + iter->second->close(); + + // we could be dead now + if (!monitor.valid()) return true; + + // is this channel waiting for an answer + if (iter->second->waiting()) waiters++; + } + // if still busy with handshake, we delay closing for a while - if (_state == state_handshake || _state == state_protocol) return true; + if (waiters > 0 || _state == state_handshake || _state == state_protocol) return true; // perform the close operation sendClose(); @@ -182,16 +208,6 @@ bool ConnectionImpl::sendClose() // after the send operation the object could be dead Monitor monitor(this); - // loop over all channels - for (auto iter = _channels.begin(); iter != _channels.end(); iter++) - { - // close the channel - iter->second->close(); - - // we could be dead now - if (!monitor.valid()) return false; - } - // send the close frame send(ConnectionCloseFrame(0, "shutdown")); @@ -216,7 +232,7 @@ void ConnectionImpl::setConnected() // if the close method was called before, the frame was not // sent. append it to the end of the queue to make sure we // are correctly closed down. - if (_closed && !sendClose()) return; + if (_closed && !waiting() && !sendClose()) return; // we're going to call the handler, which can destruct the connection, // so we must monitor if the queue object is still valid after calling @@ -239,6 +255,23 @@ void ConnectionImpl::setConnected() } } +/** + * Is any channel waiting for an answer on a synchronous call? + * @return bool + */ +bool ConnectionImpl::waiting() const +{ + // loop through the channels + for (auto &iter : _channels) + { + // is this a waiting channel + if (iter.second->waiting()) return true; + } + + // no waiting channel found + return false; +} + /** * Send a frame over the connection * @param frame The frame to send @@ -249,6 +282,9 @@ bool ConnectionImpl::send(const Frame &frame) // its not possible to send anything if closed or closing down if (_state == state_closing || _state == state_closed) return false; + // some frames can be sent _after_ the close() function was called + if (_closed && !frame.partOfShutdown()) return false; + // we need an output buffer OutBuffer buffer(frame.buffer()); diff --git a/src/exchangebindokframe.h b/src/exchangebindokframe.h index 157d9e4..6116eca 100644 --- a/src/exchangebindokframe.h +++ b/src/exchangebindokframe.h @@ -61,13 +61,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/exchangedeclareokframe.h b/src/exchangedeclareokframe.h index 1938f73..8334757 100644 --- a/src/exchangedeclareokframe.h +++ b/src/exchangedeclareokframe.h @@ -64,13 +64,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report exchange declare ok - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/exchangedeleteokframe.h b/src/exchangedeleteokframe.h index b8ec41b..7f6dba0 100644 --- a/src/exchangedeleteokframe.h +++ b/src/exchangedeleteokframe.h @@ -65,13 +65,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/exchangeunbindokframe.h b/src/exchangeunbindokframe.h index d25e2a1..511b8fe 100644 --- a/src/exchangeunbindokframe.h +++ b/src/exchangeunbindokframe.h @@ -62,13 +62,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/frame.h b/src/frame.h index 2522e7c..372ed77 100644 --- a/src/frame.h +++ b/src/frame.h @@ -48,6 +48,12 @@ public: */ virtual bool partOfHandshake() const { return false; } + /** + * Is this a frame that is part of the connection close operation? + * @return bool + */ + virtual bool partOfShutdown() const { return false; } + /** * Does this frame need an end-of-frame seperator? * @return bool diff --git a/src/queuebindokframe.h b/src/queuebindokframe.h index bf27b87..b80b11b 100644 --- a/src/queuebindokframe.h +++ b/src/queuebindokframe.h @@ -63,13 +63,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report to handler - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 852c4ec..44821a5 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -127,13 +127,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // what if channel doesn't exist? if (!channel) return false; // report success - if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->synchronized(); + if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->onSynchronized(); // done return true; diff --git a/src/queuedeleteokframe.h b/src/queuedeleteokframe.h index c214aa3..e8bc711 100644 --- a/src/queuedeleteokframe.h +++ b/src/queuedeleteokframe.h @@ -88,13 +88,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report queue deletion success - if (channel->reportSuccess(this->messageCount())) channel->synchronized(); + if (channel->reportSuccess(this->messageCount())) channel->onSynchronized(); // done return true; diff --git a/src/queuepurgeokframe.h b/src/queuepurgeokframe.h index aaa939d..22cf119 100644 --- a/src/queuepurgeokframe.h +++ b/src/queuepurgeokframe.h @@ -88,13 +88,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report queue purge success - if (channel->reportSuccess(this->messageCount())) channel->synchronized(); + if (channel->reportSuccess(this->messageCount())) channel->onSynchronized(); // done return true; diff --git a/src/queueunbindokframe.h b/src/queueunbindokframe.h index cdcb570..893908d 100644 --- a/src/queueunbindokframe.h +++ b/src/queueunbindokframe.h @@ -67,13 +67,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // check if we have a channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report queue unbind success - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/transactioncommitokframe.h b/src/transactioncommitokframe.h index 3993a8f..a02857d 100644 --- a/src/transactioncommitokframe.h +++ b/src/transactioncommitokframe.h @@ -68,13 +68,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/transactionrollbackokframe.h b/src/transactionrollbackokframe.h index 868626b..d04532f 100644 --- a/src/transactionrollbackokframe.h +++ b/src/transactionrollbackokframe.h @@ -68,13 +68,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true; diff --git a/src/transactionselectokframe.h b/src/transactionselectokframe.h index b66aede..f83e803 100644 --- a/src/transactionselectokframe.h +++ b/src/transactionselectokframe.h @@ -68,13 +68,13 @@ public: virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); + auto channel = connection->channel(this->channel()); // channel does not exist if(!channel) return false; // report that the channel is open - if (channel->reportSuccess()) channel->synchronized(); + if (channel->reportSuccess()) channel->onSynchronized(); // done return true;