From 463eed89c0c3691db2978eecde688d7d11ceb198 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 5 Mar 2018 22:24:19 +0100 Subject: [PATCH] work in progress on closing ssl connections --- include/amqpcpp/linux_tcp/tcpconnection.h | 1 + include/amqpcpp/monitor.h | 20 ++- src/linux_tcp/tcpconnected.h | 7 +- src/linux_tcp/tcpsslclose.h | 161 ++++++++++++++++++++++ src/linux_tcp/tcpsslconnected.h | 135 ++++++++++++++---- 5 files changed, 293 insertions(+), 31 deletions(-) create mode 100644 src/linux_tcp/tcpsslclose.h diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 681ecd5..a641576 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -100,6 +100,7 @@ private: /** * Classes that have access to private data */ + friend class TcpSslConnected; friend class TcpConnected; friend class TcpChannel; diff --git a/include/amqpcpp/monitor.h b/include/amqpcpp/monitor.h index 59564f9..efb788c 100644 --- a/include/amqpcpp/monitor.h +++ b/include/amqpcpp/monitor.h @@ -8,7 +8,7 @@ * case the connection object should stop further handling the data. This * monitor class is used to check if the connection has been destructed. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -76,6 +76,24 @@ public: if (_watchable) _watchable->remove(this); } + /** + * Cast to boolean: is object in valid state? + * @return bool + */ + operator bool () const + { + return _watchable != nullptr; + } + + /** + * Negate operator: is the object in an invalid state? + * @return bool + */ + bool operator! () const + { + return _watchable == nullptr; + } + /** * Check if the object is valid * @return bool diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index c523183..f710a05 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -180,7 +180,10 @@ public: _in = std::move(buffer); // do we have to reallocate? - if (_reallocate) { _in.reallocate(_reallocate); _reallocate = 0; } + if (_reallocate) _in.reallocate(_reallocate); + + // we can remove the reallocate instruction + _reallocate = 0; } // keep same object @@ -246,7 +249,7 @@ public: */ virtual uint16_t reportNegotiate(uint16_t heartbeat) override { - // remember that we have to reallocated (_in member can not be accessed because it is moved away) + // remember that we have to reallocate (_in member can not be accessed because it is moved away) _reallocate = _connection->maxFrame(); // pass to base diff --git a/src/linux_tcp/tcpsslclose.h b/src/linux_tcp/tcpsslclose.h new file mode 100644 index 0000000..e996c63 --- /dev/null +++ b/src/linux_tcp/tcpsslclose.h @@ -0,0 +1,161 @@ +/** + * TcpSslClose.h + * + * Class that takes care of the final handshake to close a SSL connection + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpSslClose : public TcpState, private Watchable +{ +private: + /** + * The SSL context + * @var TcpSsl + */ + TcpSsl _ssl; + + /** + * Socket file descriptor + * @var int + */ + int _socket; + + + /** + * Proceed with the next operation after the previous operation was + * a success, possibly changing the filedescriptor-monitor + * @return TcpState* + */ + TcpState *proceed() + { + // construct monitor to prevent that we access members if object is destructed + Monitor monitor(this); + + // we're no longer interested in events + _handler->monitor(_connection, _socket, 0); + + // stop if object was destructed + if (!monitor) return nullptr; + + // close the socket + close(_socket); + + // forget the socket + _socket = -1; + + // go to the closed state + return new TcpClosed(_connection, _handler); + } + + /** + * Method to repeat the previous call + * @param result result of an earlier openssl operation + * @return TcpState* + */ + TcpState *repeat(int result) + { + // error was returned, so we must investigate what is going on + auto error = SSL_get_error(_ssl, result); + + // check the error + switch (error) { + case SSL_ERROR_WANT_READ: + // the operation must be repeated when readable + _handler->monitor(_connection, _socket, readable); + return this; + + case SSL_ERROR_WANT_WRITE: + // wait until socket becomes writable again + _handler->monitor(_connection, _socket, readable | writable); + return this; + + default: + + // @todo check how to handle this + return this; + } + } + + +public: + /** + * Constructor + * @param connection Parent TCP connection object + * @param socket The socket filedescriptor + * @param ssl The SSL structure + * @param handler User-supplied handler object + */ + TcpSslClose(TcpConnection *connection, int socket, const TcpSsl &ssl, TcpHandler *handler) : + TcpState(connection, handler), + _ssl(ssl), + _socket(socket) + { + // tell the handler to monitor the socket if there is an out + _handler->monitor(_connection, _socket, readable); + } + + /** + * Destructor + */ + virtual ~TcpSslClose() noexcept + { + // skip if handler is already forgotten + if (_handler == nullptr) return; + + // we no longer have to monitor the socket + _handler->monitor(_connection, _socket, 0); + + // close the socket + close(_socket); + } + + /** + * The filedescriptor of this connection + * @return int + */ + virtual int fileno() const override { return _socket; } + + /** + * Process the filedescriptor in the object + * @param fd The filedescriptor that is active + * @param flags AMQP::readable and/or AMQP::writable + * @return New implementation object + */ + virtual TcpState *process(int fd, int flags) + { + // the socket must be the one this connection writes to + if (fd != _socket) return this; + + // because the object might soon be destructed, we create a monitor to check this + Monitor monitor(this); + + // close the connection + auto result = SSL_shutdown(_ssl); + + // if this is a success, we can proceed with the event loop + if (result > 0) return proceed(); + + // the operation failed, we may have to repeat our call + else return repeat(result); + } +}; + +/** + * End of namespace + */ +} \ No newline at end of file diff --git a/src/linux_tcp/tcpsslconnected.h b/src/linux_tcp/tcpsslconnected.h index f52c6d0..8ed4b55 100644 --- a/src/linux_tcp/tcpsslconnected.h +++ b/src/linux_tcp/tcpsslconnected.h @@ -17,7 +17,8 @@ #include "tcpoutbuffer.h" #include "tcpinbuffer.h" #include "wait.h" -#include +#include "tcpssl.h" +#include "tcpsslclose.h" /** * Set up namespace @@ -27,14 +28,14 @@ namespace AMQP { /** * Class definition */ -class TcpSslConnected: public TcpState, private Watchable +class TcpSslConnected : public TcpState, private Watchable { private: /** * The SSL context - * @var SSL* + * @var TcpSsl */ - SSL *_ssl; + TcpSsl _ssl; /** * Socket file descriptor @@ -63,6 +64,18 @@ private: state_sending, state_receiving } _state; + + /** + * Is the object already closed? + * @var bool + */ + bool _closed = false; + + /** + * Cached reallocation instruction + * @var size_t + */ + size_t _reallocate = 0; /** @@ -71,9 +84,6 @@ private: */ bool reportError() { - // some errors are ok and do not (necessarily) mean that we're disconnected - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; - // we have an error - report this to the user _handler->onError(_connection, strerror(errno)); @@ -94,7 +104,8 @@ private: } /** - * Proceed with the previous operation, possibly changing the monitor + * Proceed with the next operation after the previous operation was + * a success, possibly changing the filedescriptor-monitor * @return TcpState* */ TcpState *proceed() @@ -106,8 +117,16 @@ private: _state = state_sending; // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, readable); + _handler->monitor(_connection, _socket, readable | writable); } + else if (_closed) + { + // we forget the current handler to prevent that things are changed + _handler = nullptr; + + // start the state that closes the connection + return new TcpSslClose(_connection, _socket, _ssl, _handler); + } else { // outgoing buffer is empty, we're idle again waiting for further input @@ -140,7 +159,7 @@ private: case SSL_ERROR_WANT_WRITE: // wait until socket becomes writable again - _handler->monitor(_connection, _socket, writable); + _handler->monitor(_connection, _socket, readable | writable); return this; default: @@ -149,18 +168,53 @@ private: return this; } } - + + /** + * Parse the received buffer + * @param size + * @return TcpState + */ + TcpState *parse(size_t size) + { + // we need a local copy of the buffer - because it is possible that "this" + // object gets destructed halfway through the call to the parse() method + TcpInBuffer buffer(std::move(_in)); + + // because the object might soon be destructed, we create a monitor to check this + Monitor monitor(this); + + // parse the buffer + auto processed = _connection->parse(buffer); + + // "this" could be removed by now, check this + if (!monitor.valid()) return nullptr; + + // shrink buffer + buffer.shrink(processed); + + // restore the buffer as member + _in = std::move(buffer); + + // do we have to reallocate? + if (_reallocate) _in.reallocate(_reallocate); + + // we can remove the reallocate instruction + _reallocate = 0; + + // done + return this; + } public: /** - * Constructor - * @param connection Parent TCP connection object - * @param socket The socket filedescriptor - * @param ssl The SSL structure - * @param buffer The buffer that was already built - * @param handler User-supplied handler object + * Constructor + * @param connection Parent TCP connection object + * @param socket The socket filedescriptor + * @param ssl The SSL structure + * @param buffer The buffer that was already built + * @param handler User-supplied handler object */ - TcpSslConnected(TcpConnection *connection, int socket, SSL *ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : + TcpSslConnected(TcpConnection *connection, int socket, const TcpSsl &ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : TcpState(connection, handler), _ssl(ssl), _socket(socket), @@ -213,7 +267,7 @@ public: // try to send more data from the outgoing buffer auto result = _out.sendto(_ssl); - // if this is a success, we may have to update the monitor + // if this is a success, we can proceed with the event loop if (result > 0) return proceed(); // the operation failed, we may have to repeat our call @@ -225,18 +279,11 @@ public: auto result = _in.receivefrom(_ssl, _connection->expected()); // if this is a success, we may have to update the monitor - // @todo also parse the buffer - if (result > 0) return proceed(); + if (result > 0) return parse(result); // the operation failed, we may have to repeat our call else return repeat(result); - - // we're busy with receiving data - // @todo check this } - - // keep same object - return this; } /** @@ -253,8 +300,40 @@ public: // for that operation to complete before we can move on if (_state != state_idle) return; + // object is now busy sending + _state = state_sending; + // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, writable); + _handler->monitor(_connection, _socket, readable | writable); + } + + /** + * Report that heartbeat negotiation is going on + * @param heartbeat suggested heartbeat + * @return uint16_t accepted heartbeat + */ + virtual uint16_t reportNegotiate(uint16_t heartbeat) override + { + // remember that we have to reallocate (_in member can not be accessed because it is moved away) + _reallocate = _connection->maxFrame(); + + // pass to base + return TcpState::reportNegotiate(heartbeat); + } + + /** + * Report to the handler that the connection was nicely closed + */ + virtual void reportClosed() override + { + // remember that the object is closed + _closed = true; + + // if the previous operation is still in progress + if (_state != state_idle) return; + + // wait until the connection is writable + _handler->monitor(_connection, _socket, writable); } };