From 428219ad83cca794cac9437cd684d992a81566ff Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 7 Nov 2018 15:04:08 +0100 Subject: [PATCH] simplified and improved the close procedure --- include/amqpcpp/linux_tcp/tcpconnection.h | 8 --- src/linux_tcp/sslconnected.h | 23 ++++--- src/linux_tcp/tcpconnected.h | 25 ++++--- src/linux_tcp/tcpconnection.cpp | 79 +++++------------------ src/linux_tcp/tcpoutbuffer.h | 12 ++++ src/linux_tcp/tcpstate.h | 5 +- 6 files changed, 57 insertions(+), 95 deletions(-) diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 9e29b79..d99cdfa 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -52,14 +52,6 @@ private: Connection _connection; - /** - * Assign a new state - * @param monitor - * @param state - * @return bool - */ - bool assign(const Monitor &monitor, TcpState *state); - /** * Method that is called when the heartbeat frequency is negotiated. * @param connection The connection that suggested a heartbeat interval diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 0269b21..de36f00 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -119,7 +119,7 @@ private: _parent->onIdle(this, _socket, readable); // allow chaining - return monitor.valid() ? this : nullptr; + return this; case SSL_ERROR_WANT_WRITE: // remember state @@ -129,17 +129,14 @@ private: _parent->onIdle(this, _socket, readable | writable); // allow chaining - return monitor.valid() ? this : nullptr; + return this; case SSL_ERROR_NONE: // we're ready for the next instruction from userspace _state = state_idle; - // turns out no error occured, an no action has to be rescheduled - _parent->onIdle(this, _socket, _out || _closed ? readable | writable : readable); - - // allow chaining - return monitor.valid() ? this : nullptr; + // if already closed, proceed to next state + return proceed(); default: // report an error to user-space @@ -352,6 +349,9 @@ public: */ virtual void send(const char *buffer, size_t size) override { + // do nothing if already busy closing + if (_closed) return; + // put the data in the outgoing buffer _out.add(buffer, size); @@ -365,18 +365,17 @@ public: /** * Gracefully close the connection - * @return TcpState The next state */ - virtual TcpState *close() override + virtual void close() override { // remember that the object is going to be closed _closed = true; // if the previous operation is still in progress we can wait for that - if (_state != state_idle) return this; + if (_state != state_idle) return; - // the connection can be closed right now, move to the next state - return new SslShutdown(this, std::move(_ssl)); + // let's wait until the socket becomes writable (because then we can start the shutdown) + _parent->onIdle(this, _socket, readable | writable); } /** diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index 5244b32..e78dc23 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -135,9 +135,14 @@ public: // are we in an error state? if (result < 0 && reportError()) return finalState(monitor); - // if buffer is empty by now, we no longer have to check for - // writability, but only for readability - if (!_out) _parent->onIdle(this, _socket, readable); + // if we still have a buffer, we keep on monitoring + if (_out) return this; + + // if we do not expect to send more data, we can close the connection for writing + if (_closed) shutdown(_socket, SHUT_WR); + + // check for readability (to find more data, or to be notified that connection is gone) + _parent->onIdle(this, _socket, readable); } // should we check for readability too? @@ -185,6 +190,9 @@ public: */ virtual void send(const char *buffer, size_t size) override { + // we stop sending when connection is closed + if (_closed) return; + // is there already a buffer of data that can not be sent? if (_out) return _out.add(buffer, size); @@ -206,16 +214,18 @@ public: /** * Gracefully close the connection - * @return TcpState The next state */ - virtual TcpState *close() override + virtual void close() override { // do nothing if already closed - if (_closed) return this; + if (_closed) return; // remember that the connection is closed _closed = true; + // wait until the outgoing buffer is all gone + if (_out) return; + // we will shutdown the socket in a very elegant way, we notify the peer // that we will not be sending out more write operations shutdown(_socket, SHUT_WR); @@ -223,9 +233,6 @@ public: // we still monitor the socket for readability to see if our close call was // confirmed by the peer _parent->onIdle(this, _socket, readable); - - // start the tcp shutdown - return this; } /** diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index c4d878d..2803e85 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -57,30 +57,6 @@ std::size_t TcpConnection::queued() const return _state->queued(); } -/** - * Assign a new state - * @param monitor - * @param state - * @return bool - */ -bool TcpConnection::assign(const Monitor &monitor, TcpState *state) -{ - // not possible if object is already destructed - if (!monitor.valid()) return false; - - // destruct the old state first (this could destruct "this") - _state.reset(nullptr); - - // leap out if object was destructed - if (!monitor.valid()) return false; - - // assign the new state - _state.reset(state); - - // done - return true; -} - /** * Process the TCP connection * This method should be called when the filedescriptor that is registered @@ -95,20 +71,23 @@ void TcpConnection::process(int fd, int flags) // monitor the object for destruction, because you never know what the user Monitor monitor(this); - // store the old state - auto *oldstate = _state.get(); - // pass on the the state, that returns a new impl auto *newstate = _state->process(monitor, fd, flags); // if the state did not change, we do not have to update a member, // when the newstate is nullptr, the object is (being) destructed // and we do not have to do anything else either - if (oldstate == newstate || newstate == nullptr) return; + if (newstate == nullptr || newstate == _state.get()) return; + + // wrap the new state in a unique-ptr so that so that the old state + // is not destructed before the new one is assigned + std::unique_ptr ptr(newstate); + + // swap the two pointers (this ensures that the last operation of this + // method is to destruct the old state, which possible results in calls + // to user-space and the destruction of "this" + _state.swap(ptr); - // in a bizarre set of circumstances, the user may have implemented the - // handler in such a way that the connection object was destructed - if (!assign(monitor, newstate)) delete newstate; } /** @@ -125,15 +104,6 @@ bool TcpConnection::close(bool immediate) // fail the connection / report the error to user-space _connection.fail("connection prematurely closed by client"); - // construct a monitor to check if object is still alive - Monitor monitor(this); - - // get rid of the old state - _state.reset(nullptr); - - // leap out if object was destructed - if (!monitor.valid()) return true; - // change the state _state.reset(new TcpClosed(this)); @@ -181,17 +151,11 @@ void TcpConnection::onError(Connection *connection, const char *message) // tell this to the user _handler->onError(this, message); - // remember the old state (this is necessary because _state may be modified by user-code) - auto *oldstate = _state.get(); + // object could be destructed by user-space + if (!monitor.valid()) return; // tell the state that the connection should be closed asap - auto *newstate = _state->close(); - - // leap out if nothing changes - if (newstate == nullptr || newstate == oldstate) return; - - // assign the new state - _state.reset(newstate); + _state->close(); } /** @@ -200,20 +164,8 @@ void TcpConnection::onError(Connection *connection, const char *message) */ void TcpConnection::onClosed(Connection *connection) { - // monitor to check if "this" is destructed - Monitor monitor(this); - - // remember the old state (this is necessary because _state may be modified by user-code) - auto *oldstate = _state.get(); - // tell the state that the connection should be closed asap - auto *newstate = _state->close(); - - // leap out if nothing changes - if (newstate == nullptr || newstate == oldstate) return; - - // assign the new state - _state.reset(newstate); + _state->close(); } /** @@ -228,7 +180,8 @@ void TcpConnection::onError(TcpState *state, const char *message, bool connected // we wait for the subsequent call to the onClosed() method if (connected) return _handler->onError(this, message); - // monitor to check if "this" is destructed + // no extra onClosed() call is expected, so we have to report multiple things + // to user-space, we use a monitor to check if "this" is destructed in the middle Monitor monitor(this); // tell the handler diff --git a/src/linux_tcp/tcpoutbuffer.h b/src/linux_tcp/tcpoutbuffer.h index 31dbd27..ec6a1d9 100644 --- a/src/linux_tcp/tcpoutbuffer.h +++ b/src/linux_tcp/tcpoutbuffer.h @@ -196,6 +196,18 @@ public: } } + /** + * Clear the buffer + */ + void clear() + { + // clear all buffers + _buffers.clear(); + + // reset members + _skip = _size = 0; + } + /** * Fill an iovec buffer * @param buffers the buffers to be filled diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index 3a328f0..190091c 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -93,10 +93,9 @@ public: } /** - * Gracefully close the connection - * @return TcpState The next state + * Gracefully start closing the connection */ - virtual TcpState *close() { return this; } + virtual void close() {} /** * Install max-frame size