From a9570277b76830c060a5b51abf7cff63c2b1dbf2 Mon Sep 17 00:00:00 2001 From: Martijn Otto Date: Tue, 29 Apr 2014 15:51:33 +0200 Subject: [PATCH] Removed the nowait option from the public interface, because the deferred would never be called and implemented a queue to wait for synchronous methods to complete before sending the next frame --- include/channel.h | 53 ++----------- include/channelimpl.h | 99 ++++++++++++++++-------- include/connectionimpl.h | 7 ++ include/flags.h | 1 - include/outbuffer.h | 20 ++--- src/basicackframe.h | 25 +++++-- src/basiccancelframe.h | 16 +++- src/basiccancelokframe.h | 2 +- src/basicconsumeframe.h | 12 +++ src/basicconsumeokframe.h | 2 +- src/basicdeliverframe.h | 11 +++ src/basicpublishframe.h | 11 +++ src/basicqosokframe.h | 2 +- src/basicrecoverasyncframe.h | 11 +++ src/basicrecoverframe.h | 11 +++ src/basicrejectframe.h | 11 +++ src/basicreturnframe.h | 11 +++ src/channelcloseokframe.h | 12 +-- src/channelflowokframe.h | 2 +- src/channelimpl.cpp | 125 ++++++++++++++++++++----------- src/connectionimpl.cpp | 50 ++++++++----- src/exchangebindframe.h | 47 +++++++----- src/exchangebindokframe.h | 2 +- src/exchangedeclareframe.h | 12 +++ src/exchangedeclareokframe.h | 2 +- src/exchangedeleteframe.h | 12 +++ src/exchangedeleteokframe.h | 2 +- src/exchangeunbindframe.h | 49 +++++++----- src/exchangeunbindokframe.h | 2 +- src/frame.h | 51 ++++++++++--- src/methodframe.h | 8 ++ src/queuebindframe.h | 12 +++ src/queuebindokframe.h | 2 +- src/queuedeclareframe.h | 14 +++- src/queuedeclareokframe.h | 2 +- src/queuedeleteframe.h | 12 +++ src/queuedeleteokframe.h | 2 +- src/queuepurgeframe.h | 26 +++++-- src/queuepurgeokframe.h | 2 +- src/queueunbindokframe.h | 2 +- src/transactioncommitokframe.h | 2 +- src/transactionrollbackokframe.h | 2 +- src/transactionselectokframe.h | 2 +- 43 files changed, 524 insertions(+), 237 deletions(-) diff --git a/include/channel.h b/include/channel.h index 618a314..e722ebb 100644 --- a/include/channel.h +++ b/include/channel.h @@ -174,42 +174,30 @@ public: /** * Bind two exchanges to each other * - * The following flags can be used for the exchange - * - * - nowait do not wait on response - * * @param source the source exchange * @param target the target exchange * @param routingkey the routing key - * @param flags optional flags * @param arguments additional bind arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, arguments); } + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey) { return _implementation.bindExchange(source, target, routingkey, Table()); } /** * Unbind two exchanges from one another * - * The following flags can be used for the exchange - * - * - nowait do not wait on response - * * @param target the target exchange * @param source the source exchange * @param routingkey the routing key - * @param flags optional flags * @param arguments additional unbind arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } - Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } - Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, arguments); } + Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey) { return _implementation.unbindExchange(target, source, routingkey, Table()); } /** * Declare a queue @@ -250,22 +238,16 @@ public: /** * Bind a queue to an exchange * - * The following flags can be used for the exchange - * - * - nowait do not wait on response - * * @param exchange the source exchange * @param queue the target queue * @param routingkey the routing key - * @param flags additional flags * @param arguments additional bind arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } - Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } - Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, arguments); } + Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.bindQueue(exchange, queue, routingkey, Table()); } /** * Unbind a queue from an exchange @@ -283,12 +265,7 @@ public: /** * Purge a queue * - * The following flags can be used for the exchange - * - * - nowait do not wait on response - * * @param name name of the queue - * @param flags additional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -303,7 +280,7 @@ public: * * }); */ - DeferredDelete &purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } + DeferredDelete &purgeQueue(const std::string &name){ return _implementation.purgeQueue(name); } /** * Remove a queue @@ -375,11 +352,6 @@ public: * - nolocal if set, messages published on this channel are not also consumed * - noack if set, consumed messages do not have to be acked, this happens automatically * - exclusive request exclusive access, only this consumer can access the queue - * - nowait the server does not have to send a response back that consuming is active - * - * The method Deferred::onSuccess() will be called when the - * consumer has started (unless the nowait option was set, in which case - * no confirmation method is called) * * @param queue the queue from which you want to consume * @param tag a consumer tag that will be associated with this consume operation @@ -411,16 +383,7 @@ public: * * If you want to stop a running consumer, you can use this method with the consumer tag * - * The following flags are supported: - * - * - nowait the server does not have to send a response back that the consumer has been cancelled - * - * The method Deferred::onSuccess() will be called when the consumer - * was succesfully stopped (unless the nowait option was used, in which case no - * confirmation method is called) - * * @param tag the consumer tag - * @param flags optional additional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -435,7 +398,7 @@ public: * * }); */ - DeferredCancel &cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } + DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); } /** * Acknoldge a received message diff --git a/include/channelimpl.h b/include/channelimpl.h index 846c1b8..062c160 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -58,18 +58,18 @@ private: /** * Pointer to the oldest deferred result (the first one that is going * to be executed) - * + * * @var Deferred */ std::unique_ptr _oldestCallback = nullptr; - + /** * Pointer to the newest deferred result (the last one to be added). - * + * * @var Deferred */ Deferred *_newestCallback = nullptr; - + /** * The channel number * @var uint16_t @@ -86,6 +86,19 @@ private: state_closed } _state = state_connected; + /** + * The frames that still need to be send out + * + * We store the data as well as whether they + * should be handled synchronously. + */ + std::queue> _queue; + + /** + * Are we currently operating in synchronous mode? + */ + bool _synchronous = false; + /** * The message that is now being received * @var ConsumedMessage @@ -203,13 +216,12 @@ public: * @param source exchange which binds to target * @param target exchange to bind to * @param routingKey routing key - * @param glags additional flags * @param arguments additional arguments for binding * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments); /** * unbind two exchanges @@ -217,13 +229,12 @@ public: * @param source the source exchange * @param target the target exchange * @param routingkey the routing key - * @param flags optional flags * @param arguments additional unbind arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments); /** * remove an exchange @@ -253,13 +264,12 @@ public: * @param exchangeName name of the exchange to bind to * @param queueName name of the queue * @param routingkey routingkey - * @param flags additional flags * @param arguments additional arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); + Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); /** * Unbind a queue from an exchange @@ -277,7 +287,6 @@ public: /** * Purge a queue * @param queue queue to purge - * @param flags additional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -292,7 +301,7 @@ public: * * }); */ - DeferredDelete &purgeQueue(const std::string &name, int flags); + DeferredDelete &purgeQueue(const std::string &name); /** * Remove a queue @@ -363,7 +372,6 @@ public: /** * Cancel a running consumer * @param tag the consumer tag - * @param flags optional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -378,7 +386,7 @@ public: * * }); */ - DeferredCancel &cancel(const std::string &tag, int flags); + DeferredCancel &cancel(const std::string &tag); /** * Acknowledge a message @@ -429,54 +437,79 @@ public: */ bool send(const Frame &frame); + /** + * Signal the channel that a synchronous operation + * was completed. After this operation, waiting + * frames can be sent out. + */ + void synchronized(); + /** * Report to the handler that the channel is opened */ void reportReady() { + // callbacks could destroy us, so monitor it + Monitor monitor(this); + // inform handler if (_readyCallback) _readyCallback(); + + // if the monitor is still valid, we exit synchronous mode now + if (monitor.valid()) synchronized(); } /** * Report to the handler that the channel is closed + * + * Returns whether the channel object is still valid */ - void reportClosed() + bool reportClosed() { // change state _state = state_closed; // and pass on to the reportSuccess() method which will call the // appropriate deferred object to report the successful operation - reportSuccess(); + return reportSuccess(); + + // technically, we should exit synchronous method now + // since the synchronous channel close frame has been + // acknowledged by the server. + // + // but since the channel was just closed, there is no + // real point in doing this, as we cannot send frames + // out anymore. } /** * Report success * - * This function is called to report success for all - * cases where the callback does not receive any parameters + * Returns whether the channel object is still valid */ template - void reportSuccess(Arguments ...parameters) + bool reportSuccess(Arguments ...parameters) { // skip if there is no oldest callback - if (!_oldestCallback) return; - + if (!_oldestCallback) return true; + // we are going to call callbacks that could destruct the channel Monitor monitor(this); - + // call the callback auto *next = _oldestCallback->reportSuccess(std::forward(parameters)...); - + // leap out if channel no longer exists - if (!monitor.valid()) return; - + if (!monitor.valid()) return false; + // set the oldest callback _oldestCallback.reset(next); - + // if there was no next callback, the newest callback was just used if (!next) _newestCallback = nullptr; + + // we are still valid + return true; } /** @@ -497,30 +530,30 @@ public: { // call the callback auto *next = _oldestCallback->reportError(message); - + // leap out if channel no longer exists if (!monitor.valid()) return; - + // set the oldest callback _oldestCallback.reset(next); } - + // clean up all deferred other objects while (_oldestCallback) { // call the callback auto *next = _oldestCallback->reportError("Channel is in error state"); - + // leap out if channel no longer exists if (!monitor.valid()) return; - + // set the oldest callback _oldestCallback.reset(next); } // all callbacks have been processed, so we also can reset the pointer to the newest _newestCallback = nullptr; - + // inform handler if (notifyhandler && _errorCallback) _errorCallback(message); } @@ -534,7 +567,7 @@ public: { // install the callback if it is assigned if (callback) _consumers[consumertag] = callback; - + // otherwise we erase the previously set callback else _consumers.erase(consumertag); } diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 836d3a0..530888a 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -253,6 +253,13 @@ public: */ bool send(const Frame &frame); + /** + * Send buffered data over the connection + * + * @param buffer the buffer with data to send + */ + bool send(OutBuffer &&buffer); + /** * Get a channel by its identifier * diff --git a/include/flags.h b/include/flags.h index 5f46b52..c2de72f 100644 --- a/include/flags.h +++ b/include/flags.h @@ -26,7 +26,6 @@ extern const int global; extern const int nolocal; extern const int noack; extern const int exclusive; -extern const int nowait; extern const int mandatory; extern const int immediate; extern const int redelivered; diff --git a/include/outbuffer.h b/include/outbuffer.h index 24833fa..54d5988 100644 --- a/include/outbuffer.h +++ b/include/outbuffer.h @@ -24,7 +24,7 @@ private: * @var char* */ char *_buffer; - + /** * Pointer to the buffer to be filled * @var char* @@ -36,13 +36,13 @@ private: * @var size_t */ size_t _size; - + /** * The total capacity of the out buffer * @var size_t */ size_t _capacity; - + public: /** @@ -56,7 +56,7 @@ public: _capacity = capacity; _buffer = _current = new char[capacity]; } - + /** * Copy constructor * @param that @@ -68,11 +68,11 @@ public: _capacity = that._capacity; _buffer = new char[_capacity]; _current = _buffer + _size; - + // copy memory memcpy(_buffer, that._buffer, _size); } - + /** * Move constructor * @param that @@ -84,7 +84,7 @@ public: _capacity = that._capacity; _buffer = that._buffer; _current = that._current; - + // reset the other object that._size = 0; that._capacity = 0; @@ -95,7 +95,7 @@ public: /** * Destructor */ - virtual ~OutBuffer() + virtual ~OutBuffer() { if (_buffer) delete[] _buffer; } @@ -104,7 +104,7 @@ public: * Get access to the internal buffer * @return const char* */ - const char *data() + const char *data() const { return _buffer; } @@ -113,7 +113,7 @@ public: * Current size of the output buffer * @return size_t */ - size_t size() + size_t size() const { return _size; } diff --git a/src/basicackframe.h b/src/basicackframe.h index 8337b9f..7549155 100644 --- a/src/basicackframe.h +++ b/src/basicackframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic acknowledgement frame - * + * * @copyright 2014 Copernica BV */ @@ -38,10 +38,10 @@ protected: { // call base BasicFrame::fill(buffer); - + // add the delivery tag buffer.add(_deliveryTag); - + // add the booleans _multiple.fill(buffer); } @@ -54,25 +54,36 @@ public: * @param deliveryTag server-assigned and channel specific delivery tag * @param multiple acknowledge mutiple messages */ - BasicAckFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false) : + BasicAckFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false) : BasicFrame(channel, 9), _deliveryTag(deliveryTag), _multiple(multiple) {} - + /** * Construct based on received frame * @param frame */ - BasicAckFrame(ReceivedFrame &frame) : + BasicAckFrame(ReceivedFrame &frame) : BasicFrame(frame), _deliveryTag(frame.nextUint64()), _multiple(frame) {} - + /** * Destructor */ virtual ~BasicAckFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + virtual bool synchronous() const override + { + return false; + } + /** * Return the method ID * @return uint16_t diff --git a/src/basiccancelframe.h b/src/basiccancelframe.h index ea65d3e..e665ab2 100644 --- a/src/basiccancelframe.h +++ b/src/basiccancelframe.h @@ -67,7 +67,19 @@ public: * Destructor */ virtual ~BasicCancelFrame() {} - + + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous when the nowait option is not used + return !noWait(); + } + /** * Return the consumertag, which is specified by the client or provided by the server * @return string @@ -90,7 +102,7 @@ public: * Return whether to wait for a response * @return boolean */ - const bool noWait() + const bool noWait() const { return _noWait.get(0); } diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index 6d9d42f..6592166 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -94,7 +94,7 @@ public: if (!channel) return false; // report - channel->reportSuccess(consumerTag()); + if (channel->reportSuccess(consumerTag())) channel->synchronized(); // done return true; diff --git a/src/basicconsumeframe.h b/src/basicconsumeframe.h index 0bc7888..7e57eee 100644 --- a/src/basicconsumeframe.h +++ b/src/basicconsumeframe.h @@ -111,6 +111,18 @@ public: */ virtual ~BasicConsumeFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous when the nowait option is not set + return !noWait(); + } + /** * Return the method ID * @return uint16_t diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 31c44ef..e767f0f 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -94,7 +94,7 @@ public: if (!channel) return false; // report - channel->reportSuccess(consumerTag()); + if (channel->reportSuccess(consumerTag())) channel->synchronized(); // done return true; diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index aef677f..ac6128d 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -120,6 +120,17 @@ public: return _routingKey; } + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + virtual bool synchronous() const override + { + return false; + } + /** * Return the method ID * @return uint16_t diff --git a/src/basicpublishframe.h b/src/basicpublishframe.h index 9f9eb69..975c2ab 100644 --- a/src/basicpublishframe.h +++ b/src/basicpublishframe.h @@ -94,6 +94,17 @@ public: */ virtual ~BasicPublishFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + return false; + } + /** * Return the name of the exchange to publish to * @return string diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index dc6e612..9c235e2 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -67,7 +67,7 @@ public: if (!channel) return false; // report - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/basicrecoverasyncframe.h b/src/basicrecoverasyncframe.h index 7d7e036..68eb3ec 100644 --- a/src/basicrecoverasyncframe.h +++ b/src/basicrecoverasyncframe.h @@ -62,6 +62,17 @@ public: */ virtual ~BasicRecoverAsyncFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + virtual bool synchronous() const override + { + return false; + } + /** * Return the method ID * @return uint16_t diff --git a/src/basicrecoverframe.h b/src/basicrecoverframe.h index d378393..ca67f16 100644 --- a/src/basicrecoverframe.h +++ b/src/basicrecoverframe.h @@ -62,6 +62,17 @@ public: */ virtual ~BasicRecoverFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + return false; + } + /** * Return the method ID * @return uint16_t diff --git a/src/basicrejectframe.h b/src/basicrejectframe.h index c5c1f6b..a77d7da 100644 --- a/src/basicrejectframe.h +++ b/src/basicrejectframe.h @@ -73,6 +73,17 @@ public: */ virtual ~BasicRejectFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + return false; + } + /** * Return the method ID * @return uint16_t diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index 6bb8aef..2fedce6 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -92,6 +92,17 @@ public: */ virtual ~BasicReturnFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + virtual bool synchronous() const override + { + return false; + } + /** * Return the name of the exchange to publish to * @return string diff --git a/src/channelcloseokframe.h b/src/channelcloseokframe.h index 5eb5cda..d06e845 100644 --- a/src/channelcloseokframe.h +++ b/src/channelcloseokframe.h @@ -1,6 +1,6 @@ /** * Class describing a channel close acknowledgement frame - * + * * @copyright 2014 Copernica BV */ @@ -67,13 +67,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // report that the channel is closed - channel->reportClosed(); - + if (channel->reportClosed()) channel->synchronized(); + // done return true; } diff --git a/src/channelflowokframe.h b/src/channelflowokframe.h index fcd325f..c335564 100644 --- a/src/channelflowokframe.h +++ b/src/channelflowokframe.h @@ -93,7 +93,7 @@ public: if (!channel) return false; // report success for the call - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index c612232..0b917f4 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -85,7 +85,7 @@ ChannelImpl::~ChannelImpl() // close the channel now close(); - + // destruct deferred results while (_oldestCallback) _oldestCallback.reset(_oldestCallback->next()); } @@ -98,13 +98,13 @@ Deferred &ChannelImpl::push(Deferred *deferred) { // do we already have an oldest? if (!_oldestCallback) _oldestCallback.reset(deferred); - + // do we already have a newest? if (_newestCallback) _newestCallback->add(deferred); - + // store newest callback _newestCallback = deferred; - + // done return *deferred; } @@ -222,7 +222,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ if (type == ExchangeType::headers)exchangeType = "headers"; // send declare exchange frame - return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); + return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, false, arguments)); } /** @@ -231,16 +231,15 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ * @param source exchange which binds to target * @param target exchange to bind to * @param routingKey routing key - * @param flags additional flags * @param arguments additional arguments for binding * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { // send exchange bind frame - return push(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); + return push(ExchangeBindFrame(_id, target, source, routingkey, false, arguments)); } /** @@ -249,16 +248,15 @@ Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string * @param source the source exchange * @param target the target exchange * @param routingkey the routing key - * @param flags optional flags * @param arguments additional unbind arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { // send exchange unbind frame - return push(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); + return push(ExchangeUnbindFrame(_id, target, source, routingkey, false, arguments)); } /** @@ -273,7 +271,7 @@ Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::stri Deferred &ChannelImpl::removeExchange(const std::string &name, int flags) { // send delete exchange frame - return push(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); + return push(ExchangeDeleteFrame(_id, name, flags & ifunused, false)); } /** @@ -288,14 +286,14 @@ Deferred &ChannelImpl::removeExchange(const std::string &name, int flags) DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) { // the frame to send - QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments); - + QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, false, arguments); + // send the queuedeclareframe auto *result = new DeferredQueue(send(frame)); - + // add the deferred result push(result); - + // done return *result; } @@ -306,16 +304,15 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con * @param exchangeName name of the exchange to bind to * @param queueName name of the queue * @param routingkey routingkey - * @param flags additional flags * @param arguments additional arguments * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) +Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments) { // send the bind queue frame - return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); + return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, false, arguments)); } /** @@ -338,7 +335,6 @@ Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::strin /** * Purge a queue * @param queue queue to purge - * @param flags additional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -353,17 +349,17 @@ Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::strin * * }); */ -DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags) +DeferredDelete &ChannelImpl::purgeQueue(const std::string &name) { - // the frame to send - QueuePurgeFrame frame(_id, name, flags & nowait); - + // the frame to send + QueuePurgeFrame frame(_id, name, false); + // send the frame, and create deferred object auto *deferred = new DeferredDelete(send(frame)); - + // push to list push(deferred); - + // done return *deferred; } @@ -389,14 +385,14 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags) DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) { // the frame to send - QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, flags & nowait); + QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, false); // send the frame, and create deferred object auto *deferred = new DeferredDelete(send(frame)); - + // push to list push(deferred); - + // done return *deferred; } @@ -495,14 +491,14 @@ Deferred &ChannelImpl::setQos(uint16_t prefetchCount) DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { // the frame to send - BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments); - + BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, false, arguments); + // send the frame, and create deferred object auto *deferred = new DeferredConsumer(this, send(frame)); - + // push to list push(deferred); - + // done return *deferred; } @@ -510,7 +506,6 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri /** * Cancel a running consumer * @param tag the consumer tag - * @param flags optional flags * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. @@ -525,17 +520,17 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri * * }); */ -DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags) +DeferredCancel &ChannelImpl::cancel(const std::string &tag) { // the cancel frame to send - BasicCancelFrame frame(_id, tag, flags & nowait); + BasicCancelFrame frame(_id, tag, false); // send the frame, and create deferred object auto *deferred = new DeferredCancel(this, send(frame)); - + // push to list push(deferred); - + // done return *deferred; } @@ -587,10 +582,56 @@ bool ChannelImpl::send(const Frame &frame) // skip if channel is not connected if (_state != state_connected || !_connection) return false; + // are we currently in synchronous mode or are there + // other frames waiting for their turn to be sent? + if (_synchronous || !_queue.empty()) + { + // we need to wait until the synchronous frame has + // been processed, so queue the frame until it was + _queue.emplace(frame.synchronous(), frame.buffer()); + + // it was of course not actually sent but we pretend + // that it was, because no error occured + return true; + } + + // enter synchronous mode if necessary + _synchronous = frame.synchronous(); + // send to tcp connection return _connection->send(frame); } +/** + * Signal the channel that a synchronous operation + * was completed. After this operation, waiting + * frames can be sent out. + */ +void ChannelImpl::synchronized() +{ + // we are no longer waiting for synchronous operations + _synchronous = false; + + // we need to monitor the channel for validity + Monitor monitor(this); + + // send all frames while not in synchronous mode + while (monitor.valid() && !_synchronous && !_queue.empty()) + { + // retrieve the first buffer and synchronous + auto pair = std::move(_queue.front()); + + // remove from the list + _queue.pop(); + + // mark as synchronous if necessary + _synchronous = pair.first; + + // send it over the connection + _connection->send(std::move(pair.second)); + } +} + /** * Report the received message */ @@ -602,16 +643,16 @@ void ChannelImpl::reportMessage() // look for the consumer auto iter = _consumers.find(_message->consumer()); if (iter == _consumers.end()) return; - + // is this a valid callback method if (!iter->second) return; // after the report the channel may be destructed, monitor that Monitor monitor(this); - + // call the callback _message->report(iter->second); - + // skip if channel was destructed if (!monitor.valid()) return; @@ -628,7 +669,7 @@ ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) { // destruct if message is already set if (_message) delete _message; - + // construct a message return _message = new ConsumedMessage(frame); } diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 668fbb3..a26f903 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -213,9 +213,9 @@ void ConnectionImpl::setConnected() // store connected state _state = state_connected; - // if the close operation was already called, we do that again now again - // so that the actual messages to close down the connection and the channel - // are appended to the queue + // if the close method was called before, the frame was not + // sent. append it to the end of the queue to make sure we + // are correctly closed down. if (_closed && !sendClose()) return; // we're going to call the handler, which can destruct the connection, @@ -225,11 +225,8 @@ void ConnectionImpl::setConnected() // inform handler _handler->onConnected(_parent); - // leap out if the connection no longer exists - if (!monitor.valid()) return; - // empty the queue of messages - while (!_queue.empty()) + while (monitor.valid() && !_queue.empty()) { // get the next message OutBuffer buffer(std::move(_queue.front())); @@ -239,9 +236,6 @@ void ConnectionImpl::setConnected() // send it _handler->onData(_parent, buffer.data(), buffer.size()); - - // leap out if the connection was destructed - if (!monitor.valid()) return; } } @@ -256,16 +250,10 @@ bool ConnectionImpl::send(const Frame &frame) if (_state == state_closing || _state == state_closed) return false; // we need an output buffer - OutBuffer buffer(frame.totalSize()); - - // fill the buffer - frame.fill(buffer); - - // append an end of frame byte (but not when still negotiating the protocol) - if (frame.needsSeparator()) buffer.add((uint8_t)206); + OutBuffer buffer(frame.buffer()); // are we still setting up the connection? - if ((_state == state_connected && _queue.size() == 0) || frame.partOfHandshake()) + if ((_state == state_connected && _queue.empty()) || frame.partOfHandshake()) { // send the buffer _handler->onData(_parent, buffer.data(), buffer.size()); @@ -280,6 +268,32 @@ bool ConnectionImpl::send(const Frame &frame) return true; } +/** + * Send buffered data over the connection + * + * @param buffer the buffer with data to send + */ +bool ConnectionImpl::send(OutBuffer &&buffer) +{ + // this only works when we are already connected + if (_state != state_connected) return false; + + // are we waiting for other frames to be sent before us? + if (_queue.empty()) + { + // send it directly + _handler->onData(_parent, buffer.data(), buffer.size()); + } + else + { + // add to the list of waiting buffers + _queue.push(std::move(buffer)); + } + + // done + return true; +} + /** * End of namspace */ diff --git a/src/exchangebindframe.h b/src/exchangebindframe.h index e246fad..1d01161 100644 --- a/src/exchangebindframe.h +++ b/src/exchangebindframe.h @@ -1,6 +1,6 @@ /** * Exchangebindframe.h - * + * * @copyright 2014 Copernica BV */ @@ -26,25 +26,25 @@ private: * @var ShortString */ ShortString _destination; - + /** - * Exchange which is bound + * Exchange which is bound * @var ShortString */ ShortString _source; - + /** * Routing key * @var ShortString */ ShortString _routingKey; - + /** * contains: nowait do not wait on response * @var booleanset */ BooleanSet _bools; - + /** * Additional arguments * @var Table @@ -61,7 +61,7 @@ protected: { // call base ExchangeFrame::fill(buffer); - + buffer.add(_reserved); _destination.fill(buffer); _source.fill(buffer); @@ -70,13 +70,13 @@ protected: _arguments.fill(buffer); } -public: +public: /** * Constructor based on incoming data - * + * * @param frame received frame to decode */ - ExchangeBindFrame(ReceivedFrame &frame) : + ExchangeBindFrame(ReceivedFrame &frame) : ExchangeFrame(frame), _reserved(frame.nextUint16()), _destination(frame), @@ -102,8 +102,19 @@ public: _bools(noWait), _arguments(arguments) {} - - + + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous when the nowait option has not been set + return !noWait(); + } + /** * Get the destination exchange * @return string @@ -112,7 +123,7 @@ public: { return _destination; } - + /** * Get the source exchange * @return string @@ -121,7 +132,7 @@ public: { return _source; } - + /** * Get the routing key * @return string @@ -130,7 +141,7 @@ public: { return _routingKey; } - + /** * Get the method id * @return uint16_t @@ -139,7 +150,7 @@ public: { return 30; } - + /** * Get the additional arguments * @return Table @@ -148,12 +159,12 @@ public: { return _arguments; } - + /** * Get the nowait bool * @return bool */ - bool noWait() + bool noWait() const { return _bools.get(0); } diff --git a/src/exchangebindokframe.h b/src/exchangebindokframe.h index 5b1f29c..157d9e4 100644 --- a/src/exchangebindokframe.h +++ b/src/exchangebindokframe.h @@ -67,7 +67,7 @@ public: if(!channel) return false; // report to handler - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/exchangedeclareframe.h b/src/exchangedeclareframe.h index 9f36e64..aa940dd 100644 --- a/src/exchangedeclareframe.h +++ b/src/exchangedeclareframe.h @@ -106,6 +106,18 @@ public: */ virtual ~ExchangeDeclareFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * Method id * @return uint16_t diff --git a/src/exchangedeclareokframe.h b/src/exchangedeclareokframe.h index f86bf3e..1938f73 100644 --- a/src/exchangedeclareokframe.h +++ b/src/exchangedeclareokframe.h @@ -70,7 +70,7 @@ public: if(!channel) return false; // report exchange declare ok - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/exchangedeleteframe.h b/src/exchangedeleteframe.h index 960690c..cdd906f 100644 --- a/src/exchangedeleteframe.h +++ b/src/exchangedeleteframe.h @@ -83,6 +83,18 @@ public: */ virtual ~ExchangeDeleteFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * returns the method id * @return uint16_t diff --git a/src/exchangedeleteokframe.h b/src/exchangedeleteokframe.h index ce44336..b8ec41b 100644 --- a/src/exchangedeleteokframe.h +++ b/src/exchangedeleteokframe.h @@ -71,7 +71,7 @@ public: if(!channel) return false; // report to handler - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/exchangeunbindframe.h b/src/exchangeunbindframe.h index 64999d2..d92d941 100644 --- a/src/exchangeunbindframe.h +++ b/src/exchangeunbindframe.h @@ -8,7 +8,7 @@ * Set up namespace */ namespace AMQP { - + /** * Class definition */ @@ -26,31 +26,31 @@ private: * @var ShortString */ ShortString _destination; - + /** - * Exchange which is bound + * Exchange which is bound * @var ShortString */ ShortString _source; - + /** * Routing key * @var ShortString */ ShortString _routingKey; - + /** * contains: nowait do not wait on response * @var booleanset */ BooleanSet _bools; - + /** * Additional arguments * @var Table */ Table _arguments; - + protected: /** * Encode a frame on a string buffer @@ -61,7 +61,7 @@ protected: { // call base ExchangeFrame::fill(buffer); - + buffer.add(_reserved); _destination.fill(buffer); _source.fill(buffer); @@ -73,10 +73,10 @@ protected: public: /** * Constructor based on incoming data - * + * * @param frame received frame to decode */ - ExchangeUnbindFrame(ReceivedFrame &frame) : + ExchangeUnbindFrame(ReceivedFrame &frame) : ExchangeFrame(frame), _reserved(frame.nextUint16()), _destination(frame), @@ -102,8 +102,19 @@ public: _bools(noWait), _arguments(arguments) {} - - + + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * Get the destination exchange * @return string @@ -112,7 +123,7 @@ public: { return _destination; } - + /** * Get the source exchange * @return string @@ -121,7 +132,7 @@ public: { return _source; } - + /** * Get the routing key * @return string @@ -130,7 +141,7 @@ public: { return _routingKey; } - + /** * Get the method id * @return uint16_t @@ -139,7 +150,7 @@ public: { return 40; } - + /** * Get the additional arguments * @return Table @@ -148,16 +159,16 @@ public: { return _arguments; } - + /** * Get the nowait bool * @return bool */ - bool noWait() + bool noWait() const { return _bools.get(0); } - + }; // leave namespace } diff --git a/src/exchangeunbindokframe.h b/src/exchangeunbindokframe.h index ea256ab..d25e2a1 100644 --- a/src/exchangeunbindokframe.h +++ b/src/exchangeunbindokframe.h @@ -68,7 +68,7 @@ public: if(!channel) return false; // report to handler - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/frame.h b/src/frame.h index ed8ad0a..2522e7c 100644 --- a/src/frame.h +++ b/src/frame.h @@ -1,9 +1,9 @@ /** * Frame.h - * + * * Base class for frames. This base class can not be constructed from outside * the library, and is only used internally. - * + * * @copyright 2014 Copernica BV */ @@ -11,7 +11,7 @@ * Set up namespace */ namespace AMQP { - + /** * Class definition */ @@ -19,11 +19,11 @@ class Frame { protected: /** - * Protected constructor to ensure that no objects are created from + * Protected constructor to ensure that no objects are created from * outside the library */ Frame() {} - + public: /** * Destructor @@ -35,40 +35,67 @@ public: * @return uint32_t */ virtual uint32_t totalSize() const = 0; - + /** * Fill an output buffer * @param buffer */ virtual void fill(OutBuffer &buffer) const = 0; - + /** * Is this a frame that is part of the connection setup? * @return bool */ virtual bool partOfHandshake() const { return false; } - + /** * Does this frame need an end-of-frame seperator? * @return bool */ virtual bool needsSeparator() const { return true; } - + + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + virtual bool synchronous() const { return false; } + + /** + * Retrieve the buffer in AMQP wire-format for + * sending over the socket connection + */ + OutBuffer buffer() const + { + // we need an output buffer + OutBuffer buffer(totalSize()); + + // fill the buffer + fill(buffer); + + // append an end of frame byte (but not when still negotiating the protocol) + if (needsSeparator()) buffer.add((uint8_t)206); + + // return the created buffer + return buffer; + } + /** * Process the frame * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) { // this is an exception throw ProtocolException("unimplemented frame"); - + // unreachable return false; } }; - + /** * End of namespace */ diff --git a/src/methodframe.h b/src/methodframe.h index c435df7..eac62b5 100644 --- a/src/methodframe.h +++ b/src/methodframe.h @@ -49,6 +49,14 @@ public: */ virtual ~MethodFrame() {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override { return true; } + /** * Get the message type * @return uint8_t diff --git a/src/queuebindframe.h b/src/queuebindframe.h index 93b30c6..bc4d2e9 100644 --- a/src/queuebindframe.h +++ b/src/queuebindframe.h @@ -110,6 +110,18 @@ public: _arguments(frame) {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * Returns the method id * @return uint16_t diff --git a/src/queuebindokframe.h b/src/queuebindokframe.h index 1d55a33..bf27b87 100644 --- a/src/queuebindokframe.h +++ b/src/queuebindokframe.h @@ -69,7 +69,7 @@ public: if(!channel) return false; // report to handler - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/queuedeclareframe.h b/src/queuedeclareframe.h index 214cd48..475c8b6 100644 --- a/src/queuedeclareframe.h +++ b/src/queuedeclareframe.h @@ -98,6 +98,18 @@ public: _arguments(frame) {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * returns the method id * @return string @@ -156,7 +168,7 @@ public: * returns whether to wait for a response * @return bool */ - bool noWait() + bool noWait() const { return _bools.get(4); } diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index eb660d8..852c4ec 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -133,7 +133,7 @@ public: if (!channel) return false; // report success - channel->reportSuccess(name(), messageCount(), consumerCount()); + if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->synchronized(); // done return true; diff --git a/src/queuedeleteframe.h b/src/queuedeleteframe.h index 8892c5e..dc9a00a 100644 --- a/src/queuedeleteframe.h +++ b/src/queuedeleteframe.h @@ -85,6 +85,18 @@ public: _bools(frame) {} + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * returns the method id * @returns uint16_t diff --git a/src/queuedeleteokframe.h b/src/queuedeleteokframe.h index 39cd9b5..c214aa3 100644 --- a/src/queuedeleteokframe.h +++ b/src/queuedeleteokframe.h @@ -94,7 +94,7 @@ public: if(!channel) return false; // report queue deletion success - channel->reportSuccess(this->messageCount()); + if (channel->reportSuccess(this->messageCount())) channel->synchronized(); // done return true; diff --git a/src/queuepurgeframe.h b/src/queuepurgeframe.h index e2a45b2..08cfad5 100644 --- a/src/queuepurgeframe.h +++ b/src/queuepurgeframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue purge frame - * + * * @copyright 2014 Copernica BV */ @@ -27,7 +27,7 @@ private: ShortString _name; /** - * Do not wait on response + * Do not wait on response * @var BooleanSet */ BooleanSet _noWait; @@ -37,7 +37,7 @@ protected: * Encode the frame into a buffer * * @param buffer buffer to write frame to - */ + */ virtual void fill(OutBuffer& buffer) const override { // call base @@ -63,13 +63,13 @@ public: * @param noWait Do not wait on response * * @return newly created Queuepurgeframe - */ + */ QueuePurgeFrame(uint16_t channel, const std::string& name, bool noWait = false) : QueueFrame(channel, name.length() + 4), // 1 extra for string length, 1 for bool, 2 for deprecated field _name(name), _noWait(noWait) {} - + /** * Constructor based on received data * @param frame received frame @@ -80,11 +80,23 @@ public: _name(frame), _noWait(frame) {} - + + /** + * Is this a synchronous frame? + * + * After a synchronous frame no more frames may be + * sent until the accompanying -ok frame arrives + */ + bool synchronous() const override + { + // we are synchronous without the nowait option + return !noWait(); + } + /** * The method ID * @return method id - */ + */ virtual uint16_t methodID() const override { return 30; diff --git a/src/queuepurgeokframe.h b/src/queuepurgeokframe.h index 166cb60..aaa939d 100644 --- a/src/queuepurgeokframe.h +++ b/src/queuepurgeokframe.h @@ -94,7 +94,7 @@ public: if(!channel) return false; // report queue purge success - channel->reportSuccess(this->messageCount()); + if (channel->reportSuccess(this->messageCount())) channel->synchronized(); // done return true; diff --git a/src/queueunbindokframe.h b/src/queueunbindokframe.h index 14a2761..cdcb570 100644 --- a/src/queueunbindokframe.h +++ b/src/queueunbindokframe.h @@ -73,7 +73,7 @@ public: if(!channel) return false; // report queue unbind success - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/transactioncommitokframe.h b/src/transactioncommitokframe.h index 27f7004..3993a8f 100644 --- a/src/transactioncommitokframe.h +++ b/src/transactioncommitokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/transactionrollbackokframe.h b/src/transactionrollbackokframe.h index e87d893..868626b 100644 --- a/src/transactionrollbackokframe.h +++ b/src/transactionrollbackokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true; diff --git a/src/transactionselectokframe.h b/src/transactionselectokframe.h index 8a842f7..b66aede 100644 --- a/src/transactionselectokframe.h +++ b/src/transactionselectokframe.h @@ -74,7 +74,7 @@ public: if(!channel) return false; // report that the channel is open - channel->reportSuccess(); + if (channel->reportSuccess()) channel->synchronized(); // done return true;