diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 7d23f98..1755281 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -43,7 +43,13 @@ protected: state_closing, // connection is busy closing (we have sent the close frame) state_closed // connection is closed } _state = state_protocol; - + + /** + * Has the close() method been called? + * @var bool + */ + bool _closed = false; + /** * All channels that are active * @var map @@ -85,6 +91,13 @@ protected: * @var queue */ std::queue _queue; + + /** + * Helper method to send the close frame + * Return value tells if the connection is still valid + * @return bool + */ + bool sendClose(); private: diff --git a/include/stringfield.h b/include/stringfield.h index 65cdf05..1a2ffd3 100644 --- a/include/stringfield.h +++ b/include/stringfield.h @@ -80,6 +80,7 @@ public: /** * Get the size this field will take when * encoded in the AMQP wire-frame format + * @return size_t */ virtual size_t size() const override { diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index e12a431..1a37f4a 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -157,8 +157,28 @@ size_t ConnectionImpl::parse(char *buffer, size_t size) bool ConnectionImpl::close() { // leap out if already closed or closing - if (_state == state_closed || _state == state_closing) return false; + if (_closed) return false; + // mark that the object is closed + _closed = true; + + // if still busy with handshake, we delay closing for a while + if (_state == state_handshake || _state == state_protocol) return true; + + // perform the close operation + sendClose(); + + // done + return true; +} + +/** + * Method to send the close frames + * Returns true if object still exists + * @return bool + */ +bool ConnectionImpl::sendClose() +{ // after the send operation the object could be dead Monitor monitor(this); @@ -169,14 +189,14 @@ bool ConnectionImpl::close() iter->second->close(); // we could be dead now - if (!monitor.valid()) return true; + if (!monitor.valid()) return false; } // send the close frame - if (!send(ConnectionCloseFrame(0, "shutdown"))) return false; + send(ConnectionCloseFrame(0, "shutdown")); // leap out if object no longer is alive - if (!monitor.valid()) return true; + if (!monitor.valid()) return false; // we're in a new state _state = state_closing; @@ -192,10 +212,14 @@ void ConnectionImpl::setConnected() { // store connected state _state = state_connected; + + // if the close operation was already called, we do that again now again + // so that the actual messages to close down the connection and the channel + // are appended to the queue + if (_closed && !sendClose()) return; // we're going to call the handler, which can destruct the connection, - // so we must monitor if the queue object is still valid after calling the - // handler + // so we must monitor if the queue object is still valid after calling Monitor monitor(this); // inform handler @@ -241,7 +265,7 @@ bool ConnectionImpl::send(const Frame &frame) if (frame.needsSeparator()) buffer.add((uint8_t)206); // are we still setting up the connection? - if (_state == state_connected || frame.partOfHandshake()) + if ((_state == state_connected && _queue.size() == 0) || frame.partOfHandshake()) { // send the buffer _handler->onData(_parent, buffer.data(), buffer.size());