From 62a4262a5890a54638cc63611029503974a2b800 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 28 Oct 2018 18:13:13 +0100 Subject: [PATCH] the TcpConnection::close() method now supports an "immediate" parameter to skip the official amqp handshake for closing down --- include/amqpcpp/linux_tcp/tcpconnection.h | 15 +++---- src/linux_tcp/sslconnected.h | 23 ++++++++++ src/linux_tcp/sslhandshake.h | 53 +++++++++++++++++++---- src/linux_tcp/sslshutdown.h | 23 ++++++++++ src/linux_tcp/tcpclosed.h | 11 +++++ src/linux_tcp/tcpconnected.h | 23 ++++++++++ src/linux_tcp/tcpconnection.cpp | 46 ++++++++++++++++++++ src/linux_tcp/tcpresolver.h | 20 +++++++++ src/linux_tcp/tcpstate.h | 7 +++ 9 files changed, 204 insertions(+), 17 deletions(-) diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 9a7a1df..777eb12 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -156,16 +156,15 @@ public: void flush(); /** - * Close the connection - * This closes all channels and the TCP connection + * Close the connection in an elegant fashion. This closes all channels and the + * TCP connection. Note that the connection is not immediately closed: first all + * pending operations are completed, and then an AMQP closing-handshake is + * performed. If you pass a parameter "immediate=true" the connection is + * immediately closed, without waiting for earlier commands * @return bool */ - bool close() - { - // pass to the underlying connection - return _connection.close(); - } - + bool close(bool immediate = false); + /** * The max frame size. Useful if you set up a buffer to parse incoming data: it does not have to exceed this size. * @return uint32_t diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index d8a4c5f..d60a9c7 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -476,6 +476,29 @@ public: // done return this; } + + /** + * Close the connection immediately + * @param monitor object to check if connection object is still active + * @return TcpState the new state + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // if we have already told user space that connection is gone + if (_finalized) return new TcpClosed(this); + + // object will be finalized now + _finalized = true; + + // close the connection + close(); + + // inform user space that the party is over + _handler->onError(_connection, "ssl connection terminated"); + + // go to the final state (if not yet disconnected) + return monitor.valid() ? new TcpClosed(this) : nullptr; + } /** * Send data over the connection diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 06178c6..5de7928 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -51,6 +51,28 @@ private: TcpOutBuffer _out; + /** + * Close the socket + * @return bool + */ + bool close() + { + // do nothing if already closed + if (_socket < 0) return false; + + // and stop monitoring it + _handler->monitor(_connection, _socket, 0); + + // close the socket + ::close(_socket); + + // forget filedescriptor + _socket = -1; + + // done + return true; + } + /** * Report a new state * @param monitor @@ -90,14 +112,8 @@ private: */ TcpState *reportError(const Monitor &monitor) { - // we are no longer interested in any events for this socket - _handler->monitor(_connection, _socket, 0); - // close the socket - close(_socket); - - // forget filedescriptor - _socket = -1; + close(); // we have an error - report this to the user _handler->onError(_connection, "failed to setup ssl connection"); @@ -162,8 +178,9 @@ public: // leap out if socket is invalidated if (_socket < 0) return; - // the object got destructed without moving to a new state, this is normally - close(_socket); + // the object got destructed without moving to a new state, this + // situation should normally not occur + ::close(_socket); } /** @@ -257,6 +274,24 @@ public: } } } + + /** + * Close the connection immediately + * @param monitor object to check if connection object is still active + * @return TcpState the new state + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // close the socket + close(); + + // report to the user that the handshake was aborted + _handler->onError(_connection, "ssl handshake aborted"); + + // done, go to the closed state (plus check if connection still exists, because + // after the onError() call the user space program may have destructed that object) + return monitor.valid() ? new TcpClosed(this) : nullptr; + } }; /** diff --git a/src/linux_tcp/sslshutdown.h b/src/linux_tcp/sslshutdown.h index f3a57c9..4d7a43d 100644 --- a/src/linux_tcp/sslshutdown.h +++ b/src/linux_tcp/sslshutdown.h @@ -242,6 +242,29 @@ public: } } } + + /** + * Abort the shutdown operation + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // if we have already told user space that connection is gone + if (_finalized) return new TcpClosed(this); + + // object will be finalized now + _finalized = true; + + // close the connection + close(); + + // inform user space that the party is over + _handler->onError(_connection, "ssl shutdown aborted"); + + // go to the final state (if not yet disconnected) + return monitor.valid() ? new TcpClosed(this) : nullptr; + } }; /** diff --git a/src/linux_tcp/tcpclosed.h b/src/linux_tcp/tcpclosed.h index 71ca4ee..cf1b972 100644 --- a/src/linux_tcp/tcpclosed.h +++ b/src/linux_tcp/tcpclosed.h @@ -42,6 +42,17 @@ public: * Destructor */ virtual ~TcpClosed() noexcept = default; + + /** + * Abort the operation + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // connection was closed and stays closed + return this; + } }; /** diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index 8273a85..ef011c5 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -278,6 +278,29 @@ public: // all has been sent return this; } + + /** + * Close the connection immediately + * @param monitor object to check if connection object is still active + * @return TcpState the new state + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // if we have already told user space that connection is gone + if (_finalized) return new TcpClosed(this); + + // object will be finalized now + _finalized = true; + + // close the connection + close(); + + // inform user space that the party is over + _handler->onError(_connection, "tcp connection terminated"); + + // go to the final state (if not yet disconnected) + return monitor.valid() ? new TcpClosed(this) : nullptr; + } /** * Report that heartbeat negotiation is going on diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index e23d34e..ee21cd5 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -116,6 +116,52 @@ void TcpConnection::flush() } } +/** + * Close the connection + * @return bool + */ +bool TcpConnection::close(bool immediate) +{ + // if no immediate disconnect is needed, we can simply start the closing handshake + if (!immediate) return _connection.close(); + + // a call to user-space will be made, so we need to monitor if "this" is destructed + Monitor monitor(this); + + // pass to the underlying connection to start the amqp-closing handshake + _connection.close(); + + // if the user-space code destructed the connection, there is nothing else to do + if (!monitor.valid()) return true; + + // store the old state + auto *oldstate = _state.get(); + + // abort the operation + auto *newstate = _state->abort(monitor); + + // if the state did not change, we do not have to update a member, + // when the newstate is nullptr, the object is (being) destructed + // and we do not have to do anything else either + if (oldstate == newstate || newstate == nullptr) return true; + + // in a bizarre set of circumstances, the user may have implemented the + // handler in such a way that the connection object was destructed + if (!monitor.valid()) + { + // ok, user code is weird, connection object no longer exist, get rid of the state too + delete newstate; + } + else + { + // replace it with the new implementation + _state.reset(newstate); + } + + // done, we return true because the connection is closed + return true; +} + /** * Method that is called after the connection was constructed * @param connection The connection that was attached to the handler diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index abca375..dad3cc7 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -249,6 +249,26 @@ public: return proceed(monitor); } + /** + * Close the connection immediately + * @param monitor object to check if connection object is still active + * @return TcpState the new state + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // just wait for the other thread to be ready + _thread.join(); + + // close the socket + if (_socket >= 0) ::close(_socket); + + // inform user space that the connection is cancelled + _handler->onError(_connection, "tcp connect aborted"); + + // go to the final state (if not yet disconnected) + return monitor.valid() ? new TcpClosed(this) : nullptr; + } + /** * Send data over the connection * @param buffer buffer to send diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index d83edc0..fd83d59 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -115,6 +115,13 @@ public: * @return TcpState New implementation object */ virtual TcpState *flush(const Monitor &monitor) { return this; } + + /** + * Abort the operation, immediately proceed to the final state + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState New implementation object + */ + virtual TcpState *abort(const Monitor &monitor) = 0; /** * Report to the handler that the connection was constructed