diff --git a/include/amqpcpp/connection.h b/include/amqpcpp/connection.h index 5fc1389..ca46456 100644 --- a/include/amqpcpp/connection.h +++ b/include/amqpcpp/connection.h @@ -1,7 +1,7 @@ /** * Class describing a mid-level Amqp connection * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -147,6 +147,27 @@ public: return _implementation.parse(buffer); } + /** + * Report that the connection was lost in the middle of an operation + * + * The AMQP protocol normally has a nice closing handshake, and a connection + * is elegantly closed via calls to the close() and parse() methods. The parse() + * methods recognizes the close-confirmation and will report this to the handler. + * However, if you notice yourself that the connection is lost in the middle of + * an operation (for example due to a crashing RabbitMQ server), you should + * explicitly tell the connection object about it, so that it can cancel all + * pending operations. For all pending operations the error and finalize callbacks + * will be called. The ConnectionHandler::onError() method will however _not_ be + * called. + * + * @param message the message that has to be passed to all error handlers + * @return bool + */ + bool fail(const char *message) + { + return _implementation.fail(message); + } + /** * Max frame size * diff --git a/include/amqpcpp/connectionimpl.h b/include/amqpcpp/connectionimpl.h index 3c805a9..eede09c 100644 --- a/include/amqpcpp/connectionimpl.h +++ b/include/amqpcpp/connectionimpl.h @@ -5,7 +5,7 @@ * constructed by the connection class itselves and that has all sorts of * methods that are only useful inside the library * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -70,7 +70,8 @@ protected: } _state = state_protocol; /** - * Has the close() method been called? + * Has the close() method been called? If this is true, we automatically + * send a close-frame after all pending operations are finsihed. * @var bool */ bool _closed = false; @@ -148,6 +149,14 @@ protected: * @return bool */ bool waiting() const; + + /** + * Helper method for the fail() method + * @param monitor + * @param message + * @return bool + */ + bool fail(const Monitor &monitor, const char *message); private: /** @@ -326,9 +335,18 @@ public: */ uint64_t parse(const Buffer &buffer); + /** + * Fail all pending - this can be called by user-space when it is recognized that the + * underlying connection is lost. All error-handlers for all operations and open + * channels will be called. This will _not_ call ConnectionHandler::onError() method. + * + * @return bool + */ + bool fail(const char *message); + /** * Close the connection - * This will close all channels + * This will also close all channels * @return bool */ bool close(); @@ -370,27 +388,7 @@ public: * Report an error message * @param message */ - void reportError(const char *message) - { - // set connection state to closed - _state = state_closed; - - // monitor because every callback could invalidate the connection - Monitor monitor(this); - - // all deferred result objects in the channels should report this error too - while (!_channels.empty()) - { - // report the errors - _channels.begin()->second->reportError(message, false); - - // leap out if no longer valid - if (!monitor.valid()) return; - } - - // inform handler - _handler->onError(_parent, message); - } + void reportError(const char *message); /** * Report that the connection is closed diff --git a/include/amqpcpp/linux_tcp.h b/include/amqpcpp/linux_tcp.h index c8b0d70..fb01e70 100644 --- a/include/amqpcpp/linux_tcp.h +++ b/include/amqpcpp/linux_tcp.h @@ -1,3 +1,4 @@ +#include "linux_tcp/tcpparent.h" #include "linux_tcp/tcphandler.h" #include "linux_tcp/tcpconnection.h" #include "linux_tcp/tcpchannel.h" diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 777eb12..68d5766 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -28,13 +28,22 @@ class TcpState; */ class TcpConnection : private ConnectionHandler, - private Watchable + private Watchable, + private TcpParent { private: + /** + * User-space handler object + * @var TcpHandler + */ + TcpHandler *_handler; + /** * The state of the TCP connection - this state objecs changes based on * the state of the connection (resolving, connected or closed) * @var std::unique_ptr + * + * @todo why is this a shared pointer? */ std::shared_ptr _state; @@ -48,13 +57,21 @@ private: * Method that is called after the connection was constructed * @param connection The connection that was attached to the handler */ - virtual void onAttached(Connection *connection) override; + virtual void onAttached(Connection *connection) override + { + // pass on to the handler + _handler->onAttached(this); + } /** * Method that is called when the connection is destructed * @param connection The connection that was detached from the handler */ - virtual void onDetached(Connection *connection) override; + virtual void onDetached(Connection *connection) override + { + // pass on to the handler + _handler->onDetached(this); + } /** * Method that is called when the heartbeat frequency is negotiated. @@ -76,7 +93,11 @@ private: * Method that is called when the server sends a heartbeat to the client * @param connection The connection over which the heartbeat was received */ - virtual void onHeartbeat(Connection *connection) override; + virtual void onHeartbeat(Connection *connection) override + { + // pass on to tcp handler + _handler->onHeartbeat(this); + } /** * Method called when the connection ends up in an error state @@ -89,7 +110,13 @@ private: * Method that is called when the connection is established * @param connection The connection that can now be used */ - virtual void onConnected(Connection *connection) override; + virtual void onConnected(Connection *connection) override + { + // @todo we may need this, because from this moment on we can pass an onClosed() + + // pass on to the handler + _handler->onConnected(this); + } /** * Method that is called when the connection was closed. @@ -98,23 +125,57 @@ private: virtual void onClosed(Connection *connection) override; /** - * Parse a buffer that was received + * Method to be called when data was received + * @param state * @param buffer + * @return size_t */ - uint64_t parse(const Buffer &buffer) + virtual size_t onReceived(TcpState *state, const Buffer &buffer) override { - // pass to the AMQP connection + // pass on to the connection return _connection.parse(buffer); } + + /** + * Method that is called when the connection is secured + * @param state + * @param ssl + * @return bool + */ + virtual bool onSecured(TcpState *state, const SSL *ssl) override + { + // pass on to user-space + return _handler->onSecured(this, ssl); + } + + /** + * Method to be called when we need to monitor a different filedescriptor + * @param state + * @param fd + * @param events + */ + virtual void onIdle(TcpState *state, int socket, int events) override + { + // pass on to user-space + return _handler->monitor(this, socket, events); + } /** - * Classes that have access to private data + * Method to be called when it is detected that the connection was closed + * @param state */ - friend class SslConnected; - friend class TcpConnected; - friend class TcpChannel; - + virtual void onClosed(TcpState *state) override; + /** + * The expected number of bytes + * @return size_t + */ + virtual size_t expected() override + { + // pass on to the connection + return _connection.expected(); + } + public: /** * Constructor diff --git a/include/amqpcpp/linux_tcp/tcphandler.h b/include/amqpcpp/linux_tcp/tcphandler.h index 6190e1e..7066924 100644 --- a/include/amqpcpp/linux_tcp/tcphandler.h +++ b/include/amqpcpp/linux_tcp/tcphandler.h @@ -14,11 +14,6 @@ */ #pragma once -/** - * Dependencies - */ -#include - /** * Set up namespace */ @@ -82,6 +77,10 @@ public: */ virtual bool onSecured(TcpConnection *connection, const SSL *ssl) { + // make sure compilers dont complain about unused parameters + (void) connection; + (void) ssl; + // default implementation: do not inspect anything, just allow the connection return true; } @@ -99,6 +98,10 @@ public: */ virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) { + // make sure compilers dont complain about unused parameters + (void) connection; + (void) interval; + // default implementation, suggested heartbeat is ok return interval; } @@ -109,7 +112,11 @@ public: * secure TLS connection, and the AMQP login handshake has been completed. * @param connection The TCP connection */ - virtual void onConnected(TcpConnection *connection) {} + virtual void onConnected(TcpConnection *connection) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } /** * Method that is called when the server sends a heartbeat to the client @@ -117,20 +124,33 @@ public: * * @see ConnectionHandler::onHeartbeat */ - virtual void onHeartbeat(TcpConnection *connection) {} + virtual void onHeartbeat(TcpConnection *connection) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } /** * Method that is called when the TCP connection ends up in an error state * @param connection The TCP connection * @param message Error message */ - virtual void onError(TcpConnection *connection, const char *message) {} + virtual void onError(TcpConnection *connection, const char *message) + { + // make sure compilers dont complain about unused parameters + (void) connection; + (void) message; + } /** * Method that is called when the TCP connection is closed * @param connection The TCP connection */ - virtual void onClosed(TcpConnection *connection) {} + virtual void onClosed(TcpConnection *connection) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } /** * Monitor a filedescriptor for readability or writability diff --git a/include/amqpcpp/linux_tcp/tcpparent.h b/include/amqpcpp/linux_tcp/tcpparent.h new file mode 100644 index 0000000..968bd4e --- /dev/null +++ b/include/amqpcpp/linux_tcp/tcpparent.h @@ -0,0 +1,90 @@ +/** + * TcpParent.h + * + * Interface to be implemented by the parent of a tcp-state. This is + * an _internal_ interface that is not relevant for user-space applications. + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Forward declarations + */ +class TcpState; +class Buffer; + +/** + * Class definition + */ +class TcpParent +{ +public: + /** + * Destructor + */ + virtual ~TcpParent() = default; + + /** + * Method to be called when data was received + * @param state + * @param buffer + * @return size_t + */ + virtual size_t onReceived(TcpState *state, const Buffer &buffer) = 0; + + /** + * Method that is called when the connection is secured + * @param state + * @param ssl + * @return bool + */ + virtual bool onSecured(TcpState *state, const SSL *ssl) = 0; + + /** + * Method to be called when we need to monitor a different filedescriptor + * @param state + * @param fd + * @param events + */ + virtual void onIdle(TcpState *state, int socket, int events) = 0; + + /** + * Method that is called when an error occurs (the connection is lost then) + * @param state + * @param error + */ + virtual void onError(TcpState *state, const char *message) = 0; + + /** + * Method to be called when it is detected that the connection was nicely closed + * @param state + */ + virtual void onClosed(TcpState *state) = 0; + + /** + * The expected number of bytes + * @return size_t + */ + virtual size_t expected() = 0; +}; + +/** + * End of namespace + */ +} diff --git a/src/channelcloseframe.h b/src/channelcloseframe.h index 48476af..d41b3da 100644 --- a/src/channelcloseframe.h +++ b/src/channelcloseframe.h @@ -1,7 +1,7 @@ /** * Class describing a channel close frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -81,10 +81,10 @@ public: * @param failingClass failing class id if applicable * @param failingMethod failing method id if applicable */ - ChannelCloseFrame(uint16_t channel, uint16_t code = 0, const std::string& text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) : + ChannelCloseFrame(uint16_t channel, uint16_t code = 0, std::string text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) : ChannelFrame(channel, (uint32_t)(text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte _code(code), - _text(text), + _text(std::move(text)), _failingClass(failingClass), _failingMethod(failingMethod) {} @@ -92,7 +92,7 @@ public: /** * Destructor */ - virtual ~ChannelCloseFrame() {} + virtual ~ChannelCloseFrame() = default; /** * Method id diff --git a/src/connectioncloseframe.h b/src/connectioncloseframe.h index 9828bb5..312c95d 100644 --- a/src/connectioncloseframe.h +++ b/src/connectioncloseframe.h @@ -1,7 +1,7 @@ /** * Class describing connection close frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -81,10 +81,10 @@ public: * @param failingClass id of the failing class if applicable * @param failingMethod id of the failing method if applicable */ - ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) : + ConnectionCloseFrame(uint16_t code, std::string text, uint16_t failingClass = 0, uint16_t failingMethod = 0) : ConnectionFrame((uint32_t)(text.length() + 7)), // 1 for extra string byte, 2 for each uint16 _code(code), - _text(text), + _text(std::move(text)), _failingClass(failingClass), _failingMethod(failingMethod) {} diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index fe1e567..3df2d15 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -186,6 +186,64 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) return processed; } +/** + * Fail all open channels, helper method + * @param monitor object to check if object still exists + * @param message error message + * @return bool does the object still exist? + */ +bool ConnectionImpl::fail(const Monitor &monitor, const char *message) +{ + // all deferred result objects in the channels should report this error too + while (!_channels.empty()) + { + // report the errors + _channels.begin()->second->reportError(message, false); + + // leap out if no longer valid + if (!monitor.valid()) return false; + } + + // done + return true; +} + +/** + * Fail the connection / report that the connection is lost + * @param message + * @return bool + */ +bool ConnectionImpl::fail(const char *message) +{ + // if already closed + if (_state == state_closed) return false; + + // from now on we consider the connection to be closed + _state = state_closed; + + // monitor because every callback could invalidate the connection + fail(Monitor(this), message); + + // done + return true; +} + +/** + * Report an error to user-space + * @param message the error message + */ +void ConnectionImpl::reportError(const char *message) +{ + // monitor because every callback could invalidate the connection + Monitor monitor(this); + + // fail all operations + if (!fail(monitor, message)) return; + + // inform handler + _handler->onError(_parent, message); +} + /** * Close the connection * This will close all channels @@ -194,7 +252,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) bool ConnectionImpl::close() { // leap out if already closed or closing - if (_closed) return false; + if (_closed || _state == state_closed) return false; // mark that the object is closed _closed = true; diff --git a/src/linux_tcp/includes.h b/src/linux_tcp/includes.h index a83e9bd..a039554 100644 --- a/src/linux_tcp/includes.h +++ b/src/linux_tcp/includes.h @@ -20,6 +20,7 @@ #include "amqpcpp/linux_tcp/tcpdefines.h" // mid level includes +#include "amqpcpp/linux_tcp/tcpparent.h" #include "amqpcpp/linux_tcp/tcphandler.h" #include "amqpcpp/linux_tcp/tcpconnection.h" diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index d60a9c7..2220a71 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -28,7 +28,7 @@ namespace AMQP { /** * Class definition */ -class SslConnected : public TcpState, private Watchable +class SslConnected : public TcpExtState { private: /** @@ -37,12 +37,6 @@ private: */ SslWrapper _ssl; - /** - * Socket file descriptor - * @var int - */ - int _socket; - /** * The outgoing buffer * @var TcpBuffer @@ -71,12 +65,6 @@ private: */ bool _closed = false; - /** - * Have we reported the final instruction to the user? - * @var bool - */ - bool _finalized = false; - /** * Cached reallocation instruction * @var size_t @@ -84,43 +72,6 @@ private: size_t _reallocate = 0; - /** - * Close the connection - * @return bool - */ - bool close() - { - // do nothing if already closed - if (_socket < 0) return false; - - // and stop monitoring it - _handler->monitor(_connection, _socket, 0); - - // close the socket - ::close(_socket); - - // forget filedescriptor - _socket = -1; - - // done - return true; - } - - /** - * Construct the final state - * @param monitor Object that monitors whether connection still exists - * @return TcpState* - */ - TcpState *finalstate(const Monitor &monitor) - { - // close the socket if it is still open - close(); - - // if the object is still in a valid state, we can move to the close-state, - // otherwise there is no point in moving to a next state - return monitor.valid() ? new TcpClosed(this) : nullptr; - } - /** * Proceed with the next operation after the previous operation was * a success, possibly changing the filedescriptor-monitor @@ -132,23 +83,17 @@ private: if (_out) { // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, readable | writable); + _parent->onIdle(this, _socket, readable | writable); } else if (_closed) { // start the state that closes the connection - auto *nextstate = new SslShutdown(_connection, _socket, std::move(_ssl), _finalized, _handler); - - // we forget the current socket to prevent that it gets destructed - _socket = -1; - - // report the next state - return nextstate; + return new SslShutdown(this, std::move(_ssl)); } else { // let's wait until the socket becomes readable - _handler->monitor(_connection, _socket, readable); + _parent->onIdle(this, _socket, readable); } // done @@ -171,7 +116,7 @@ private: _state = state; // the operation must be repeated when readable - _handler->monitor(_connection, _socket, readable); + _parent->onIdle(this, _socket, readable); // allow chaining return monitor.valid() ? this : nullptr; @@ -181,7 +126,7 @@ private: _state = state; // wait until socket becomes writable again - _handler->monitor(_connection, _socket, readable | writable); + _parent->onIdle(this, _socket, readable | writable); // allow chaining return monitor.valid() ? this : nullptr; @@ -191,23 +136,17 @@ private: _state = state_idle; // turns out no error occured, an no action has to be rescheduled - _handler->monitor(_connection, _socket, _out || _closed ? readable | writable : readable); + _parent->onIdle(this, _socket, _out || _closed ? readable | writable : readable); // allow chaining return monitor.valid() ? this : nullptr; default: - // if we have already reported an error to user space, we can go to the final state right away - if (_finalized) return finalstate(monitor); - - // remember that we've sent out an error - _finalized = true; - - // tell the handler - _handler->onError(_connection, "ssl error"); - - // go to the final state - return finalstate(monitor); + + // @todo report an error to all channels + + // ssl level error, we have to tear down the tcp connection + return monitor.valid() ? new TcpShutdown(this) : nullptr; } } @@ -224,7 +163,7 @@ private: TcpInBuffer buffer(std::move(_in)); // parse the buffer - auto processed = _connection->parse(buffer); + auto processed = _parent->onReceived(this, buffer); // "this" could be removed by now, check this if (!monitor.valid()) return nullptr; @@ -329,7 +268,7 @@ private: _state = state_idle; // read data from ssl into the buffer - auto result = _in.receivefrom(_ssl, _connection->expected()); + auto result = _in.receivefrom(_ssl, _parent->expected()); // if this is a failure, we are going to repeat the operation if (result <= 0) return repeat(monitor, state_receiving, OpenSSL::SSL_get_error(_ssl, result)); @@ -350,31 +289,34 @@ private: public: /** * Constructor - * @param connection Parent TCP connection object - * @param socket The socket filedescriptor + * @param state The previous state * @param ssl The SSL structure * @param buffer The buffer that was already built - * @param handler User-supplied handler object */ - SslConnected(TcpConnection *connection, int socket, SslWrapper &&ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : - TcpState(connection, handler), + SslConnected(TcpExtState *state, SslWrapper &&ssl, TcpOutBuffer &&buffer) : + TcpExtState(state), _ssl(std::move(ssl)), - _socket(socket), _out(std::move(buffer)), _in(4096), _state(_out ? state_sending : state_idle) { // tell the handler to monitor the socket if there is an out - _handler->monitor(_connection, _socket, _state == state_sending ? readable | writable : readable); + _parent->onIdle(this, _socket, _state == state_sending ? readable | writable : readable); } /** - * Destructor + * Destructor */ virtual ~SslConnected() noexcept { + // no cleanup if socket is gone + if (_socket < 0) return; + + // and stop monitoring it + _parent->onIdle(this, _socket, 0); + // close the socket - close(); + ::close(_socket); } /** @@ -426,7 +368,7 @@ public: */ virtual TcpState *flush(const Monitor &monitor) override { - // we are not going to do this is object is busy reading + // we are not going to do this if object is busy reading if (_state == state_receiving) return this; // create an object to wait for the filedescriptor to becomes active @@ -484,17 +426,12 @@ public: */ virtual TcpState *abort(const Monitor &monitor) override { - // if we have already told user space that connection is gone - if (_finalized) return new TcpClosed(this); + // close the connection right now + if (!close(monitor)) return nullptr; - // object will be finalized now - _finalized = true; - - // close the connection - close(); - - // inform user space that the party is over - _handler->onError(_connection, "ssl connection terminated"); + // tell the connection that it failed (this eventually ends up in our reportError() method) + // @todo to we indeed need this? + //_connection->fail(); // go to the final state (if not yet disconnected) return monitor.valid() ? new TcpClosed(this) : nullptr; @@ -515,56 +452,59 @@ public: if (_state != state_idle) return; // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, readable | writable); + _parent->onIdle(this, _socket, readable | writable); } /** - * Report that heartbeat negotiation is going on - * @param heartbeat suggested heartbeat - * @return uint16_t accepted heartbeat + * When the AMQP transport layer is closed + * @param monitor Object that can be used if connection is still alive + * @return TcpState New implementation object */ - virtual uint16_t reportNegotiate(uint16_t heartbeat) override - { - // remember that we have to reallocate (_in member can not be accessed because it is moved away) - _reallocate = _connection->maxFrame(); - - // pass to base - return TcpState::reportNegotiate(heartbeat); - } - - /** - * Report a connection error - * @param error - */ - virtual void reportError(const char *error) override - { - // we want to start the elegant ssl shutdown procedure, so we call reportClosed() here too, - // because that function does exactly what we want to do here too - reportClosed(); - - // if the user was already notified of an final state, we do not have to proceed - if (_finalized) return; - - // remember that this is the final call to user space - _finalized = true; - - // pass to handler - _handler->onError(_connection, error); - } - - /** - * Report to the handler that the connection was nicely closed - */ - virtual void reportClosed() override + virtual TcpState *onAmqpClosed(const Monitor &monitor) override { // remember that the object is going to be closed _closed = true; // if the previous operation is still in progress we can wait for that - if (_state != state_idle) return; + if (_state != state_idle) return this; - // wait until the connection is writable so that we can close it then - _handler->monitor(_connection, _socket, readable | writable); + // the connection can be closed right now, move to the next state + return new SslShutdown(this, std::move(_ssl)); + } + + /** + * When an error occurs in the AMQP protocol + * @param monitor Monitor that can be used to check if the connection is still alive + * @param message The error message + * @return TcpState New implementation object + */ + virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override + { + // tell the user about it + // @todo do we need this here? + //_handler->onError(_connection, message); + + // stop if the object was destructed + if (!monitor.valid()) return nullptr; + + // remember that the object is going to be closed + _closed = true; + + // if the previous operation is still in progress we can wait for that + if (_state != state_idle) return this; + + // the connection can be closed right now, move to the next state + return new SslShutdown(this, std::move(_ssl)); + } + + /** + * Install max-frame size + * @param heartbeat suggested heartbeat + */ + virtual void maxframe(size_t maxframe) override + { + // remember that we have to reallocate (_in member can not be accessed because it is moved away) + _reallocate = maxframe; } }; diff --git a/src/linux_tcp/sslhandshake.h b/src/linux_tcp/sslhandshake.h index 5de7928..251eef9 100644 --- a/src/linux_tcp/sslhandshake.h +++ b/src/linux_tcp/sslhandshake.h @@ -29,7 +29,7 @@ namespace AMQP { /** * Class definition */ -class SslHandshake : public TcpState, private Watchable +class SslHandshake : public TcpExtState { private: /** @@ -38,12 +38,6 @@ private: */ SslWrapper _ssl; - /** - * The socket file descriptor - * @var int - */ - int _socket; - /** * The outgoing buffer * @var TcpOutBuffer @@ -61,7 +55,7 @@ private: if (_socket < 0) return false; // and stop monitoring it - _handler->monitor(_connection, _socket, 0); + _parent->onIdle(this, _socket, 0); // close the socket ::close(_socket); @@ -81,28 +75,30 @@ private: TcpState *nextstate(const Monitor &monitor) { // check if the handler allows the connection - bool allowed = _handler->onSecured(_connection, _ssl); + bool allowed = _parent->onSecured(this, _ssl); // leap out if the user space function destructed the object if (!monitor.valid()) return nullptr; // copy the socket because we might forget it - auto socket = _socket; +// auto socket = _socket; // forget the socket member to prevent that it is closed by the destructor _socket = -1; // if connection is allowed, we move to the next state - if (allowed) return new SslConnected(_connection, socket, std::move(_ssl), std::move(_out), _handler); + if (allowed) return new SslConnected(this, std::move(_ssl), std::move(_out)); // report that the connection is broken - _handler->onError(_connection, "TLS connection has been rejected"); + // @todo do we need this? + //_handler->onError(_connection, "TLS connection has been rejected"); // the onError method could have destructed this object if (!monitor.valid()) return nullptr; // shutdown the connection - return new SslShutdown(_connection, socket, std::move(_ssl), true, _handler); + // @todo the onClosed() does not have to be called + return new SslShutdown(this, std::move(_ssl)); } /** @@ -116,7 +112,8 @@ private: close(); // we have an error - report this to the user - _handler->onError(_connection, "failed to setup ssl connection"); + // @todo do we need this? + //_handler->onError(_connection, "failed to setup ssl connection"); // done, go to the closed state (plus check if connection still exists, because // after the onError() call the user space program may have destructed that object) @@ -131,7 +128,7 @@ private: TcpState *proceed(int events) { // tell the handler that we want to listen for certain events - _handler->monitor(_connection, _socket, events); + _parent->onIdle(this, _socket, events); // allow chaining return this; @@ -143,18 +140,15 @@ public: * * @todo catch the exception! * - * @param connection Parent TCP connection object - * @param socket The socket filedescriptor + * @param state Earlier state * @param hostname The hostname to connect to * @param context SSL context * @param buffer The buffer that was already built - * @param handler User-supplied handler object * @throws std::runtime_error */ - SslHandshake(TcpConnection *connection, int socket, const std::string &hostname, TcpOutBuffer &&buffer, TcpHandler *handler) : - TcpState(connection, handler), + SslHandshake(TcpExtState *state, const std::string &hostname, TcpOutBuffer &&buffer) : + TcpExtState(state), _ssl(SslContext(OpenSSL::TLS_client_method())), - _socket(socket), _out(std::move(buffer)) { // we will be using the ssl context as a client @@ -164,10 +158,10 @@ public: OpenSSL::SSL_ctrl(_ssl, SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, (void *)hostname.data()); // associate the ssl context with the socket filedescriptor - if (OpenSSL::SSL_set_fd(_ssl, socket) == 0) throw std::runtime_error("failed to associate filedescriptor with ssl socket"); + if (OpenSSL::SSL_set_fd(_ssl, _socket) == 0) throw std::runtime_error("failed to associate filedescriptor with ssl socket"); // we are going to wait until the socket becomes writable before we start the handshake - _handler->monitor(_connection, _socket, writable); + _parent->onIdle(this, _socket, writable); } /** @@ -238,7 +232,7 @@ public: // the handshake is still busy, outgoing data must be cached _out.add(buffer, size); } - + /** * Flush the connection, sent all buffered data to the socket * @param monitor Object to check if connection still exists @@ -286,7 +280,8 @@ public: close(); // report to the user that the handshake was aborted - _handler->onError(_connection, "ssl handshake aborted"); + // @todo do we need this? + //_handler->onError(_connection, "ssl handshake aborted"); // done, go to the closed state (plus check if connection still exists, because // after the onError() call the user space program may have destructed that object) diff --git a/src/linux_tcp/sslshutdown.h b/src/linux_tcp/sslshutdown.h index 4d7a43d..02e7d12 100644 --- a/src/linux_tcp/sslshutdown.h +++ b/src/linux_tcp/sslshutdown.h @@ -20,7 +20,7 @@ namespace AMQP { /** * Class definition */ -class SslShutdown : public TcpState, private Watchable +class SslShutdown : public TcpExtState { private: /** @@ -28,64 +28,7 @@ private: * @var SslWrapper */ SslWrapper _ssl; - - /** - * Socket file descriptor - * @var int - */ - int _socket; - /** - * Have we already notified user space of connection end? - * @var bool - */ - bool _finalized; - - - /** - * Close the socket - * @return bool - */ - bool close() - { - // skip if already closed - if (_socket < 0) return false; - - // we're no longer interested in events - _handler->monitor(_connection, _socket, 0); - - // close the socket - ::close(_socket); - - // forget the socket - _socket = -1; - - // done - return true; - } - - /** - * Report an error - * @param monitor object to check if connection still exists - * @return TcpState* - */ - TcpState *reporterror(const Monitor &monitor) - { - // close the socket - close(); - - // if we have already told user space that connection is gone - if (_finalized) return new TcpClosed(this); - - // object will be finalized now - _finalized = true; - - // inform user space that the party is over - _handler->onError(_connection, "ssl shutdown error"); - - // go to the final state (if not yet disconnected) - return monitor.valid() ? new TcpClosed(this) : nullptr; - } /** * Proceed with the next operation after the previous operation was @@ -93,22 +36,10 @@ private: * @param monitor object to check if connection still exists * @return TcpState* */ - TcpState *proceed(const Monitor &monitor) + virtual TcpState *proceed(const Monitor &monitor) { - // close the socket - close(); - - // if we have already told user space that connection is gone - if (_finalized) return new TcpClosed(this); - - // object will be finalized now - _finalized = true; - - // inform user space that the party is over - _handler->onClosed(_connection); - - // go to the final state (if not yet disconnected) - return monitor.valid() ? new TcpClosed(this) : nullptr; + // next state is to shutdown the connection + return new TcpShutdown(this); } /** @@ -126,17 +57,18 @@ private: switch (error) { case SSL_ERROR_WANT_READ: // the operation must be repeated when readable - _handler->monitor(_connection, _socket, readable); + _parent->onIdle(this, _socket, readable); return this; case SSL_ERROR_WANT_WRITE: // wait until socket becomes writable again - _handler->monitor(_connection, _socket, readable | writable); + _parent->onIdle(this, _socket, readable | writable); return this; default: // go to the final state (if not yet disconnected) - return reporterror(monitor); + // @todo special treatment for ssl-protocol errors + return proceed(monitor); } } @@ -144,37 +76,28 @@ private: public: /** * Constructor - * @param connection Parent TCP connection object - * @param socket The socket filedescriptor + * @param state Previous state * @param ssl The SSL structure - * @param finalized Is the user already notified of connection end (onError() has been called) - * @param handler User-supplied handler object */ - SslShutdown(TcpConnection *connection, int socket, SslWrapper &&ssl, bool finalized, TcpHandler *handler) : - TcpState(connection, handler), - _ssl(std::move(ssl)), - _socket(socket), - _finalized(finalized) + SslShutdown(TcpExtState *state, SslWrapper &&ssl) : + TcpExtState(state), + _ssl(std::move(ssl)) { // wait until the socket is accessible - _handler->monitor(_connection, _socket, readable | writable); - } + _parent->onIdle(this, _socket, readable | writable); + } + + /** + * No copying + * @param that + */ + SslShutdown(const SslShutdown &that) = delete; /** * Destructor */ - virtual ~SslShutdown() noexcept - { - // close the socket - close(); - } + virtual ~SslShutdown() noexcept = default; - /** - * The filedescriptor of this connection - * @return int - */ - virtual int fileno() const override { return _socket; } - /** * Process the filedescriptor in the object * @param monitor Object to check if connection still exists @@ -211,6 +134,8 @@ public: */ virtual TcpState *flush(const Monitor &monitor) override { + // @todo do we even need this? isn't flushing reserved for data? + // create an object to wait for the filedescriptor to becomes active Poll poll(_socket); @@ -238,29 +163,25 @@ public: case SSL_ERROR_WANT_WRITE: poll.active(true); break; // something is wrong, we proceed to the next state - default: return reporterror(monitor); + default: return proceed(monitor); } } } /** - * Abort the shutdown operation + * Abort the shutdown operation immediately * @param monitor Monitor that can be used to check if the tcp connection is still alive * @return TcpState */ virtual TcpState *abort(const Monitor &monitor) override { - // if we have already told user space that connection is gone - if (_finalized) return new TcpClosed(this); + // cleanup the connection + // @todo this also calls onClosed() + cleanup(); - // object will be finalized now - _finalized = true; - - // close the connection - close(); - - // inform user space that the party is over - _handler->onError(_connection, "ssl shutdown aborted"); + // report to user-space that the ssl shutdown was aborted + // @todo + //_handler->onError(_connection, "ssl shutdown aborted"); // go to the final state (if not yet disconnected) return monitor.valid() ? new TcpClosed(this) : nullptr; diff --git a/src/linux_tcp/tcpclosed.h b/src/linux_tcp/tcpclosed.h index cf1b972..ebd2561 100644 --- a/src/linux_tcp/tcpclosed.h +++ b/src/linux_tcp/tcpclosed.h @@ -4,7 +4,7 @@ * Class that is used when the TCP connection ends up in a closed state * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -25,11 +25,10 @@ class TcpClosed : public TcpState public: /** * Constructor - * @param connection The parent TcpConnection object - * @param handler User supplied handler + * @param parent The parent object */ - TcpClosed(TcpConnection *connection, TcpHandler *handler) : - TcpState(connection, handler) {} + TcpClosed(TcpParent *parent) : + TcpState(parent) {} /** * Constructor @@ -42,7 +41,7 @@ public: * Destructor */ virtual ~TcpClosed() noexcept = default; - + /** * Abort the operation * @param monitor Monitor that can be used to check if the tcp connection is still alive diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index ef011c5..7b4c956 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -18,6 +18,7 @@ */ #include "tcpoutbuffer.h" #include "tcpinbuffer.h" +#include "tcpshutdown.h" #include "poll.h" /** @@ -28,15 +29,9 @@ namespace AMQP { /** * Class definition */ -class TcpConnected : public TcpState, private Watchable +class TcpConnected : public TcpExtState { private: - /** - * The socket file descriptor - * @var int - */ - int _socket; - /** * The outgoing buffer * @var TcpOutBuffer @@ -63,48 +58,36 @@ private: /** - * Close the connection - * @return bool + * Start an elegant shutdown + * + * @todo remove this method */ - bool close() + void shutdown2() { - // do nothing if already closed - if (_socket < 0) return false; + // we will shutdown the socket in a very elegant way, we notify the peer + // that we will not be sending out more write operations + ::shutdown(_socket, SHUT_WR); - // and stop monitoring it - _handler->monitor(_connection, _socket, 0); - - // close the socket - ::close(_socket); - - // forget filedescriptor - _socket = -1; - - // done - return true; + // we still monitor the socket for readability to see if our close call was + // confirmed by the peer + _parent->onIdle(this, _socket, readable); } /** * Helper method to report an error + * @param monitor Monitor to check validity of "this" * @return bool Was an error reported? */ - bool reportError() + bool reportError(const Monitor &monitor) { // some errors are ok and do not (necessarily) mean that we're disconnected if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; - - // connection can be closed now - close(); - - // if the user has already been notified, we do not have to do anything else - if (_finalized) return true; - - // update the _finalized member before we make the call to user space because - // the user space may destruct this object - _finalized = true; - - // we have an error - report this to the user - _handler->onError(_connection, strerror(errno)); + + // tell the connection that it failed + // @todo we should report an error, but that could be wrong, because it calls back to us + + // we're no longer interested in the socket (this also calls onClosed()) + cleanup(); // done return true; @@ -125,14 +108,11 @@ private: public: /** * Constructor - * @param connection Parent TCP connection object - * @param socket The socket filedescriptor + * @param state The previous state * @param buffer The buffer that was already built - * @param handler User-supplied handler object */ - TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) : - TcpState(connection, handler), - _socket(socket), + TcpConnected(TcpExtState *state, TcpOutBuffer &&buffer) : + TcpExtState(state), _out(std::move(buffer)), _in(4096) { @@ -140,17 +120,13 @@ public: if (_out) _out.sendto(_socket); // tell the handler to monitor the socket, if there is an out - _handler->monitor(_connection, _socket, _out ? readable | writable : readable); + _parent->onIdle(this, _socket, _out ? readable | writable : readable); } /** * Destructor */ - virtual ~TcpConnected() noexcept - { - // close the socket - close(); - } + virtual ~TcpConnected() noexcept = default; /** * The filedescriptor of this connection @@ -183,28 +159,30 @@ public: auto result = _out.sendto(_socket); // are we in an error state? - if (result < 0 && reportError()) return nextState(monitor); + if (result < 0 && reportError(monitor)) return nextState(monitor); // if buffer is empty by now, we no longer have to check for // writability, but only for readability - if (!_out) _handler->monitor(_connection, _socket, readable); + if (!_out) _parent->onIdle(this, _socket, readable); } // should we check for readability too? if (flags & readable) { // read data from buffer - ssize_t result = _in.receivefrom(_socket, _connection->expected()); + ssize_t result = _in.receivefrom(_socket, _parent->expected()); // are we in an error state? - if (result < 0 && reportError()) return nextState(monitor); + if (result < 0 && reportError(monitor)) return nextState(monitor); + + // @todo should we also check for result == 0 // we need a local copy of the buffer - because it is possible that "this" // object gets destructed halfway through the call to the parse() method TcpInBuffer buffer(std::move(_in)); // parse the buffer - auto processed = _connection->parse(buffer); + auto processed = _parent->onReceived(this, buffer); // "this" could be removed by now, check this if (!monitor.valid()) return nullptr; @@ -249,7 +227,7 @@ public: _out.add(buffer + bytes, size - bytes); // start monitoring the socket to find out when it is writable - _handler->monitor(_connection, _socket, readable | writable); + _parent->onIdle(this, _socket, readable | writable); } /** @@ -286,77 +264,55 @@ public: */ virtual TcpState *abort(const Monitor &monitor) override { - // if we have already told user space that connection is gone - if (_finalized) return new TcpClosed(this); + // close the connection right now + if (!close(monitor)) return nullptr; - // object will be finalized now - _finalized = true; - - // close the connection - close(); - - // inform user space that the party is over - _handler->onError(_connection, "tcp connection terminated"); + // fail the connection (this ends up in our reportError() method) + // @todo should this not happen + //_connection->fail(); // go to the final state (if not yet disconnected) return monitor.valid() ? new TcpClosed(this) : nullptr; } /** - * Report that heartbeat negotiation is going on - * @param heartbeat suggested heartbeat - * @return uint16_t accepted heartbeat + * When the AMQP transport layer is closed + * @param monitor Object that can be used if connection is still alive + * @return TcpState New implementation object */ - virtual uint16_t reportNegotiate(uint16_t heartbeat) override + virtual TcpState *onAmqpClosed(const Monitor &monitor) override + { + // move to the tcp shutdown state + return new TcpShutdown(this); + } + + /** + * When an error occurs in the AMQP protocol + * @param monitor Monitor that can be used to check if the connection is still alive + * @param message The error message + * @return TcpState New implementation object + */ + virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override + { + // tell the user about it + // @todo do this somewhere else + //_handler->onError(_connection, message); + + // stop if the object was destructed + if (!monitor.valid()) return nullptr; + + // move to the tcp shutdown state + return new TcpShutdown(this); + } + + /** + * Install max-frame size + * @param heartbeat suggested heartbeat + */ + virtual void maxframe(size_t maxframe) override { // remember that we have to reallocate (_in member can not be accessed because it is moved away) - _reallocate = _connection->maxFrame(); - - // pass to base - return TcpState::reportNegotiate(heartbeat); - } - - /** - * Report to the handler that the object is in an error state. - * @param error - */ - virtual void reportError(const char *error) override - { - // close the socket - close(); - - // if the user was already notified of an final state, we do not have to proceed - if (_finalized) return; - - // remember that this is the final call to user space - _finalized = true; - - // pass to handler - _handler->onError(_connection, error); - } - - /** - * Report to the handler that the connection was nicely closed - * This is the counter-part of the connection->close() call. - */ - virtual void reportClosed() override - { - // we will shutdown the socket in a very elegant way, we notify the peer - // that we will not be sending out more write operations - shutdown(_socket, SHUT_WR); - - // we still monitor the socket for readability to see if our close call was - // confirmed by the peer - _handler->monitor(_connection, _socket, readable); - - // if the user was already notified of an final state, we do not have to proceed - if (_finalized) return; - - // remember that this is the final call to user space - _finalized = true; - - // pass to handler - _handler->onClosed(_connection); + _reallocate = maxframe; } }; diff --git a/src/linux_tcp/tcpconnection.cpp b/src/linux_tcp/tcpconnection.cpp index ee21cd5..4fa471a 100644 --- a/src/linux_tcp/tcpconnection.cpp +++ b/src/linux_tcp/tcpconnection.cpp @@ -25,7 +25,8 @@ namespace AMQP { * @param hostname The address to connect to */ TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : - _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)), + _handler(handler), + _state(new TcpResolver(this, address.hostname(), address.port(), address.secure())), _connection(this, address.login(), address.vhost()) {} /** @@ -65,7 +66,7 @@ void TcpConnection::process(int fd, int flags) { // monitor the object for destruction, because you never know what the user Monitor monitor(this); - + // store the old state auto *oldstate = _state.get(); @@ -122,6 +123,8 @@ void TcpConnection::flush() */ bool TcpConnection::close(bool immediate) { + // @todo what if not yet connected / still doing a lookup? + // if no immediate disconnect is needed, we can simply start the closing handshake if (!immediate) return _connection.close(); @@ -134,54 +137,29 @@ bool TcpConnection::close(bool immediate) // if the user-space code destructed the connection, there is nothing else to do if (!monitor.valid()) return true; - // store the old state + // remember the old state (this is necessary because _state may be modified by user-code) auto *oldstate = _state.get(); - + // abort the operation + // @todo does this call user-space stuff? auto *newstate = _state->abort(monitor); // if the state did not change, we do not have to update a member, // when the newstate is nullptr, the object is (being) destructed // and we do not have to do anything else either - if (oldstate == newstate || newstate == nullptr) return true; + if (newstate == nullptr || newstate == oldstate) return true; - // in a bizarre set of circumstances, the user may have implemented the - // handler in such a way that the connection object was destructed - if (!monitor.valid()) - { - // ok, user code is weird, connection object no longer exist, get rid of the state too - delete newstate; - } - else - { - // replace it with the new implementation - _state.reset(newstate); - } + // replace it with the new implementation + _state.reset(newstate); + + // fail the connection / report the error to user-space + // @todo what if channels are not even ready? + _connection.fail("connection prematurely closed by client"); // done, we return true because the connection is closed return true; } -/** - * Method that is called after the connection was constructed - * @param connection The connection that was attached to the handler - */ -void TcpConnection::onAttached(Connection *connection) -{ - // pass on to the state - _state->reportAttached(); -} - -/** - * Method that is called when the connection is destructed - * @param connection The connection that was detached from the handler - */ -void TcpConnection::onDetached(Connection *connection) -{ - // pass on to the state - _state->reportDetached(); -} - /** * Method that is called when the heartbeat frequency is negotiated. * @param connection The connection that suggested a heartbeat interval @@ -190,8 +168,11 @@ void TcpConnection::onDetached(Connection *connection) */ uint16_t TcpConnection::onNegotiate(Connection *connection, uint16_t interval) { - // the state object should do this - return _state->reportNegotiate(interval); + // tell the max-frame size + _state->maxframe(connection->maxFrame()); + + // tell the handler + return _handler->onNegotiate(this, interval); } /** @@ -206,16 +187,6 @@ void TcpConnection::onData(Connection *connection, const char *buffer, size_t si _state->send(buffer, size); } -/** - * Method that is called when the server sends a heartbeat to the client - * @param connection The connection over which the heartbeat was received - */ -void TcpConnection::onHeartbeat(Connection *connection) -{ - // let the state object do this - _state->reportHeartbeat(); -} - /** * Method called when the connection ends up in an error state * @param connection The connection that entered the error state @@ -223,18 +194,20 @@ void TcpConnection::onHeartbeat(Connection *connection) */ void TcpConnection::onError(Connection *connection, const char *message) { - // tell the implementation to report the error - _state->reportError(message); -} - -/** - * Method that is called when the connection is established - * @param connection The connection that can now be used - */ -void TcpConnection::onConnected(Connection *connection) -{ - // tell the implementation to report the status - _state->reportConnected(); + // monitor to check if "this" is destructed + Monitor monitor(this); + + // remember the old state (this is necessary because _state may be modified by user-code) + auto *oldstate = _state.get(); + + // tell the state that an error occured at the amqp level + auto *newstate = _state->onAmqpError(monitor, message); + + // leap out if nothing changes + if (newstate == nullptr || newstate == oldstate) return; + + // assign the new state + _state.reset(newstate); } /** @@ -243,8 +216,20 @@ void TcpConnection::onConnected(Connection *connection) */ void TcpConnection::onClosed(Connection *connection) { - // tell the implementation to report that connection is closed now - _state->reportClosed(); + // monitor to check if "this" is destructed + Monitor monitor(this); + + // remember the old state (this is necessary because _state may be modified by user-code) + auto *oldstate = _state.get(); + + // tell the state that the connection was closed at the amqp level + auto *newstate = _state->onAmqpClosed(monitor); + + // leap out if nothing changes + if (newstate == nullptr || newstate == oldstate) return; + + // assign the new state + _state.reset(newstate); } /** diff --git a/src/linux_tcp/tcpextstate.h b/src/linux_tcp/tcpextstate.h new file mode 100644 index 0000000..cbb1f45 --- /dev/null +++ b/src/linux_tcp/tcpextstate.h @@ -0,0 +1,107 @@ +/** + * TcpExtState.h + * + * Extended state that also contains the socket filedescriptor + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpExtState : public TcpState +{ +protected: + /** + * The filedescriptor + * @var int + */ + int _socket; + + /** + * Clean-up the socket + */ + void cleanup() + { + // do nothing if no longer connected + if (_socket < 0) return; + + // tell handler that the socket is idle and we're no longer interested in events + _parent->onIdle(this, _socket, 0); + + // close the socket + ::close(_socket); + + // forget the socket + _socket = -1; + + // tell the handler that the connection is now closed + _parent->onClosed(this); + } + + +protected: + /** + * Constructor + * @param parent + */ + TcpExtState(TcpParent *parent) : + TcpState(parent), + _socket(-1) {} + + /** + * Constructor + * @param state + */ + TcpExtState(TcpExtState *state) : + TcpState(state), + _socket(state->_socket) + { + // invalidate the other state + state->_socket = -1; + } + +public: + /** + * No copying + * @param that + */ + TcpExtState(const TcpExtState &that) = delete; + + /** + * Destructor + */ + virtual ~TcpExtState() + { + // cleanup the socket + cleanup(); + } + + /** + * The filedescriptor of this connection + * @return int + */ + virtual int fileno() const override + { + // expose the socket + return _socket; + } +}; + +/** + * End of namespace + */ +} + + diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index 7ac140a..d3af215 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -32,7 +32,7 @@ namespace AMQP { /** * Class definition */ -class TcpResolver : public TcpState +class TcpResolver : public TcpExtState { private: /** @@ -59,12 +59,6 @@ private: */ Pipe _pipe; - /** - * Non-blocking socket that is connected to RabbitMQ - * @var int - */ - int _socket = -1; - /** * Possible error that occured * @var std::string @@ -150,20 +144,19 @@ private: public: /** * Constructor - * @param connection Parent connection object + * @param parent Parent connection object * @param hostname The hostname for the lookup * @param portnumber The portnumber for the lookup * @param secure Do we need a secure tls connection when ready? - * @param handler User implemented handler object */ - TcpResolver(TcpConnection *connection, const std::string &hostname, uint16_t port, bool secure, TcpHandler *handler) : - TcpState(connection, handler), - _hostname(hostname), + TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure) : + TcpExtState(parent), + _hostname(std::move(hostname)), _secure(secure), _port(port) { // tell the event loop to monitor the filedescriptor of the pipe - handler->monitor(connection, _pipe.in(), readable); + parent->onIdle(this, _pipe.in(), readable); // we can now start the thread (must be started after filedescriptor is monitored!) std::thread thread(std::bind(&TcpResolver::run, this)); @@ -178,7 +171,7 @@ public: virtual ~TcpResolver() noexcept { // stop monitoring the pipe filedescriptor - _handler->monitor(_connection, _pipe.in(), 0); + _parent->onIdle(this, _pipe.in(), 0); // wait for the thread to be ready _thread.join(); @@ -201,21 +194,21 @@ public: { // if we need a secure connection, we move to the tls handshake // @todo catch possible exception - if (_secure) return new SslHandshake(_connection, _socket, _hostname, std::move(_buffer), _handler); + if (_secure) return new SslHandshake(this, _hostname, std::move(_buffer)); // otherwise we have a valid regular tcp connection - return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); + return new TcpConnected(this, std::move(_buffer)); } else { // report error - _handler->onError(_connection, _error.data()); + _parent->onError(this, _error.data()); // handler callback might have destroyed connection if (!monitor.valid()) return nullptr; // create dummy implementation - return new TcpClosed(_connection, _handler); + return new TcpClosed(this); } } @@ -260,10 +253,10 @@ public: _thread.join(); // close the socket - if (_socket >= 0) ::close(_socket); + if (_socket >= 0) { ::close(_socket); _socket = -1; } // inform user space that the connection is cancelled - _handler->onError(_connection, "tcp connect aborted"); + _parent->onError(this, "tcp connect aborted"); // go to the final state (if not yet disconnected) return monitor.valid() ? new TcpClosed(this) : nullptr; diff --git a/src/linux_tcp/tcpsecurestate.h b/src/linux_tcp/tcpsecurestate.h new file mode 100644 index 0000000..77aae4d --- /dev/null +++ b/src/linux_tcp/tcpsecurestate.h @@ -0,0 +1,106 @@ +/** + * TcpSecureState.h + * + * Utility class that takes care of setting a new state. It contains + * a number of checks that prevents that the state is overwritten + * if the object is destructed in the meantime + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpSecureState +{ +private: + /** + * Monitor to check the validity of the connection + * @var Monitor + */ + Monitor _monitor; + + /** + * Reference to the pointer to the state that should be updated + * @var std::unique_ptr + */ + std::unique_ptr &_state; + + /** + * The old pointer + * @var TcpState* + */ + const TcpState *_old; + +public: + /** + * Constructor + * @param watchable the object that can be destructor + * @param state the old state value + */ + TcpSecureState(Watchable *watchable, std::unique_ptr &state) : + _monitor(watchable), _state(state), _old(state.get()) {} + + /** + * No copying + * @param that + */ + TcpSecureState(const TcpSecureState &that) = delete; + + /** + * Destructor + */ + virtual ~TcpSecureState() = default; + + /** + * Expose the monitor + * @return Monitor + */ + const Monitor &monitor() const { return _monitor; } + + /** + * Assign a new state + * @param state this is a newly allocated state + * @return bool true if the object is still valid + */ + bool assign(TcpState *state) + { + // do nothing if the state did not change, or if object was destructed + if (_old == state || state == nullptr) return _monitor.valid(); + + // can we assign a new state? + if (_monitor.valid()) + { + // assign the + _state.reset(state); + + // object is still valid + return true; + } + else + { + // otherwise the object is destructed and the new state should be destructed too + delete state; + + // object is no longer valid + return false; + } + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/linux_tcp/tcpshutdown.h b/src/linux_tcp/tcpshutdown.h new file mode 100644 index 0000000..75f5c26 --- /dev/null +++ b/src/linux_tcp/tcpshutdown.h @@ -0,0 +1,141 @@ +/** + * TcpShutdown.h + * + * State in the TCP handshake that is responsible for gracefully + * shutting down the connection by closing our side of the connection, + * and waiting for the server to close the connection on the other + * side too. + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "tcpextstate.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpShutdown : public TcpExtState +{ +protected: + /** + * Method to report the result to the user + */ + virtual void report() + { + // report that the connection was closed + _parent->onClosed(this); + } + +public: + /** + * Constructor + * @param state The previous state + */ + TcpShutdown(TcpExtState *state) : TcpExtState(state) + { + // we will shutdown the socket in a very elegant way, we notify the peer + // that we will not be sending out more write operations + shutdown(_socket, SHUT_WR); + + // we still monitor the socket for readability to see if our close call was + // confirmed by the peer + _parent->onIdle(this, _socket, readable); + } + + /** + * Forbidden to copy + * @param that + */ + TcpShutdown(const TcpShutdown &that) = delete; + + /** + * Destructor + */ + virtual ~TcpShutdown() = default; + + /** + * Process the filedescriptor in the object + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @param fd The filedescriptor that is active + * @param flags AMQP::readable and/or AMQP::writable + * @return New implementation object + */ + virtual TcpState *process(const Monitor &monitor, int fd, int flags) + { + // must be the right filedescriptor + if (_socket != fd) return this; + + // if the socket is not readable, we do not have to check anything + if (!(flags & readable)) return this; + + // buffer to read data in + char buffer[64]; + + // read in data (we only do this to discover if the connection is really closed) + auto result = read(_socket, buffer, sizeof(buffer)); + + // if we read something, we keep on reading + if (result > 0) return this; + + // or should we retry? + if (result < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) return this; + + // flush the connection to close the connection and report to the user + return flush(monitor); + } + + /** + * Flush the connection, make sure all network operations are finished + * @param monitor Object to check if connection still exists + * @return TcpState New state + */ + virtual TcpState *flush(const Monitor &monitor) override + { + // immediately close the socket + cleanup(); + + // report to the user that the operation is finished + report(); + + // move to next state + return monitor.valid() ? new TcpClosed(this) : nullptr; + } + + /** + * Abort the operation, immediately proceed to the final state + * @param monitor Monitor that can be used to check if the tcp connection is still alive + * @return TcpState New implementation object + */ + virtual TcpState *abort(const Monitor &monitor) override + { + // close the socket completely + cleanup(); + + // report the error to user-space + // @todo do we have to report this? + //_handler->onError(_connection, "tcp shutdown aborted"); + + // move to next state + return monitor.valid() ? new TcpClosed(this) : nullptr; + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/linux_tcp/tcpstate.h b/src/linux_tcp/tcpstate.h index fd83d59..b9a250a 100644 --- a/src/linux_tcp/tcpstate.h +++ b/src/linux_tcp/tcpstate.h @@ -24,32 +24,26 @@ class TcpState { protected: /** - * Parent TcpConnection object as is seen by the user - * @var TcpConnection + * Parent object that constructed the state + * @var TcpParent */ - TcpConnection *_connection; - - /** - * User-supplied handler - * @var TcpHandler - */ - TcpHandler *_handler; + TcpParent *_parent; protected: /** * Protected constructor - * @param connection Original TCP connection object + * @param parent The parent object * @param handler User-supplied handler class */ - TcpState(TcpConnection *connection, TcpHandler *handler) : - _connection(connection), _handler(handler) {} + TcpState(TcpParent *parent) : + _parent(parent) {} /** * Protected "copy" constructor * @param state Original TcpState object */ TcpState(const TcpState *state) : - _connection(state->_connection), _handler(state->_handler) {} + _parent(state->_parent) {} public: /** @@ -124,81 +118,25 @@ public: virtual TcpState *abort(const Monitor &monitor) = 0; /** - * Report to the handler that the connection was constructed - */ - virtual void reportAttached() - { - // pass to the handler - _handler->onAttached(_connection); - } - - /** - * Report to the handler that the connection was destructed - */ - virtual void reportDetached() - { - // pass to the handler - _handler->onDetached(_connection); - } - - /** - * Report to the handler that heartbeat negotiation is going on + * Install max-frame size * @param heartbeat suggested heartbeat - * @return uint16_t accepted heartbeat */ - virtual uint16_t reportNegotiate(uint16_t heartbeat) - { - // pass to handler - return _handler->onNegotiate(_connection, heartbeat); - } - - /** - * Report to the handler that the object is in an error state. - * - * This is the last method to be called on the handler object, from now on - * the handler will no longer be called to report things to user space. - * The state object itself stays active, and further calls to process() - * may be possible. - * - * @param error - */ - virtual void reportError(const char *error) - { - // pass to handler - _handler->onError(_connection, error); - } + virtual void maxframe(size_t maxframe) {} /** - * Report that a heartbeat frame was received - */ - virtual void reportHeartbeat() - { - // pass to handler - _handler->onHeartbeat(_connection); - } - - /** - * Report to the handler that the connection is ready for use - */ - virtual void reportConnected() - { - // pass to handler - _handler->onConnected(_connection); - } - - /** - * Report to the handler that the connection was correctly closed, after - * the user has called the Connection::close() method. The underlying TCP - * connection still has to be closed. + * Events that can take place during the AMQP protocol * - * This is the last method that is called on the object, from now on no - * other methods may be called on the _handler variable. + * Both events also trigger the end of a valid connection, and should + * be used to tear down the TCP connection. + * + * @todo are these appropriate names? + * + * @param monitor + * @param TcpState */ - virtual void reportClosed() - { - // pass to handler - _handler->onClosed(_connection); - } + virtual TcpState *onAmqpError(const Monitor &monitor, const char *error) { return this; } + virtual TcpState *onAmqpClosed(const Monitor &monitor) { return this; } + }; /**