diff --git a/include/amqpcpp/linux_tcp/tcpparent.h b/include/amqpcpp/linux_tcp/tcpparent.h index 968bd4e..a47b27b 100644 --- a/include/amqpcpp/linux_tcp/tcpparent.h +++ b/include/amqpcpp/linux_tcp/tcpparent.h @@ -65,7 +65,7 @@ public: virtual void onIdle(TcpState *state, int socket, int events) = 0; /** - * Method that is called when an error occurs (the connection is lost then) + * Method that is called when an error occurs (the connection is lost) * @param state * @param error */ diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 048e6b0..f112153 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -307,24 +307,8 @@ public: /** * Destructor */ - virtual ~SslConnected() noexcept - { - // no cleanup if socket is gone - if (_socket < 0) return; - - // and stop monitoring it - _parent->onIdle(this, _socket, 0); - - // close the socket - ::close(_socket); - } + virtual ~SslConnected() noexcept = default; - /** - * The filedescriptor of this connection - * @return int - */ - virtual int fileno() const override { return _socket; } - /** * Number of bytes in the outgoing buffer * @return std::size_t @@ -438,37 +422,11 @@ public: } /** - * When the AMQP transport layer is closed - * @param monitor Object that can be used if connection is still alive - * @return TcpState New implementation object + * Gracefully close the connection + * @return TcpState The next state */ - virtual TcpState *onAmqpClosed(const Monitor &monitor) override - { - // remember that the object is going to be closed - _closed = true; - - // if the previous operation is still in progress we can wait for that - if (_state != state_idle) return this; - - // the connection can be closed right now, move to the next state - return new SslShutdown(this, std::move(_ssl)); - } - - /** - * When an error occurs in the AMQP protocol - * @param monitor Monitor that can be used to check if the connection is still alive - * @param message The error message - * @return TcpState New implementation object - */ - virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override - { - // tell the user about it - // @todo do we need this here? - //_handler->onError(_connection, message); - - // stop if the object was destructed - if (!monitor.valid()) return nullptr; - + virtual TcpState *close() override + { // remember that the object is going to be closed _closed = true; diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 9c763cd..1e8e772 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -45,28 +45,6 @@ private: TcpOutBuffer _out; - /** - * Close the socket - * @return bool - */ - bool close() - { - // do nothing if already closed - if (_socket < 0) return false; - - // and stop monitoring it - _parent->onIdle(this, _socket, 0); - - // close the socket - ::close(_socket); - - // forget filedescriptor - _socket = -1; - - // done - return true; - } - /** * Report a new state * @param monitor @@ -80,24 +58,16 @@ private: // leap out if the user space function destructed the object if (!monitor.valid()) return nullptr; - // copy the socket because we might forget it -// auto socket = _socket; - - // forget the socket member to prevent that it is closed by the destructor - _socket = -1; - // if connection is allowed, we move to the next state if (allowed) return new SslConnected(this, std::move(_ssl), std::move(_out)); // report that the connection is broken - // @todo do we need this? - //_handler->onError(_connection, "TLS connection has been rejected"); + _parent->onError(this, "TLS connection has been rejected"); // the onError method could have destructed this object if (!monitor.valid()) return nullptr; // shutdown the connection - // @todo the onClosed() does not have to be called return new SslShutdown(this, std::move(_ssl)); } @@ -108,16 +78,14 @@ private: */ TcpState *reportError(const Monitor &monitor) { - // close the socket - close(); - // we have an error - report this to the user - // @todo do we need this? - //_handler->onError(_connection, "failed to setup ssl connection"); + _parent->onError(this, "failed to setup ssl connection"); - // done, go to the closed state (plus check if connection still exists, because - // after the onError() call the user space program may have destructed that object) - return monitor.valid() ? new TcpClosed(this) : nullptr; + // stop if connection is gone + if (!monitor.valid()) return nullptr; + + // done, shutdown the tcp connection + return new TcpShutdown(this); } /** @@ -167,21 +135,7 @@ public: /** * Destructor */ - virtual ~SslHandshake() noexcept - { - // leap out if socket is invalidated - if (_socket < 0) return; - - // the object got destructed without moving to a new state, this - // situation should normally not occur - ::close(_socket); - } - - /** - * The filedescriptor of this connection - * @return int - */ - virtual int fileno() const override { return _socket; } + virtual ~SslHandshake() noexcept = default; /** * Number of bytes in the outgoing buffer diff --git a/src/linux_tcp/sslshutdown.h b/src/linux_tcp/sslshutdown.h index b7f5762..1163d0c 100644 --- a/src/linux_tcp/sslshutdown.h +++ b/src/linux_tcp/sslshutdown.h @@ -67,7 +67,6 @@ private: default: // go to the final state (if not yet disconnected) - // @todo special treatment for ssl-protocol errors return proceed(monitor); } } diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index c3c1d37..c7a0ebc 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -57,51 +57,31 @@ private: bool _finalized = false; - /** - * Start an elegant shutdown - * - * @todo remove this method - */ - void shutdown2() - { - // we will shutdown the socket in a very elegant way, we notify the peer - // that we will not be sending out more write operations - ::shutdown(_socket, SHUT_WR); - - // we still monitor the socket for readability to see if our close call was - // confirmed by the peer - _parent->onIdle(this, _socket, readable); - } - /** * Helper method to report an error - * @param monitor Monitor to check validity of "this" * @return bool Was an error reported? */ - bool reportError(const Monitor &monitor) + bool reportError() { // some errors are ok and do not (necessarily) mean that we're disconnected if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; - // tell the connection that it failed - // @todo we should report an error, but that could be wrong, because it calls back to us + // tell the parent that it failed + _parent->onError(this, "connection lost"); - // we're no longer interested in the socket (this also calls onClosed()) - cleanup(); - // done return true; } /** - * Construct the next state + * Construct the shutdown state * @param monitor Object that monitors whether connection still exists * @return TcpState* */ TcpState *nextState(const Monitor &monitor) { - // if the object is still in a valid state, we can move to the close-state, - // otherwise there is no point in moving to a next state + // if the object is still in a valid state, we can treat the connection + // as closed otherwise there is no point in moving to a next state return monitor.valid() ? new TcpClosed(this) : nullptr; } @@ -128,12 +108,6 @@ public: */ virtual ~TcpConnected() noexcept = default; - /** - * The filedescriptor of this connection - * @return int - */ - virtual int fileno() const override { return _socket; } - /** * Number of bytes in the outgoing buffer * @return std::size_t @@ -159,7 +133,7 @@ public: auto result = _out.sendto(_socket); // are we in an error state? - if (result < 0 && reportError(monitor)) return nextState(monitor); + if (result < 0 && reportError()) return nextState(monitor); // if buffer is empty by now, we no longer have to check for // writability, but only for readability @@ -173,7 +147,7 @@ public: ssize_t result = _in.receivefrom(_socket, _parent->expected()); // are we in an error state? - if (result < 0 && reportError(monitor)) return nextState(monitor); + if (result < 0 && reportError()) return nextState(monitor); // @todo should we also check for result == 0 @@ -256,34 +230,16 @@ public: // all has been sent return this; } - - /** - * When the AMQP transport layer is closed - * @param monitor Object that can be used if connection is still alive - * @return TcpState New implementation object - */ - virtual TcpState *onAmqpClosed(const Monitor &monitor) override - { - // move to the tcp shutdown state - return new TcpShutdown(this); - } /** - * When an error occurs in the AMQP protocol - * @param monitor Monitor that can be used to check if the connection is still alive - * @param message The error message - * @return TcpState New implementation object + * Gracefully close the connection + * @return TcpState The next state */ - virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override + virtual TcpState *close() override { - // tell the user about it - // @todo do this somewhere else - //_handler->onError(_connection, message); + // @todo what if we're still busy receiving data? - // stop if the object was destructed - if (!monitor.valid()) return nullptr; - - // move to the tcp shutdown state + // start the tcp shutdown return new TcpShutdown(this); } diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index 59c4c2b..73f111b 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -88,6 +88,7 @@ void TcpConnection::process(int fd, int flags) else { // replace it with the new implementation + // @todo destructing the existing _state may destruct the entire object _state.reset(newstate); } } @@ -178,8 +179,8 @@ void TcpConnection::onError(Connection *connection, const char *message) // remember the old state (this is necessary because _state may be modified by user-code) auto *oldstate = _state.get(); - // tell the state that an error occured at the amqp level - auto *newstate = _state->onAmqpError(monitor, message); + // tell the state that the connection should be closed asap + auto *newstate = _state->close(); // leap out if nothing changes if (newstate == nullptr || newstate == oldstate) return; @@ -200,8 +201,8 @@ void TcpConnection::onClosed(Connection *connection) // remember the old state (this is necessary because _state may be modified by user-code) auto *oldstate = _state.get(); - // tell the state that the connection was closed at the amqp level - auto *newstate = _state->onAmqpClosed(monitor); + // tell the state that the connection should be closed asap + auto *newstate = _state->close(); // leap out if nothing changes if (newstate == nullptr || newstate == oldstate) return; diff --git a/src/linux_tcp/tcpextstate.h b/src/linux_tcp/tcpextstate.h index cbb1f45..8f78f32 100644 --- a/src/linux_tcp/tcpextstate.h +++ b/src/linux_tcp/tcpextstate.h @@ -30,7 +30,7 @@ protected: int _socket; /** - * Clean-up the socket + * Clean-up the socket, and call the onClosed() method */ void cleanup() { diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index 8384ebc..abb140d 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -108,7 +108,7 @@ private: _error = strerror(errno); // close socket because connect failed - close(_socket); + ::close(_socket); // socket no longer is valid _socket = -1; diff --git a/src/linux_tcp/tcpshutdown.h b/src/linux_tcp/tcpshutdown.h index e6f3142..5ad1250 100644 --- a/src/linux_tcp/tcpshutdown.h +++ b/src/linux_tcp/tcpshutdown.h @@ -30,16 +30,6 @@ namespace AMQP { */ class TcpShutdown : public TcpExtState { -protected: - /** - * Method to report the result to the user - */ - virtual void report() - { - // report that the connection was closed - _parent->onClosed(this); - } - public: /** * Constructor @@ -108,9 +98,6 @@ public: // immediately close the socket cleanup(); - // report to the user that the operation is finished - report(); - // move to next state return monitor.valid() ? new TcpClosed(this) : nullptr; } diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index 1f8a2da..c372b33 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -92,6 +92,12 @@ public: // default does nothing } + /** + * Gracefully close the connection + * @return TcpState The next state + */ + virtual TcpState *close() { return this; } + /** * Flush the connection, all outgoing operations should be completed. * @@ -115,21 +121,6 @@ public: * @param heartbeat suggested heartbeat */ virtual void maxframe(size_t maxframe) {} - - /** - * Events that can take place during the AMQP protocol - * - * Both events also trigger the end of a valid connection, and should - * be used to tear down the TCP connection. - * - * @todo are these appropriate names? - * - * @param monitor - * @param TcpState - */ - virtual TcpState *onAmqpError(const Monitor &monitor, const char *error) { return this; } - virtual TcpState *onAmqpClosed(const Monitor &monitor) { return this; } - }; /**