From f23bcf19f15c348ad319af8473f8e4afaeddc601 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 8 Mar 2018 10:02:42 +0100 Subject: [PATCH] improved docblocks in tcpstate.h header file, when an error or closed is reported to user space, the _handler variable is now reset to prevent that it will be used to report more than once (we still need to check if this does not trigger other errors), and the state object is no longer destructed after a reportClosed() call, so that it can clean up nicely (which we need to the tls shutdown anyway) --- include/amqpcpp/linux_tcp/tcpconnection.h | 2 +- src/linux_tcp/openssl.cpp | 14 ++++ src/linux_tcp/openssl.h | 1 + src/linux_tcp/sslconnected.h | 68 ++++++++++++++++--- src/linux_tcp/sslhandshake.h | 6 +- src/linux_tcp/tcpconnected.h | 11 ++- src/linux_tcp/tcpconnection.cpp | 23 +++---- src/linux_tcp/tcpresolver.h | 6 +- src/linux_tcp/tcpstate.h | 82 ++++++++++++++++++----- 9 files changed, 160 insertions(+), 53 deletions(-) diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index f7d3212..72b43df 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -5,7 +5,7 @@ * IO between the client application and the RabbitMQ server. * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** diff --git a/src/linux_tcp/openssl.cpp b/src/linux_tcp/openssl.cpp index 39bff69..234b2e7 100644 --- a/src/linux_tcp/openssl.cpp +++ b/src/linux_tcp/openssl.cpp @@ -211,6 +211,20 @@ int SSL_do_handshake(SSL *ssl) return func(ssl); } +/** + * Obtain shutdown statue for TLS/SSL I/O operation + * @param ssl SSL object + * @return int returns error values + */ +int SSL_get_shutdown(const SSL *ssl) +{ + // create a function + static Function func("SSL_get_shutdown"); + + // call the openssl function + return func(ssl); +} + /** * Obtain result code for TLS/SSL I/O operation * @param ssl SSL object diff --git a/src/linux_tcp/openssl.h b/src/linux_tcp/openssl.h index 387654d..bd67e5b 100644 --- a/src/linux_tcp/openssl.h +++ b/src/linux_tcp/openssl.h @@ -41,6 +41,7 @@ int SSL_read(SSL *ssl, void *buf, int num); int SSL_write(SSL *ssl, const void *buf, int num); int SSL_shutdown(SSL *ssl); int SSL_set_fd(SSL *ssl, int fd); +int SSL_get_shutdown(const SSL *ssl); int SSL_get_error(const SSL *ssl, int ret); int SSL_up_ref(SSL *ssl); void SSL_set_connect_state(SSL *ssl); diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index b6e5d21..5bbe81a 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -149,23 +149,54 @@ private: { // error was returned, so we must investigate what is going on auto error = OpenSSL::SSL_get_error(_ssl, result); - + + // create a monitor because the handler could make things ugly + Monitor monitor(this); + // check the error switch (error) { case SSL_ERROR_WANT_READ: // the operation must be repeated when readable _handler->monitor(_connection, _socket, readable); - return this; + + // allow chaining + return monitor.valid() ? this : nullptr; case SSL_ERROR_WANT_WRITE: // wait until socket becomes writable again _handler->monitor(_connection, _socket, readable | writable); - return this; + + // allow chaining + return monitor.valid() ? this : nullptr; + + case SSL_ERROR_NONE: + // turns out no error occured, an no action has to be rescheduled + _handler->monitor(_connection, _socket, _out ? readable | writable : readable); + + // we're ready for the next instruction from userspace + _state = state_idle; + + // allow chaining + return monitor.valid() ? this : nullptr; default: + // is the peer trying to shutdown? (we dont expect this) + bool shutdown = OpenSSL::SSL_get_shutdown(_ssl); - // @todo check how to handle this - return this; + // send back a nice shutdown + if (shutdown) OpenSSL::SSL_shutdown(_ssl); + + // tell the handler + _handler->onError(_connection, "ssl error"); + + // no need to chain if object is already destructed + if (!monitor) return nullptr; + + // create a new new object + //return shutdown ? + + // allow chaining + return nullptr; //monitor.valid() ? new TcpClosed(this) : nullptr; } } @@ -258,9 +289,6 @@ public: // the socket must be the one this connection writes to if (fd != _socket) return this; - // because the object might soon be destructed, we create a monitor to check this - Monitor monitor(this); - // are we busy with sending or receiving data? if (_state == state_sending) { @@ -288,14 +316,34 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still exists * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { + // we are not going to do this is object is busy reading + if (_state == state_receiving) return this; + // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); - // @todo implementation + // keep looping while we have an outgoing buffer + while (_out) + { + // try to send more data from the outgoing buffer + auto result = _out.sendto(_ssl); + + // go to the next state + auto *state = result > 0 ? proceed() : repeat(result); + + return state; + +// if (result > 0) return proceed(); +// +// // the operation failed, we may have to repeat our call +// else return repeat(result); + } + return this; } diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 4095664..9a4be90 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -148,11 +148,12 @@ public: /** * Process the filedescriptor in the object + * @param monitor Object to check if connection still exists * @param fd Filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New state object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // must be the socket if (fd != _socket) return this; @@ -187,9 +188,10 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still exists * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index 4aeb8e8..bb0ac5d 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -128,18 +128,16 @@ public: /** * Process the filedescriptor in the object + * @param monitor Monitor to check if the object is still alive * @param fd Filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New state object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // must be the socket if (fd != _socket) return this; - // because the object might soon be destructed, we create a monitor to check this - Monitor monitor(this); - // can we write more data to the socket? if (flags & writable) { @@ -218,9 +216,10 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still lives * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); @@ -232,7 +231,7 @@ public: if (!wait.writable()) return this; // socket is writable, send as much data as possible - auto *newstate = process(_socket, writable); + auto *newstate = process(monitor, _socket, writable); // are we done if (newstate != this) return newstate; diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index f0663c3..6aeefb8 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -4,7 +4,7 @@ * Implementation file for the TCP connection * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -12,6 +12,7 @@ */ #include "includes.h" #include "tcpresolver.h" +#include "tcpstate.h" /** * Set up namespace @@ -24,7 +25,7 @@ namespace AMQP { * @param hostname The address to connect to */ TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)), +// _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)), _connection(this, address.login(), address.vhost()) {} /** @@ -48,11 +49,11 @@ int TcpConnection::fileno() const */ void TcpConnection::process(int fd, int flags) { - // monitor the object for destruction - Monitor monitor{ this }; + // monitor the object for destruction, because you never know what the user + Monitor monitor(this); // pass on the the state, that returns a new impl - auto *result = _state->process(fd, flags); + auto *result = _state->process(monitor, fd, flags); // are we still valid if (!monitor.valid()) return; @@ -78,7 +79,7 @@ void TcpConnection::flush() while (true) { // flush the object - auto *newstate = _state->flush(); + auto *newstate = _state->flush(monitor); // done if object no longer exists if (!monitor.valid()) return; @@ -136,7 +137,7 @@ void TcpConnection::onError(Connection *connection, const char *message) auto ptr = std::move(_state); // object is now in a closed state - _state.reset(new TcpClosed(ptr.get())); + //_state.reset(new TcpClosed(ptr.get())); // tell the implementation to report the error ptr->reportError(message); @@ -158,14 +159,8 @@ void TcpConnection::onConnected(Connection *connection) */ void TcpConnection::onClosed(Connection *connection) { - // current object is going to be removed, but we have to keep it for a while - auto ptr = std::move(_state); - - // object is now in a closed state - _state.reset(new TcpClosed(ptr.get())); - // tell the implementation to report that connection is closed now - ptr->reportClosed(); + _state->reportClosed(); } /** diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index ee24e2d..93399e3 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -212,11 +212,12 @@ public: /** * Wait for the resolver to be ready + * @param monitor Object to check if connection still exists * @param fd The filedescriptor that is active * @param flags Flags to indicate that fd is readable and/or writable * @return New implementation object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // only works if the incoming pipe is readable if (fd != _pipe.in() || !(flags & readable)) return this; @@ -227,9 +228,10 @@ public: /** * Flush state / wait for the connection to complete + * @param monitor Object to check if connection still exists * @return New implementation object */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // just wait for the other thread to be ready _thread.join(); diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index 93ddc21..2b5dd3f 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -36,6 +36,22 @@ protected: TcpHandler *_handler; protected: + /** + * Helper function to reset the handler, and to return the old handler object + * @return TcpHandler* User-supplied handler that was just reset + */ + TcpHandler *reset(TcpHandler *handler) + { + // remember old handler + auto *oldhandler = _handler; + + // install the new handler + _handler = handler; + + // return the old handler + return oldhandler; + } + /** * Protected constructor * @param connection Original TCP connection object @@ -64,12 +80,19 @@ public: virtual int fileno() const { return -1; } /** - * Process the filedescriptor in the object + * Process the filedescriptor in the object + * + * This method should return the handler object that will be responsible for + * all future readable/writable events for the file descriptor, or nullptr + * if the underlying connection object has already been destructed by the + * user and it would be pointless to set up a new handler. + * + * @param monitor Monitor that can be used to check if the tcp connection is still alive * @param fd The filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New implementation object */ - virtual TcpState *process(int fd, int flags) + virtual TcpState *process(const Monitor &monitor, int fd, int flags) { // default implementation does nothing and preserves same implementation return this; @@ -77,8 +100,8 @@ public: /** * Send data over the connection - * @param buffer buffer to send - * @param size size of the buffer + * @param buffer Buffer to send + * @param size Size of the buffer */ virtual void send(const char *buffer, size_t size) { @@ -86,7 +109,25 @@ public: } /** - * Report that heartbeat negotiation is going on + * Flush the connection, all outgoing operations should be completed. + * + * If the state changes during the operation, the new state object should + * be returned instead, or nullptr if the user has closed the connection + * in the meantime. If the connection object got destructed by a user space + * call, this method should return nullptr. A monitor object is pass in to + * allow the flush() method to check if the connection still exists. + * + * If this object returns a new state object (instead of "this"), the + * connection object will immediately proceed with calling flush() on that + * new state object too. + * + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState New implementation object + */ + virtual TcpState *flush(const Monitor &monitor) { return this; } + + /** + * Report to the handler that heartbeat negotiation is going on * @param heartbeat suggested heartbeat * @return uint16_t accepted heartbeat */ @@ -97,25 +138,25 @@ public: } /** - * Flush the connection - * @return TcpState new implementation object - */ - virtual TcpState *flush() { return this; } - - /** - * Report to the handler that the object is in an error state + * Report to the handler that the object is in an error state. + * + * This is the last method to be called on the handler object, from now on + * the handler will no longer be called to report things to user space. + * The state object itself stays active, and further calls to process() + * may be possible. + * * @param error */ - void reportError(const char *error) + virtual void reportError(const char *error) { // pass to handler - _handler->onError(_connection, error); + reset(nullptr)->onError(_connection, error); } /** * Report that a heartbeat frame was received */ - void reportHeartbeat() + virtual void reportHeartbeat() { // pass to handler _handler->onHeartbeat(_connection); @@ -124,19 +165,24 @@ public: /** * Report to the handler that the connection is ready for use */ - void reportConnected() + virtual void reportConnected() { // pass to handler _handler->onConnected(_connection); } /** - * Report to the handler that the connection was nicely closed + * Report to the handler that the connection was correctly closed, after + * the user has called the Connection::close() method. The underlying TCP + * connection still has to be closed. + * + * This is the last method that is called on the object, from now on no + * other methods may be called on the _handler variable. */ virtual void reportClosed() { // pass to handler - _handler->onClosed(_connection); + reset(nullptr)->onClosed(_connection); } };