Fix double ready bug for channel, fixes #25

This commit is contained in:
Martijn Otto 2015-05-18 10:56:50 +02:00
parent 7ae4f9c5ff
commit a93b88697d
2 changed files with 32 additions and 28 deletions

View File

@ -76,6 +76,7 @@ private:
*/ */
enum { enum {
state_connected, state_connected,
state_ready,
state_closing, state_closing,
state_closed state_closed
} _state = state_closed; } _state = state_closed;
@ -85,7 +86,7 @@ private:
* *
* We store the data as well as whether they * We store the data as well as whether they
* should be handled synchronously. * should be handled synchronously.
* *
* @var std::queue * @var std::queue
*/ */
std::queue<std::pair<bool, OutBuffer>> _queue; std::queue<std::pair<bool, OutBuffer>> _queue;
@ -169,9 +170,9 @@ public:
{ {
// store callback // store callback
_readyCallback = callback; _readyCallback = callback;
// direct call if channel is already ready // direct call if channel is already ready
if (_state == state_connected) callback(); if (_state == state_ready) callback();
} }
/** /**
@ -186,9 +187,9 @@ public:
{ {
// store callback // store callback
_errorCallback = callback; _errorCallback = callback;
// direct call if channel is already in error state // direct call if channel is already in error state
if (_state != state_connected) callback("Channel is in error state"); if (!connected()) callback("Channel is in error state");
} }
/** /**
@ -217,7 +218,7 @@ public:
*/ */
bool connected() bool connected()
{ {
return _state == state_connected; return _state == state_connected || _state == state_ready;
} }
/** /**
@ -388,7 +389,7 @@ public:
* *
* This function returns a deferred handler. Callbacks can be installed * This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods. * using onSuccess(), onError() and onFinalize() methods.
* *
* @param count number of messages to pre-fetch * @param count number of messages to pre-fetch
* @param global share count between all consumers on the same channel * @param global share count between all consumers on the same channel
*/ */
@ -437,34 +438,34 @@ public:
/** /**
* Retrieve a single message from RabbitMQ * Retrieve a single message from RabbitMQ
* *
* When you call this method, you can get one single message from the queue (or none * When you call this method, you can get one single message from the queue (or none
* at all if the queue is empty). The deferred object that is returned, should be used * at all if the queue is empty). The deferred object that is returned, should be used
* to install a onEmpty() and onSuccess() callback function that will be called * to install a onEmpty() and onSuccess() callback function that will be called
* when the message is consumed and/or when the message could not be consumed. * when the message is consumed and/or when the message could not be consumed.
* *
* The following flags are supported: * The following flags are supported:
* *
* - noack if set, consumed messages do not have to be acked, this happens automatically * - noack if set, consumed messages do not have to be acked, this happens automatically
* *
* @param queue name of the queue to consume from * @param queue name of the queue to consume from
* @param flags optional flags * @param flags optional flags
* *
* The object returns a deferred handler. Callbacks can be installed * The object returns a deferred handler. Callbacks can be installed
* using onSuccess(), onEmpty(), onError() and onFinalize() methods. * using onSuccess(), onEmpty(), onError() and onFinalize() methods.
* *
* The onSuccess() callback has the following signature: * The onSuccess() callback has the following signature:
* *
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
* *
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
* *
* std::cout << "Message fetched" << std::endl; * std::cout << "Message fetched" << std::endl;
* *
* }).onEmpty([]() { * }).onEmpty([]() {
* *
* std::cout << "Queue is empty" << std::endl; * std::cout << "Queue is empty" << std::endl;
* *
* }); * });
*/ */
DeferredGet &get(const std::string &queue, int flags = 0); DeferredGet &get(const std::string &queue, int flags = 0);
@ -528,7 +529,7 @@ public:
} }
/** /**
* Signal the channel that a synchronous operation was completed. * Signal the channel that a synchronous operation was completed.
* After this operation, waiting frames can be sent out. * After this operation, waiting frames can be sent out.
*/ */
void onSynchronized(); void onSynchronized();
@ -541,6 +542,9 @@ public:
// callbacks could destroy us, so monitor it // callbacks could destroy us, so monitor it
Monitor monitor(this); Monitor monitor(this);
// if we are still in connected state we are now ready
if (_state == state_connected) _state = state_ready;
// inform handler // inform handler
if (_readyCallback) _readyCallback(); if (_readyCallback) _readyCallback();
@ -552,7 +556,7 @@ public:
* Report to the handler that the channel is closed * Report to the handler that the channel is closed
* *
* Returns whether the channel object is still valid * Returns whether the channel object is still valid
* *
* @return bool * @return bool
*/ */
bool reportClosed() bool reportClosed()
@ -567,15 +571,15 @@ public:
// and pass on to the reportSuccess() method which will call the // and pass on to the reportSuccess() method which will call the
// appropriate deferred object to report the successful operation // appropriate deferred object to report the successful operation
bool result = reportSuccess(); bool result = reportSuccess();
// leap out if object no longer exists // leap out if object no longer exists
if (!monitor.valid()) return result; if (!monitor.valid()) return result;
// all later deferred objects should report an error, because it // all later deferred objects should report an error, because it
// was not possible to complete the instruction as the channel is // was not possible to complete the instruction as the channel is
// now closed // now closed
reportError("Channel has been closed", false); reportError("Channel has been closed", false);
// done // done
return result; return result;
} }
@ -584,7 +588,7 @@ public:
* Report success * Report success
* *
* Returns whether the channel object is still valid * Returns whether the channel object is still valid
* *
* @param mixed * @param mixed
* @return bool * @return bool
*/ */
@ -596,7 +600,7 @@ public:
// we are going to call callbacks that could destruct the channel // we are going to call callbacks that could destruct the channel
Monitor monitor(this); Monitor monitor(this);
// copy the callback (so that it will not be destructed during // copy the callback (so that it will not be destructed during
// the "reportSuccess" call, if the channel is destructed during the call) // the "reportSuccess" call, if the channel is destructed during the call)
auto cb = _oldestCallback; auto cb = _oldestCallback;

View File

@ -192,7 +192,7 @@ Deferred &ChannelImpl::rollbackTransaction()
Deferred &ChannelImpl::close() Deferred &ChannelImpl::close()
{ {
// this is completely pointless if not connected // this is completely pointless if not connected
if (_state != state_connected) return push(std::make_shared<Deferred>(_state == state_closing)); if (!connected()) return push(std::make_shared<Deferred>(_state == state_closing));
// send a channel close frame // send a channel close frame
auto &handler = push(ChannelCloseFrame(_id)); auto &handler = push(ChannelCloseFrame(_id));