diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 6ab898e..9e29b79 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -217,14 +217,6 @@ public: */ void process(int fd, int flags); - /** - * Flush the connection - all unsent bytes are sent to the socket rigth away - * This is a blocking operation. The connection object normally only sends data - * when the socket is known to be writable, but with this method you can force - * the outgoing buffer to be fushed - */ - void flush(); - /** * Close the connection in an elegant fashion. This closes all channels and the * TCP connection. Note that the connection is not immediately closed: first all diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index fd2a357..0269b21 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -142,9 +142,11 @@ private: return monitor.valid() ? this : nullptr; default: + // report an error to user-space + _parent->onError(this, "ssl protocol error"); // ssl level error, we have to tear down the tcp connection - return monitor.valid() ? new TcpShutdown(this) : nullptr; + return monitor.valid() ? new TcpClosed(this) : nullptr; } } @@ -343,64 +345,6 @@ public: return proceed(); } - /** - * Flush the connection, sent all buffered data to the socket - * @param monitor Object to check if connection still exists - * @return TcpState new tcp state - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // we are not going to do this if object is busy reading - if (_state == state_receiving) return this; - - // create an object to wait for the filedescriptor to becomes active - Poll poll(_socket); - - // we are going to check for errors after the openssl operations, so we make - // sure that the error queue is currently completely empty - OpenSSL::ERR_clear_error(); - - // keep looping while we have an outgoing buffer - while (_out) - { - // move to the idle-state - _state = state_idle; - - // try to send more data from the outgoing buffer - auto result = _out.sendto(_ssl); - - // was this a success? - if (result > 0) - { - // proceed to the next state - auto *nextstate = proceed(); - - // leap out if we move to a different state - if (nextstate != this) return nextstate; - } - else - { - // error was returned, so we must investigate what is going on - auto error = OpenSSL::SSL_get_error(_ssl, result); - - // get the next state given the error - auto *nextstate = repeat(monitor, state_sending, error); - - // leap out if we move to a different state - if (nextstate != this) return nextstate; - - // check the type of error, and wait now - switch (error) { - case SSL_ERROR_WANT_READ: poll.readable(true); break; - case SSL_ERROR_WANT_WRITE: poll.active(true); break; - } - } - } - - // done - return this; - } - /** * Send data over the connection * @param buffer buffer to send diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 1e8e772..7746c90 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -85,7 +85,7 @@ private: if (!monitor.valid()) return nullptr; // done, shutdown the tcp connection - return new TcpShutdown(this); + return new TcpClosed(this); } /** @@ -186,42 +186,6 @@ public: // the handshake is still busy, outgoing data must be cached _out.add(buffer, size); } - - /** - * Flush the connection, sent all buffered data to the socket - * @param monitor Object to check if connection still exists - * @return TcpState new tcp state - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // create an object to wait for the filedescriptor to becomes active - Poll poll(_socket); - - // keep looping - while (true) - { - // start the ssl handshake - int result = OpenSSL::SSL_do_handshake(_ssl); - - // if the connection succeeds, we can move to the ssl-connected state - if (result == 1) return nextstate(monitor); - - // error was returned, so we must investigate what is going on - auto error = OpenSSL::SSL_get_error(_ssl, result); - - // check the error - switch (error) { - - // if openssl reports that socket readability or writability is needed, - // we wait for that until this situation is reached - case SSL_ERROR_WANT_READ: poll.readable(true); break; - case SSL_ERROR_WANT_WRITE: poll.active(true); break; - - // something is wrong, we proceed to the next state - default: return reportError(monitor); - } - } - } }; /** diff --git a/src/linux_tcp/sslshutdown.h b/src/linux_tcp/sslshutdown.h index 1163d0c..7de70db 100644 --- a/src/linux_tcp/sslshutdown.h +++ b/src/linux_tcp/sslshutdown.h @@ -38,8 +38,8 @@ private: */ virtual TcpState *proceed(const Monitor &monitor) { - // next state is to shutdown the connection - return new TcpShutdown(this); + // next state is to close the connection + return new TcpClosed(this); } /** @@ -125,47 +125,6 @@ public: // the operation failed, we may have to repeat our call else return repeat(monitor, result); } - - /** - * Flush the connection, sent all buffered data to the socket - * @param monitor Object to check if connection still exists - * @return TcpState new tcp state - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // @todo do we even need this? isn't flushing reserved for data? - - // create an object to wait for the filedescriptor to becomes active - Poll poll(_socket); - - // keep looping - while (true) - { - // close the connection - auto result = OpenSSL::SSL_shutdown(_ssl); - - // on result==0 we need an additional call - while (result == 0) result = OpenSSL::SSL_shutdown(_ssl); - - // if this is a success, we can proceed with the event loop - if (result > 0) return proceed(monitor); - - // error was returned, so we must investigate what is going on - auto error = OpenSSL::SSL_get_error(_ssl, result); - - // check the error - switch (error) { - - // if openssl reports that socket readability or writability is needed, - // we wait for that until this situation is reached - case SSL_ERROR_WANT_READ: poll.readable(true); break; - case SSL_ERROR_WANT_WRITE: poll.active(true); break; - - // something is wrong, we proceed to the next state - default: return proceed(monitor); - } - } - } }; /** diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index c7a0ebc..5244b32 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -18,7 +18,7 @@ */ #include "tcpoutbuffer.h" #include "tcpinbuffer.h" -#include "tcpshutdown.h" +#include "tcpextstate.h" #include "poll.h" /** @@ -51,10 +51,10 @@ private: size_t _reallocate = 0; /** - * Have we already made the last report to the user (about an error or closed connection?) + * Did the user ask to elegantly close the connection? * @var bool */ - bool _finalized = false; + bool _closed = false; /** @@ -66,19 +66,19 @@ private: // some errors are ok and do not (necessarily) mean that we're disconnected if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; - // tell the parent that it failed - _parent->onError(this, "connection lost"); + // tell the parent that it failed (but not if the connection was elegantly closed) + if (!_closed) _parent->onError(this, "connection lost"); // done return true; } /** - * Construct the shutdown state + * Construct the final state * @param monitor Object that monitors whether connection still exists * @return TcpState* */ - TcpState *nextState(const Monitor &monitor) + TcpState *finalState(const Monitor &monitor) { // if the object is still in a valid state, we can treat the connection // as closed otherwise there is no point in moving to a next state @@ -133,7 +133,7 @@ public: auto result = _out.sendto(_socket); // are we in an error state? - if (result < 0 && reportError()) return nextState(monitor); + if (result < 0 && reportError()) return finalState(monitor); // if buffer is empty by now, we no longer have to check for // writability, but only for readability @@ -147,7 +147,7 @@ public: ssize_t result = _in.receivefrom(_socket, _parent->expected()); // are we in an error state? - if (result < 0 && reportError()) return nextState(monitor); + if (result < 0 && reportError()) return finalState(monitor); // @todo should we also check for result == 0 @@ -204,43 +204,28 @@ public: _parent->onIdle(this, _socket, readable | writable); } - /** - * Flush the connection, sent all buffered data to the socket - * @param monitor Object to check if connection still lives - * @return TcpState new tcp state - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // create an object to wait for the filedescriptor to becomes active - Poll poll(_socket); - - // keep running until the out buffer is not empty - while (_out) - { - // poll the socket, is it already writable? - if (!poll.writable(true)) return this; - - // socket is writable, send as much data as possible - auto *newstate = process(monitor, _socket, writable); - - // are we done - if (newstate != this) return newstate; - } - - // all has been sent - return this; - } - /** * Gracefully close the connection * @return TcpState The next state */ virtual TcpState *close() override { - // @todo what if we're still busy receiving data? + // do nothing if already closed + if (_closed) return this; + + // remember that the connection is closed + _closed = true; + + // we will shutdown the socket in a very elegant way, we notify the peer + // that we will not be sending out more write operations + shutdown(_socket, SHUT_WR); + + // we still monitor the socket for readability to see if our close call was + // confirmed by the peer + _parent->onIdle(this, _socket, readable); // start the tcp shutdown - return new TcpShutdown(this); + return this; } /** diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index cfa9db9..c4d878d 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -111,37 +111,6 @@ void TcpConnection::process(int fd, int flags) if (!assign(monitor, newstate)) delete newstate; } -/** - * Flush the tcp connection - */ -void TcpConnection::flush() -{ - // monitor the object for destruction - Monitor monitor(this); - - // keep looping - while (true) - { - // get the old state - auto *oldstate = _state.get(); - - // flush the object - auto *newstate = _state->flush(monitor); - - // done if object no longer exists - if (newstate == nullptr || newstate == oldstate || !monitor.valid()) return; - - // replace the new state - if (assign(monitor, newstate)) continue; - - // the "this" object was destructed - delete newstate; - - // leap out because there is nothing left to do - return; - } -} - /** * Close the connection * @return bool diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index f127bbb..b3a3a51 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -234,20 +234,6 @@ public: return proceed(monitor); } - /** - * Flush state / wait for the connection to complete - * @param monitor Object to check if connection still exists - * @return New implementation object - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // just wait for the other thread to be ready - _thread.join(); - - // proceed to the next state - return proceed(monitor); - } - /** * Send data over the connection * @param buffer buffer to send diff --git a/src/linux_tcp/tcpshutdown.h b/src/linux_tcp/tcpshutdown.h deleted file mode 100644 index 5ad1250..0000000 --- a/src/linux_tcp/tcpshutdown.h +++ /dev/null @@ -1,110 +0,0 @@ -/** - * TcpShutdown.h - * - * State in the TCP handshake that is responsible for gracefully - * shutting down the connection by closing our side of the connection, - * and waiting for the server to close the connection on the other - * side too. - * - * @author Emiel Bruijntjes - * @copyright 2018 Copernica BV - */ - -/** - * Include guard - */ -#pragma once - -/** - * Dependencies - */ -#include "tcpextstate.h" - -/** - * Begin of namespace - */ -namespace AMQP { - -/** - * Class definition - */ -class TcpShutdown : public TcpExtState -{ -public: - /** - * Constructor - * @param state The previous state - */ - TcpShutdown(TcpExtState *state) : TcpExtState(state) - { - // we will shutdown the socket in a very elegant way, we notify the peer - // that we will not be sending out more write operations - shutdown(_socket, SHUT_WR); - - // we still monitor the socket for readability to see if our close call was - // confirmed by the peer - _parent->onIdle(this, _socket, readable); - } - - /** - * Forbidden to copy - * @param that - */ - TcpShutdown(const TcpShutdown &that) = delete; - - /** - * Destructor - */ - virtual ~TcpShutdown() = default; - - /** - * Process the filedescriptor in the object - * @param monitor Monitor that can be used to check if the tcp connection is still alive - * @param fd The filedescriptor that is active - * @param flags AMQP::readable and/or AMQP::writable - * @return New implementation object - */ - virtual TcpState *process(const Monitor &monitor, int fd, int flags) - { - // must be the right filedescriptor - if (_socket != fd) return this; - - // if the socket is not readable, we do not have to check anything - if (!(flags & readable)) return this; - - // buffer to read data in - char buffer[64]; - - // read in data (we only do this to discover if the connection is really closed) - auto result = read(_socket, buffer, sizeof(buffer)); - - // if we read something, we keep on reading - if (result > 0) return this; - - // or should we retry? - if (result < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) return this; - - // flush the connection to close the connection and report to the user - return flush(monitor); - } - - /** - * Flush the connection, make sure all network operations are finished - * @param monitor Object to check if connection still exists - * @return TcpState New state - */ - virtual TcpState *flush(const Monitor &monitor) override - { - // immediately close the socket - cleanup(); - - // move to next state - return monitor.valid() ? new TcpClosed(this) : nullptr; - } -}; - -/** - * End of namespace - */ -} - diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index c372b33..3a328f0 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -98,24 +98,6 @@ public: */ virtual TcpState *close() { return this; } - /** - * Flush the connection, all outgoing operations should be completed. - * - * If the state changes during the operation, the new state object should - * be returned instead, or nullptr if the user has closed the connection - * in the meantime. If the connection object got destructed by a user space - * call, this method should return nullptr. A monitor object is pass in to - * allow the flush() method to check if the connection still exists. - * - * If this object returns a new state object (instead of "this"), the - * connection object will immediately proceed with calling flush() on that - * new state object too. - * - * @param monitor Monitor that can be used to check if the tcp connection is still alive - * @return TcpState New implementation object - */ - virtual TcpState *flush(const Monitor &monitor) { return this; } - /** * Install max-frame size * @param heartbeat suggested heartbeat