the connection.close function was only waiting for synchronous calls to complete, async calls that were waiting (after a synchronous) were still discarded, this has been fixed

This commit is contained in:
Emiel Bruijntjes 2014-08-20 13:40:29 +02:00
parent d23e818f64
commit c7b3f71f14
3 changed files with 49 additions and 14 deletions

View File

@ -52,8 +52,7 @@ public:
*/ */
void onReady(const SuccessCallback &callback) void onReady(const SuccessCallback &callback)
{ {
// store callback in implementation _implementation->onReady(callback);
_implementation->_readyCallback = callback;
} }
/** /**
@ -66,8 +65,7 @@ public:
*/ */
void onError(const ErrorCallback &callback) void onError(const ErrorCallback &callback)
{ {
// store callback in implementation _implementation->onError(callback);
_implementation->_errorCallback = callback;
} }
/** /**

View File

@ -144,9 +144,40 @@ public:
*/ */
void detach() void detach()
{ {
// connection is gone
_connection = nullptr; _connection = nullptr;
} }
/**
* Callback that is called when the channel was succesfully created.
* @param callback the callback to execute
*/
void onReady(const SuccessCallback &callback)
{
// store callback
_readyCallback = callback;
// direct call if channel is already ready
if (_state == state_connected) callback();
}
/**
* Callback that is called when an error occurs.
*
* Only one error callback can be registered. Calling this function
* multiple times will remove the old callback.
*
* @param callback the callback to execute
*/
void onError(const ErrorCallback &callback)
{
// store callback
_errorCallback = callback;
// direct call if channel is already in error state
if (_state != state_connected) callback("Channel is in error state");
}
/** /**
* Pause deliveries on a channel * Pause deliveries on a channel
* *
@ -480,7 +511,7 @@ public:
*/ */
bool waiting() const bool waiting() const
{ {
return _synchronous; return _synchronous || !_queue.empty();
} }
/** /**

View File

@ -80,13 +80,16 @@ void ChannelImpl::attach(Connection *connection)
// this is invalid // this is invalid
_state = state_closed; _state = state_closed;
} }
else else
{ {
// busy connecting // assume channel is connected
_state = state_connected; _state = state_connected;
// valid id, send a channel open frame // send the open frame
send(ChannelOpenFrame(_id)); if (send(ChannelOpenFrame(_id))) return;
// report an error
reportError("Channel could not be initialized", true);
} }
} }
@ -663,11 +666,14 @@ bool ChannelImpl::send(const Frame &frame)
return true; return true;
} }
// enter synchronous mode if necessary
_synchronous = frame.synchronous();
// send to tcp connection // send to tcp connection
return _connection->send(frame); if (!_connection->send(frame)) return false;
// frame was sent, if this was a synchronous frame, we now have to wait
_synchronous = frame.synchronous();
// done
return true;
} }
/** /**