improved the tcp handler, added more methods to monitor whether a connection is connected, logged on and in an error state

This commit is contained in:
Emiel Bruijntjes 2018-11-05 16:49:55 +01:00
parent bb417e89c2
commit 54049f9e8e
5 changed files with 141 additions and 41 deletions

View File

@ -42,10 +42,8 @@ private:
* The state of the TCP connection - this state objecs changes based on
* the state of the connection (resolving, connected or closed)
* @var std::unique_ptr<TcpState>
*
* @todo why is this a shared pointer?
*/
std::shared_ptr<TcpState> _state;
std::unique_ptr<TcpState> _state;
/**
* The underlying AMQP connection
@ -92,10 +90,8 @@ private:
*/
virtual void onConnected(Connection *connection) override
{
// @todo we may need this, because from this moment on we can pass an onClosed()
// pass on to the handler
_handler->onConnected(this);
_handler->onReady(this);
}
/**
@ -103,6 +99,28 @@ private:
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(Connection *connection) override;
/**
* Method that is called when the tcp connection has been established
* @param state
*/
virtual void onConnected(TcpState *state) override
{
// pass on to the handler
_handler->onConnected(this);
}
/**
* Method that is called when the connection is secured
* @param state
* @param ssl
* @return bool
*/
virtual bool onSecured(TcpState *state, const SSL *ssl) override
{
// pass on to user-space
return _handler->onSecured(this, ssl);
}
/**
* Method to be called when data was received
@ -116,18 +134,6 @@ private:
return _connection.parse(buffer);
}
/**
* Method that is called when the connection is secured
* @param state
* @param ssl
* @return bool
*/
virtual bool onSecured(TcpState *state, const SSL *ssl) override
{
// pass on to user-space
return _handler->onSecured(this, ssl);
}
/**
* Method to be called when we need to monitor a different filedescriptor
* @param state
@ -140,6 +146,14 @@ private:
return _handler->monitor(this, socket, events);
}
/**
* Method that is called when an error occurs (the connection is lost)
* @param state
* @param error
* @param connected
*/
virtual void onError(TcpState *state, const char *message, bool connected) override;
/**
* Method to be called when it is detected that the connection was closed
* @param state
@ -164,6 +178,12 @@ public:
*/
TcpConnection(TcpHandler *handler, const Address &address);
/**
* No copying
* @param that
*/
TcpConnection(const TcpConnection &that) = delete;
/**
* Destructor
*/

View File

