{auto} if connection.close() was called before the connection was set up, the instructions that were called between setting up the connection and the call to close() were lost. This forced users to always use the onConnected() handler to wait for the first instruction to be sent. But this is not according to the library design, so now the instructions that are given before the close() will be executed too;
This commit is contained in:
parent
8ae54cc598
commit
6cfead9902
|
|
@ -44,6 +44,12 @@ protected:
|
||||||
state_closed // connection is closed
|
state_closed // connection is closed
|
||||||
} _state = state_protocol;
|
} _state = state_protocol;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Has the close() method been called?
|
||||||
|
* @var bool
|
||||||
|
*/
|
||||||
|
bool _closed = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All channels that are active
|
* All channels that are active
|
||||||
* @var map
|
* @var map
|
||||||
|
|
@ -86,6 +92,13 @@ protected:
|
||||||
*/
|
*/
|
||||||
std::queue<OutBuffer> _queue;
|
std::queue<OutBuffer> _queue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to send the close frame
|
||||||
|
* Return value tells if the connection is still valid
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool sendClose();
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Get the size this field will take when
|
* Get the size this field will take when
|
||||||
* encoded in the AMQP wire-frame format
|
* encoded in the AMQP wire-frame format
|
||||||
|
* @return size_t
|
||||||
*/
|
*/
|
||||||
virtual size_t size() const override
|
virtual size_t size() const override
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -157,8 +157,28 @@ size_t ConnectionImpl::parse(char *buffer, size_t size)
|
||||||
bool ConnectionImpl::close()
|
bool ConnectionImpl::close()
|
||||||
{
|
{
|
||||||
// leap out if already closed or closing
|
// leap out if already closed or closing
|
||||||
if (_state == state_closed || _state == state_closing) return false;
|
if (_closed) return false;
|
||||||
|
|
||||||
|
// mark that the object is closed
|
||||||
|
_closed = true;
|
||||||
|
|
||||||
|
// if still busy with handshake, we delay closing for a while
|
||||||
|
if (_state == state_handshake || _state == state_protocol) return true;
|
||||||
|
|
||||||
|
// perform the close operation
|
||||||
|
sendClose();
|
||||||
|
|
||||||
|
// done
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to send the close frames
|
||||||
|
* Returns true if object still exists
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool ConnectionImpl::sendClose()
|
||||||
|
{
|
||||||
// after the send operation the object could be dead
|
// after the send operation the object could be dead
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
|
@ -169,14 +189,14 @@ bool ConnectionImpl::close()
|
||||||
iter->second->close();
|
iter->second->close();
|
||||||
|
|
||||||
// we could be dead now
|
// we could be dead now
|
||||||
if (!monitor.valid()) return true;
|
if (!monitor.valid()) return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send the close frame
|
// send the close frame
|
||||||
if (!send(ConnectionCloseFrame(0, "shutdown"))) return false;
|
send(ConnectionCloseFrame(0, "shutdown"));
|
||||||
|
|
||||||
// leap out if object no longer is alive
|
// leap out if object no longer is alive
|
||||||
if (!monitor.valid()) return true;
|
if (!monitor.valid()) return false;
|
||||||
|
|
||||||
// we're in a new state
|
// we're in a new state
|
||||||
_state = state_closing;
|
_state = state_closing;
|
||||||
|
|
@ -193,9 +213,13 @@ void ConnectionImpl::setConnected()
|
||||||
// store connected state
|
// store connected state
|
||||||
_state = state_connected;
|
_state = state_connected;
|
||||||
|
|
||||||
|
// if the close operation was already called, we do that again now again
|
||||||
|
// so that the actual messages to close down the connection and the channel
|
||||||
|
// are appended to the queue
|
||||||
|
if (_closed && !sendClose()) return;
|
||||||
|
|
||||||
// we're going to call the handler, which can destruct the connection,
|
// we're going to call the handler, which can destruct the connection,
|
||||||
// so we must monitor if the queue object is still valid after calling the
|
// so we must monitor if the queue object is still valid after calling
|
||||||
// handler
|
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
// inform handler
|
// inform handler
|
||||||
|
|
@ -241,7 +265,7 @@ bool ConnectionImpl::send(const Frame &frame)
|
||||||
if (frame.needsSeparator()) buffer.add((uint8_t)206);
|
if (frame.needsSeparator()) buffer.add((uint8_t)206);
|
||||||
|
|
||||||
// are we still setting up the connection?
|
// are we still setting up the connection?
|
||||||
if (_state == state_connected || frame.partOfHandshake())
|
if ((_state == state_connected && _queue.size() == 0) || frame.partOfHandshake())
|
||||||
{
|
{
|
||||||
// send the buffer
|
// send the buffer
|
||||||
_handler->onData(_parent, buffer.data(), buffer.size());
|
_handler->onData(_parent, buffer.data(), buffer.size());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue