diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index f7d3212..72b43df 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -5,7 +5,7 @@ * IO between the client application and the RabbitMQ server. * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** diff --git a/src/linux_tcp/openssl.cpp b/src/linux_tcp/openssl.cpp index 39bff69..234b2e7 100644 --- a/src/linux_tcp/openssl.cpp +++ b/src/linux_tcp/openssl.cpp @@ -211,6 +211,20 @@ int SSL_do_handshake(SSL *ssl) return func(ssl); } +/** + * Obtain shutdown statue for TLS/SSL I/O operation + * @param ssl SSL object + * @return int returns error values + */ +int SSL_get_shutdown(const SSL *ssl) +{ + // create a function + static Function func("SSL_get_shutdown"); + + // call the openssl function + return func(ssl); +} + /** * Obtain result code for TLS/SSL I/O operation * @param ssl SSL object diff --git a/src/linux_tcp/openssl.h b/src/linux_tcp/openssl.h index 387654d..bd67e5b 100644 --- a/src/linux_tcp/openssl.h +++ b/src/linux_tcp/openssl.h @@ -41,6 +41,7 @@ int SSL_read(SSL *ssl, void *buf, int num); int SSL_write(SSL *ssl, const void *buf, int num); int SSL_shutdown(SSL *ssl); int SSL_set_fd(SSL *ssl, int fd); +int SSL_get_shutdown(const SSL *ssl); int SSL_get_error(const SSL *ssl, int ret); int SSL_up_ref(SSL *ssl); void SSL_set_connect_state(SSL *ssl); diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index b6e5d21..5bbe81a 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -149,23 +149,54 @@ private: { // error was returned, so we must investigate what is going on auto error = OpenSSL::SSL_get_error(_ssl, result); - + + // create a monitor because the handler could make things ugly + Monitor monitor(this); + // check the error switch (error) { case SSL_ERROR_WANT_READ: // the operation must be repeated when readable _handler->monitor(_connection, _socket, readable); - return this; + + // allow chaining + return monitor.valid() ? this : nullptr; case SSL_ERROR_WANT_WRITE: // wait until socket becomes writable again _handler->monitor(_connection, _socket, readable | writable); - return this; + + // allow chaining + return monitor.valid() ? this : nullptr; + + case SSL_ERROR_NONE: + // turns out no error occured, an no action has to be rescheduled + _handler->monitor(_connection, _socket, _out ? readable | writable : readable); + + // we're ready for the next instruction from userspace + _state = state_idle; + + // allow chaining + return monitor.valid() ? this : nullptr; default: + // is the peer trying to shutdown? (we dont expect this) + bool shutdown = OpenSSL::SSL_get_shutdown(_ssl); - // @todo check how to handle this - return this; + // send back a nice shutdown + if (shutdown) OpenSSL::SSL_shutdown(_ssl); + + // tell the handler + _handler->onError(_connection, "ssl error"); + + // no need to chain if object is already destructed + if (!monitor) return nullptr; + + // create a new new object + //return shutdown ? + + // allow chaining + return nullptr; //monitor.valid() ? new TcpClosed(this) : nullptr; } } @@ -258,9 +289,6 @@ public: // the socket must be the one this connection writes to if (fd != _socket) return this; - // because the object might soon be destructed, we create a monitor to check this - Monitor monitor(this); - // are we busy with sending or receiving data? if (_state == state_sending) { @@ -288,14 +316,34 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still exists * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { + // we are not going to do this is object is busy reading + if (_state == state_receiving) return this; + // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); - // @todo implementation + // keep looping while we have an outgoing buffer + while (_out) + { + // try to send more data from the outgoing buffer + auto result = _out.sendto(_ssl); + + // go to the next state + auto *state = result > 0 ? proceed() : repeat(result); + + return state; + +// if (result > 0) return proceed(); +// +// // the operation failed, we may have to repeat our call +// else return repeat(result); + } + return this; } diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 4095664..9a4be90 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -148,11 +148,12 @@ public: /** * Process the filedescriptor in the object + * @param monitor Object to check if connection still exists * @param fd Filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New state object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // must be the socket if (fd != _socket) return this; @@ -187,9 +188,10 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still exists * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index 4aeb8e8..bb0ac5d 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -128,18 +128,16 @@ public: /** * Process the filedescriptor in the object + * @param monitor Monitor to check if the object is still alive * @param fd Filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New state object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // must be the socket if (fd != _socket) return this; - // because the object might soon be destructed, we create a monitor to check this - Monitor monitor(this); - // can we write more data to the socket? if (flags & writable) { @@ -218,9 +216,10 @@ public: /** * Flush the connection, sent all buffered data to the socket + * @param monitor Object to check if connection still lives * @return TcpState new tcp state */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); @@ -232,7 +231,7 @@ public: if (!wait.writable()) return this; // socket is writable, send as much data as possible - auto *newstate = process(_socket, writable); + auto *newstate = process(monitor, _socket, writable); // are we done if (newstate != this) return newstate; diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index f0663c3..6aeefb8 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -4,7 +4,7 @@ * Implementation file for the TCP connection * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -12,6 +12,7 @@ */ #include "includes.h" #include "tcpresolver.h" +#include "tcpstate.h" /** * Set up namespace @@ -24,7 +25,7 @@ namespace AMQP { * @param hostname The address to connect to */ TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)), +// _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)), _connection(this, address.login(), address.vhost()) {} /** @@ -48,11 +49,11 @@ int TcpConnection::fileno() const */ void TcpConnection::process(int fd, int flags) { - // monitor the object for destruction - Monitor monitor{ this }; + // monitor the object for destruction, because you never know what the user + Monitor monitor(this); // pass on the the state, that returns a new impl - auto *result = _state->process(fd, flags); + auto *result = _state->process(monitor, fd, flags); // are we still valid if (!monitor.valid()) return; @@ -78,7 +79,7 @@ void TcpConnection::flush() while (true) { // flush the object - auto *newstate = _state->flush(); + auto *newstate = _state->flush(monitor); // done if object no longer exists if (!monitor.valid()) return; @@ -136,7 +137,7 @@ void TcpConnection::onError(Connection *connection, const char *message) auto ptr = std::move(_state); // object is now in a closed state - _state.reset(new TcpClosed(ptr.get())); + //_state.reset(new TcpClosed(ptr.get())); // tell the implementation to report the error ptr->reportError(message); @@ -158,14 +159,8 @@ void TcpConnection::onConnected(Connection *connection) */ void TcpConnection::onClosed(Connection *connection) { - // current object is going to be removed, but we have to keep it for a while - auto ptr = std::move(_state); - - // object is now in a closed state - _state.reset(new TcpClosed(ptr.get())); - // tell the implementation to report that connection is closed now - ptr->reportClosed(); + _state->reportClosed(); } /** diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index ee24e2d..93399e3 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -212,11 +212,12 @@ public: /** * Wait for the resolver to be ready + * @param monitor Object to check if connection still exists * @param fd The filedescriptor that is active * @param flags Flags to indicate that fd is readable and/or writable * @return New implementation object */ - virtual TcpState *process(int fd, int flags) override + virtual TcpState *process(const Monitor &monitor, int fd, int flags) override { // only works if the incoming pipe is readable if (fd != _pipe.in() || !(flags & readable)) return this; @@ -227,9 +228,10 @@ public: /** * Flush state / wait for the connection to complete + * @param monitor Object to check if connection still exists * @return New implementation object */ - virtual TcpState *flush() override + virtual TcpState *flush(const Monitor &monitor) override { // just wait for the other thread to be ready _thread.join(); diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index 93ddc21..2b5dd3f 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -36,6 +36,22 @@ protected: TcpHandler *_handler; protected: + /** + * Helper function to reset the handler, and to return the old handler object + * @return TcpHandler* User-supplied handler that was just reset + */ + TcpHandler *reset(TcpHandler *handler) + { + // remember old handler + auto *oldhandler = _handler; + + // install the new handler + _handler = handler; + + // return the old handler + return oldhandler; + } + /** * Protected constructor * @param connection Original TCP connection object @@ -64,12 +80,19 @@ public: virtual int fileno() const { return -1; } /** - * Process the filedescriptor in the object + * Process the filedescriptor in the object + * + * This method should return the handler object that will be responsible for + * all future readable/writable events for the file descriptor, or nullptr + * if the underlying connection object has already been destructed by the + * user and it would be pointless to set up a new handler. + * + * @param monitor Monitor that can be used to check if the tcp connection is still alive * @param fd The filedescriptor that is active * @param flags AMQP::readable and/or AMQP::writable * @return New implementation object */ - virtual TcpState *process(int fd, int flags) + virtual TcpState *process(const Monitor &monitor, int fd, int flags) { // default implementation does nothing and preserves same implementation return this; @@ -77,8 +100,8 @@ public: /** * Send data over the connection - * @param buffer buffer to send - * @param size size of the buffer + * @param buffer Buffer to send + * @param size Size of the buffer */ virtual void send(const char *buffer, size_t size) { @@ -86,7 +109,25 @@ public: } /** - * Report that heartbeat negotiation is going on + * Flush the connection, all outgoing operations should be completed. + * + * If the state changes during the operation, the new state object should + * be returned instead, or nullptr if the user has closed the connection + * in the meantime. If the connection object got destructed by a user space + * call, this method should return nullptr. A monitor object is pass in to + * allow the flush() method to check if the connection still exists. + * + * If this object returns a new state object (instead of "this"), the + * connection object will immediately proceed with calling flush() on that + * new state object too. + * + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState New implementation object + */ + virtual TcpState *flush(const Monitor &monitor) { return this; } + + /** + * Report to the handler that heartbeat negotiation is going on * @param heartbeat suggested heartbeat * @return uint16_t accepted heartbeat */ @@ -97,25 +138,25 @@ public: } /** - * Flush the connection - * @return TcpState new implementation object - */ - virtual TcpState *flush() { return this; } - - /** - * Report to the handler that the object is in an error state + * Report to the handler that the object is in an error state. + * + * This is the last method to be called on the handler object, from now on + * the handler will no longer be called to report things to user space. + * The state object itself stays active, and further calls to process() + * may be possible. + * * @param error */ - void reportError(const char *error) + virtual void reportError(const char *error) { // pass to handler - _handler->onError(_connection, error); + reset(nullptr)->onError(_connection, error); } /** * Report that a heartbeat frame was received */ - void reportHeartbeat() + virtual void reportHeartbeat() { // pass to handler _handler->onHeartbeat(_connection); @@ -124,19 +165,24 @@ public: /** * Report to the handler that the connection is ready for use */ - void reportConnected() + virtual void reportConnected() { // pass to handler _handler->onConnected(_connection); } /** - * Report to the handler that the connection was nicely closed + * Report to the handler that the connection was correctly closed, after + * the user has called the Connection::close() method. The underlying TCP + * connection still has to be closed. + * + * This is the last method that is called on the object, from now on no + * other methods may be called on the _handler variable. */ virtual void reportClosed() { // pass to handler - _handler->onClosed(_connection); + reset(nullptr)->onClosed(_connection); } };