@ -38,8 +38,6 @@ public:
/**
* Method that is called immediately after a connection has been constructed.
* @param connection The connection object that was just constructed
*
* @see ConnectionHandler::onAttached
*/
virtual void onAttached(TcpConnection *connection)
{
@ -48,12 +46,14 @@ public:
}
/**
* Method that is called right before a connection object is destructed.
* @param connection The connection that is being destructed
*
* @see ConnectionHandler::onDetached
* Method that is called when the TCP connection ends up in a connected state
* This method is called after the TCP connection has been set up, but before
* the (optional) secure TLS connection is ready, and before the AMQP login
* handshake has been completed. If this step has been set, the onClosed()
* method will also always be called when the connection is closed.
* @param connection The TCP connection
*/
virtual void onDetached(TcpConnection *connection)
virtual void onConnected(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
@ -107,12 +107,11 @@ public:
}
/**
* Method that is called when the AMQP connection ends up in a connected state
* This method is called after the TCP connection has been set up, the (optional)
* secure TLS connection, and the AMQP login handshake has been completed.
* Method that is called after the AMQP login handshake has been completed
* and the connection object is ready for sending out actual AMQP instructions
* @param connection The TCP connection
*/
virtual void onConnected(TcpConnection *connection)
virtual void onReady(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
@ -121,7 +120,6 @@ public:
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*
* @see ConnectionHandler::onHeartbeat
*/
virtual void onHeartbeat(TcpConnection *connection)
@ -131,7 +129,11 @@ public:
}
/**
* Method that is called when the TCP connection ends up in an error state
* Method that is called when the connection ends up in an error state
* This could either be an error at the AMQP level, but could also
* be an error at the TCP of SSL level (like a broken connection).
* If the connection is connected (the onConnected() method was called
* before), the onClosed() method is going to be called too.
* @param connection The TCP connection
* @param message Error message
*/
@ -143,7 +145,8 @@ public:
}
/**
* Method that is called when the TCP connection is closed
* Method that is called when the TCP connection is closed. This method
* is always called if you have also received a call to onConnected().
* @param connection The TCP connection
*/
virtual void onClosed(TcpConnection *connection)
@ -152,6 +155,18 @@ public:
(void) connection;
}
/**
* Method that is called when the handler will no longer be notified.
* This is the last call to your handler, and it is typically used
* to clean up stuff.
* @param connection The connection that is being destructed
*/
virtual void onDetached(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Monitor a filedescriptor for readability or writability
*

View File

@ -40,14 +40,13 @@ public:
*/
virtual ~TcpParent() = default;
/**
* Method to be called when data was received
* Method that is called when the TCP connection has been established
* @param state
* @param buffer
* @return size_t
*/
virtual size_t onReceived(TcpState *state, const Buffer &buffer) = 0;
virtual void onConnected(TcpState *state) = 0;
/**
* Method that is called when the connection is secured
* @param state
@ -55,6 +54,14 @@ public:
* @return bool
*/
virtual bool onSecured(TcpState *state, const SSL *ssl) = 0;
/**
* Method to be called when data was received
* @param state
* @param buffer
* @return size_t
*/
virtual size_t onReceived(TcpState *state, const Buffer &buffer) = 0;
/**
* Method to be called when we need to monitor a different filedescriptor
@ -68,8 +75,9 @@ public:
* Method that is called when an error occurs (the connection is lost)
* @param state
* @param error
* @param connected
*/
virtual void onError(TcpState *state, const char *message) = 0;
virtual void onError(TcpState *state, const char *message, bool connected = true) = 0;
/**
* Method to be called when it is detected that the connection was nicely closed

View File

@ -27,7 +27,11 @@ namespace AMQP {
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_handler(handler),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure())),
_connection(this, address.login(), address.vhost()) {}
_connection(this, address.login(), address.vhost())
{
// tell the handler
_handler->onAttached(this);
}
/**
* Destructor
@ -167,7 +171,7 @@ void TcpConnection::onData(Connection *connection, const char *buffer, size_t si
}
/**
* Method called when the connection ends up in an error state
* Method called when the AMQP connection ends up in an error state
* @param connection The connection that entered the error state
* @param message Error message
*/
@ -176,6 +180,9 @@ void TcpConnection::onError(Connection *connection, const char *message)
// monitor to check if "this" is destructed
Monitor monitor(this);
// tell this to the user
_handler->onError(this, message);
// remember the old state (this is necessary because _state may be modified by user-code)
auto *oldstate = _state.get();
@ -211,6 +218,50 @@ void TcpConnection::onClosed(Connection *connection)
_state.reset(newstate);
}
/**
* Method that is called when an error occurs (the connection is lost)
* @param state
* @param error
* @param connected
*/
void TcpConnection::onError(TcpState *state, const char *message, bool connected)
{
// if the object is still connected, we only have to report the error and
// we wait for the subsequent call to the onClosed() method
if (connected) return _handler->onError(this, message);
// monitor to check if "this" is destructed
Monitor monitor(this);
// tell the handler
_handler->onError(this, message);
// leap out if object was destructed
if (!monitor.valid()) return;
// tell the handler that no further events will be fired
_handler->onDetached(this);
}
/**
* Method to be called when it is detected that the connection was closed
* @param state
*/
void TcpConnection::onClosed(TcpState *state)
{
// monitor to check if "this" is destructed
Monitor monitor(this);
// tell the handler
_handler->onClosed(this);
// leap out if object was destructed
if (!monitor.valid()) return;
// tell the handler that no further events will be fired
_handler->onDetached(this);
}
/**
* End of namespace
*/

View File

@ -192,6 +192,12 @@ public:
// do we have a valid socket?
if (_socket >= 0)
{
// report that the network-layer is connected
_parent->onConnected(this);
// handler callback might have destroyed connection
if (!monitor.valid()) return nullptr;
// if we need a secure connection, we move to the tls handshake
// @todo catch possible exception
if (_secure) return new SslHandshake(this, _hostname, std::move(_buffer));
@ -202,7 +208,7 @@ public:
else
{
// report error
_parent->onError(this, _error.data());
_parent->onError(this, _error.data(), false);
// handler callback might have destroyed connection
if (!monitor.valid()) return nullptr;