From a93b88697dc56ad79cfabb13efe421d9f0156290 Mon Sep 17 00:00:00 2001 From: Martijn Otto Date: Mon, 18 May 2015 10:56:50 +0200 Subject: [PATCH] Fix double ready bug for channel, fixes #25 --- include/channelimpl.h | 58 +++++++++++++++++++++++-------------------- src/channelimpl.cpp | 2 +- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/include/channelimpl.h b/include/channelimpl.h index 47da82d..6261827 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -76,6 +76,7 @@ private: */ enum { state_connected, + state_ready, state_closing, state_closed } _state = state_closed; @@ -85,7 +86,7 @@ private: * * We store the data as well as whether they * should be handled synchronously. - * + * * @var std::queue */ std::queue> _queue; @@ -169,9 +170,9 @@ public: { // store callback _readyCallback = callback; - + // direct call if channel is already ready - if (_state == state_connected) callback(); + if (_state == state_ready) callback(); } /** @@ -186,9 +187,9 @@ public: { // store callback _errorCallback = callback; - + // direct call if channel is already in error state - if (_state != state_connected) callback("Channel is in error state"); + if (!connected()) callback("Channel is in error state"); } /** @@ -217,7 +218,7 @@ public: */ bool connected() { - return _state == state_connected; + return _state == state_connected || _state == state_ready; } /** @@ -388,7 +389,7 @@ public: * * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. - * + * * @param count number of messages to pre-fetch * @param global share count between all consumers on the same channel */ @@ -437,34 +438,34 @@ public: /** * Retrieve a single message from RabbitMQ - * + * * When you call this method, you can get one single message from the queue (or none * at all if the queue is empty). The deferred object that is returned, should be used * to install a onEmpty() and onSuccess() callback function that will be called * when the message is consumed and/or when the message could not be consumed. - * + * * The following flags are supported: - * + * * - noack if set, consumed messages do not have to be acked, this happens automatically - * + * * @param queue name of the queue to consume from * @param flags optional flags - * - * The object returns a deferred handler. Callbacks can be installed + * + * The object returns a deferred handler. Callbacks can be installed * using onSuccess(), onEmpty(), onError() and onFinalize() methods. - * + * * The onSuccess() callback has the following signature: - * + * * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); - * + * * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { - * + * * std::cout << "Message fetched" << std::endl; - * + * * }).onEmpty([]() { - * + * * std::cout << "Queue is empty" << std::endl; - * + * * }); */ DeferredGet &get(const std::string &queue, int flags = 0); @@ -528,7 +529,7 @@ public: } /** - * Signal the channel that a synchronous operation was completed. + * Signal the channel that a synchronous operation was completed. * After this operation, waiting frames can be sent out. */ void onSynchronized(); @@ -541,6 +542,9 @@ public: // callbacks could destroy us, so monitor it Monitor monitor(this); + // if we are still in connected state we are now ready + if (_state == state_connected) _state = state_ready; + // inform handler if (_readyCallback) _readyCallback(); @@ -552,7 +556,7 @@ public: * Report to the handler that the channel is closed * * Returns whether the channel object is still valid - * + * * @return bool */ bool reportClosed() @@ -567,15 +571,15 @@ public: // and pass on to the reportSuccess() method which will call the // appropriate deferred object to report the successful operation bool result = reportSuccess(); - + // leap out if object no longer exists if (!monitor.valid()) return result; - + // all later deferred objects should report an error, because it // was not possible to complete the instruction as the channel is // now closed reportError("Channel has been closed", false); - + // done return result; } @@ -584,7 +588,7 @@ public: * Report success * * Returns whether the channel object is still valid - * + * * @param mixed * @return bool */ @@ -596,7 +600,7 @@ public: // we are going to call callbacks that could destruct the channel Monitor monitor(this); - + // copy the callback (so that it will not be destructed during // the "reportSuccess" call, if the channel is destructed during the call) auto cb = _oldestCallback; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 867ae78..e285385 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -192,7 +192,7 @@ Deferred &ChannelImpl::rollbackTransaction() Deferred &ChannelImpl::close() { // this is completely pointless if not connected - if (_state != state_connected) return push(std::make_shared(_state == state_closing)); + if (!connected()) return push(std::make_shared(_state == state_closing)); // send a channel close frame auto &handler = push(ChannelCloseFrame(_id));