work in progress on refactored tcp handling, to solve various issues, like the one that lost connections do not trigger operations to fail

This commit is contained in:
Emiel Bruijntjes 2018-11-04 23:34:31 +01:00
parent 3f32e8773d
commit b81bc340b5
21 changed files with 943 additions and 612 deletions

View File

@ -1,7 +1,7 @@
/**
* Class describing a mid-level Amqp connection
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -147,6 +147,27 @@ public:
return _implementation.parse(buffer);
}
/**
* Report that the connection was lost in the middle of an operation
*
* The AMQP protocol normally has a nice closing handshake, and a connection
* is elegantly closed via calls to the close() and parse() methods. The parse()
* methods recognizes the close-confirmation and will report this to the handler.
* However, if you notice yourself that the connection is lost in the middle of
* an operation (for example due to a crashing RabbitMQ server), you should
* explicitly tell the connection object about it, so that it can cancel all
* pending operations. For all pending operations the error and finalize callbacks
* will be called. The ConnectionHandler::onError() method will however _not_ be
* called.
*
* @param message the message that has to be passed to all error handlers
* @return bool
*/
bool fail(const char *message)
{
return _implementation.fail(message);
}
/**
* Max frame size
*

View File

@ -5,7 +5,7 @@
* constructed by the connection class itselves and that has all sorts of
* methods that are only useful inside the library
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -70,7 +70,8 @@ protected:
} _state = state_protocol;
/**
* Has the close() method been called?
* Has the close() method been called? If this is true, we automatically
* send a close-frame after all pending operations are finsihed.
* @var bool
*/
bool _closed = false;
@ -148,6 +149,14 @@ protected:
* @return bool
*/
bool waiting() const;
/**
* Helper method for the fail() method
* @param monitor
* @param message
* @return bool
*/
bool fail(const Monitor &monitor, const char *message);
private:
/**
@ -326,9 +335,18 @@ public:
*/
uint64_t parse(const Buffer &buffer);
/**
* Fail all pending - this can be called by user-space when it is recognized that the
* underlying connection is lost. All error-handlers for all operations and open
* channels will be called. This will _not_ call ConnectionHandler::onError() method.
*
* @return bool
*/
bool fail(const char *message);
/**
* Close the connection
* This will close all channels
* This will also close all channels
* @return bool
*/
bool close();
@ -370,27 +388,7 @@ public:
* Report an error message
* @param message
*/
void reportError(const char *message)
{
// set connection state to closed
_state = state_closed;
// monitor because every callback could invalidate the connection
Monitor monitor(this);
// all deferred result objects in the channels should report this error too
while (!_channels.empty())
{
// report the errors
_channels.begin()->second->reportError(message, false);
// leap out if no longer valid
if (!monitor.valid()) return;
}
// inform handler
_handler->onError(_parent, message);
}
void reportError(const char *message);
/**
* Report that the connection is closed

View File

@ -1,3 +1,4 @@
#include "linux_tcp/tcpparent.h"
#include "linux_tcp/tcphandler.h"
#include "linux_tcp/tcpconnection.h"
#include "linux_tcp/tcpchannel.h"

View File

@ -28,13 +28,22 @@ class TcpState;
*/
class TcpConnection :
private ConnectionHandler,
private Watchable
private Watchable,
private TcpParent
{
private:
/**
* User-space handler object
* @var TcpHandler
*/
TcpHandler *_handler;
/**
* The state of the TCP connection - this state objecs changes based on
* the state of the connection (resolving, connected or closed)
* @var std::unique_ptr<TcpState>
*
* @todo why is this a shared pointer?
*/
std::shared_ptr<TcpState> _state;
@ -48,13 +57,21 @@ private:
* Method that is called after the connection was constructed
* @param connection The connection that was attached to the handler
*/
virtual void onAttached(Connection *connection) override;
virtual void onAttached(Connection *connection) override
{
// pass on to the handler
_handler->onAttached(this);
}
/**
* Method that is called when the connection is destructed
* @param connection The connection that was detached from the handler
*/
virtual void onDetached(Connection *connection) override;
virtual void onDetached(Connection *connection) override
{
// pass on to the handler
_handler->onDetached(this);
}
/**
* Method that is called when the heartbeat frequency is negotiated.
@ -76,7 +93,11 @@ private:
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*/
virtual void onHeartbeat(Connection *connection) override;
virtual void onHeartbeat(Connection *connection) override
{
// pass on to tcp handler
_handler->onHeartbeat(this);
}
/**
* Method called when the connection ends up in an error state
@ -89,7 +110,13 @@ private:
* Method that is called when the connection is established
* @param connection The connection that can now be used
*/
virtual void onConnected(Connection *connection) override;
virtual void onConnected(Connection *connection) override
{
// @todo we may need this, because from this moment on we can pass an onClosed()
// pass on to the handler
_handler->onConnected(this);
}
/**
* Method that is called when the connection was closed.
@ -98,23 +125,57 @@ private:
virtual void onClosed(Connection *connection) override;
/**
* Parse a buffer that was received
* Method to be called when data was received
* @param state
* @param buffer
* @return size_t
*/
uint64_t parse(const Buffer &buffer)
virtual size_t onReceived(TcpState *state, const Buffer &buffer) override
{
// pass to the AMQP connection
// pass on to the connection
return _connection.parse(buffer);
}
/**
* Method that is called when the connection is secured
* @param state
* @param ssl
* @return bool
*/
virtual bool onSecured(TcpState *state, const SSL *ssl) override
{
// pass on to user-space
return _handler->onSecured(this, ssl);
}
/**
* Method to be called when we need to monitor a different filedescriptor
* @param state
* @param fd
* @param events
*/
virtual void onIdle(TcpState *state, int socket, int events) override
{
// pass on to user-space
return _handler->monitor(this, socket, events);
}
/**
* Classes that have access to private data
* Method to be called when it is detected that the connection was closed
* @param state
*/
friend class SslConnected;
friend class TcpConnected;
friend class TcpChannel;
virtual void onClosed(TcpState *state) override;
/**
* The expected number of bytes
* @return size_t
*/
virtual size_t expected() override
{
// pass on to the connection
return _connection.expected();
}
public:
/**
* Constructor

View File

@ -14,11 +14,6 @@
*/
#pragma once
/**
* Dependencies
*/
#include <openssl/ssl.h>
/**
* Set up namespace
*/
@ -82,6 +77,10 @@ public:
*/
virtual bool onSecured(TcpConnection *connection, const SSL *ssl)
{
// make sure compilers dont complain about unused parameters
(void) connection;
(void) ssl;
// default implementation: do not inspect anything, just allow the connection
return true;
}
@ -99,6 +98,10 @@ public:
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval)
{
// make sure compilers dont complain about unused parameters
(void) connection;
(void) interval;
// default implementation, suggested heartbeat is ok
return interval;
}
@ -109,7 +112,11 @@ public:
* secure TLS connection, and the AMQP login handshake has been completed.
* @param connection The TCP connection
*/
virtual void onConnected(TcpConnection *connection) {}
virtual void onConnected(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Method that is called when the server sends a heartbeat to the client
@ -117,20 +124,33 @@ public:
*
* @see ConnectionHandler::onHeartbeat
*/
virtual void onHeartbeat(TcpConnection *connection) {}
virtual void onHeartbeat(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Method that is called when the TCP connection ends up in an error state
* @param connection The TCP connection
* @param message Error message
*/
virtual void onError(TcpConnection *connection, const char *message) {}
virtual void onError(TcpConnection *connection, const char *message)
{
// make sure compilers dont complain about unused parameters
(void) connection;
(void) message;
}
/**
* Method that is called when the TCP connection is closed
* @param connection The TCP connection
*/
virtual void onClosed(TcpConnection *connection) {}
virtual void onClosed(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
/**
* Monitor a filedescriptor for readability or writability

View File

@ -0,0 +1,90 @@
/**
* TcpParent.h
*
* Interface to be implemented by the parent of a tcp-state. This is
* an _internal_ interface that is not relevant for user-space applications.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <openssl/ssl.h>
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Forward declarations
*/
class TcpState;
class Buffer;
/**
* Class definition
*/
class TcpParent
{
public:
/**
* Destructor
*/
virtual ~TcpParent() = default;
/**
* Method to be called when data was received
* @param state
* @param buffer
* @return size_t
*/
virtual size_t onReceived(TcpState *state, const Buffer &buffer) = 0;
/**
* Method that is called when the connection is secured
* @param state
* @param ssl
* @return bool
*/
virtual bool onSecured(TcpState *state, const SSL *ssl) = 0;
/**
* Method to be called when we need to monitor a different filedescriptor
* @param state
* @param fd
* @param events
*/
virtual void onIdle(TcpState *state, int socket, int events) = 0;
/**
* Method that is called when an error occurs (the connection is lost then)
* @param state
* @param error
*/
virtual void onError(TcpState *state, const char *message) = 0;
/**
* Method to be called when it is detected that the connection was nicely closed
* @param state
*/
virtual void onClosed(TcpState *state) = 0;
/**
* The expected number of bytes
* @return size_t
*/
virtual size_t expected() = 0;
};
/**
* End of namespace
*/
}

View File

@ -1,7 +1,7 @@
/**
* Class describing a channel close frame
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -81,10 +81,10 @@ public:
* @param failingClass failing class id if applicable
* @param failingMethod failing method id if applicable
*/
ChannelCloseFrame(uint16_t channel, uint16_t code = 0, const std::string& text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ChannelCloseFrame(uint16_t channel, uint16_t code = 0, std::string text = "", uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ChannelFrame(channel, (uint32_t)(text.length() + 7)), // sizeof code, failingclass, failingmethod (2byte + 2byte + 2byte) + text length + text length byte
_code(code),
_text(text),
_text(std::move(text)),
_failingClass(failingClass),
_failingMethod(failingMethod)
{}
@ -92,7 +92,7 @@ public:
/**
* Destructor
*/
virtual ~ChannelCloseFrame() {}
virtual ~ChannelCloseFrame() = default;
/**
* Method id

View File

@ -1,7 +1,7 @@
/**
* Class describing connection close frame
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -81,10 +81,10 @@ public:
* @param failingClass id of the failing class if applicable
* @param failingMethod id of the failing method if applicable
*/
ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ConnectionCloseFrame(uint16_t code, std::string text, uint16_t failingClass = 0, uint16_t failingMethod = 0) :
ConnectionFrame((uint32_t)(text.length() + 7)), // 1 for extra string byte, 2 for each uint16
_code(code),
_text(text),
_text(std::move(text)),
_failingClass(failingClass),
_failingMethod(failingMethod)
{}

View File

@ -186,6 +186,64 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
return processed;
}
/**
* Fail all open channels, helper method
* @param monitor object to check if object still exists
* @param message error message
* @return bool does the object still exist?
*/
bool ConnectionImpl::fail(const Monitor &monitor, const char *message)
{
// all deferred result objects in the channels should report this error too
while (!_channels.empty())
{
// report the errors
_channels.begin()->second->reportError(message, false);
// leap out if no longer valid
if (!monitor.valid()) return false;
}
// done
return true;
}
/**
* Fail the connection / report that the connection is lost
* @param message
* @return bool
*/
bool ConnectionImpl::fail(const char *message)
{
// if already closed
if (_state == state_closed) return false;
// from now on we consider the connection to be closed
_state = state_closed;
// monitor because every callback could invalidate the connection
fail(Monitor(this), message);
// done
return true;
}
/**
* Report an error to user-space
* @param message the error message
*/
void ConnectionImpl::reportError(const char *message)
{
// monitor because every callback could invalidate the connection
Monitor monitor(this);
// fail all operations
if (!fail(monitor, message)) return;
// inform handler
_handler->onError(_parent, message);
}
/**
* Close the connection
* This will close all channels
@ -194,7 +252,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer)
bool ConnectionImpl::close()
{
// leap out if already closed or closing
if (_closed) return false;
if (_closed || _state == state_closed) return false;
// mark that the object is closed
_closed = true;

View File

@ -20,6 +20,7 @@
#include "amqpcpp/linux_tcp/tcpdefines.h"
// mid level includes
#include "amqpcpp/linux_tcp/tcpparent.h"
#include "amqpcpp/linux_tcp/tcphandler.h"
#include "amqpcpp/linux_tcp/tcpconnection.h"

View File

@ -28,7 +28,7 @@ namespace AMQP {
/**
* Class definition
*/
class SslConnected : public TcpState, private Watchable
class SslConnected : public TcpExtState
{
private:
/**
@ -37,12 +37,6 @@ private:
*/
SslWrapper _ssl;
/**
* Socket file descriptor
* @var int
*/
int _socket;
/**
* The outgoing buffer
* @var TcpBuffer
@ -71,12 +65,6 @@ private:
*/
bool _closed = false;
/**
* Have we reported the final instruction to the user?
* @var bool
*/
bool _finalized = false;
/**
* Cached reallocation instruction
* @var size_t
@ -84,43 +72,6 @@ private:
size_t _reallocate = 0;
/**
* Close the connection
* @return bool
*/
bool close()
{
// do nothing if already closed
if (_socket < 0) return false;
// and stop monitoring it
_handler->monitor(_connection, _socket, 0);
// close the socket
::close(_socket);
// forget filedescriptor
_socket = -1;
// done
return true;
}
/**
* Construct the final state
* @param monitor Object that monitors whether connection still exists
* @return TcpState*
*/
TcpState *finalstate(const Monitor &monitor)
{
// close the socket if it is still open
close();
// if the object is still in a valid state, we can move to the close-state,
// otherwise there is no point in moving to a next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Proceed with the next operation after the previous operation was
* a success, possibly changing the filedescriptor-monitor
@ -132,23 +83,17 @@ private:
if (_out)
{
// let's wait until the socket becomes writable
_handler->monitor(_connection, _socket, readable | writable);
_parent->onIdle(this, _socket, readable | writable);
}
else if (_closed)
{
// start the state that closes the connection
auto *nextstate = new SslShutdown(_connection, _socket, std::move(_ssl), _finalized, _handler);
// we forget the current socket to prevent that it gets destructed
_socket = -1;
// report the next state
return nextstate;
return new SslShutdown(this, std::move(_ssl));
}
else
{
// let's wait until the socket becomes readable
_handler->monitor(_connection, _socket, readable);
_parent->onIdle(this, _socket, readable);
}
// done
@ -171,7 +116,7 @@ private:
_state = state;
// the operation must be repeated when readable
_handler->monitor(_connection, _socket, readable);
_parent->onIdle(this, _socket, readable);
// allow chaining
return monitor.valid() ? this : nullptr;
@ -181,7 +126,7 @@ private:
_state = state;
// wait until socket becomes writable again
_handler->monitor(_connection, _socket, readable | writable);
_parent->onIdle(this, _socket, readable | writable);
// allow chaining
return monitor.valid() ? this : nullptr;
@ -191,23 +136,17 @@ private:
_state = state_idle;
// turns out no error occured, an no action has to be rescheduled
_handler->monitor(_connection, _socket, _out || _closed ? readable | writable : readable);
_parent->onIdle(this, _socket, _out || _closed ? readable | writable : readable);
// allow chaining
return monitor.valid() ? this : nullptr;
default:
// if we have already reported an error to user space, we can go to the final state right away
if (_finalized) return finalstate(monitor);
// remember that we've sent out an error
_finalized = true;
// tell the handler
_handler->onError(_connection, "ssl error");
// go to the final state
return finalstate(monitor);
// @todo report an error to all channels
// ssl level error, we have to tear down the tcp connection
return monitor.valid() ? new TcpShutdown(this) : nullptr;
}
}
@ -224,7 +163,7 @@ private:
TcpInBuffer buffer(std::move(_in));
// parse the buffer
auto processed = _connection->parse(buffer);
auto processed = _parent->onReceived(this, buffer);
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
@ -329,7 +268,7 @@ private:
_state = state_idle;
// read data from ssl into the buffer
auto result = _in.receivefrom(_ssl, _connection->expected());
auto result = _in.receivefrom(_ssl, _parent->expected());
// if this is a failure, we are going to repeat the operation
if (result <= 0) return repeat(monitor, state_receiving, OpenSSL::SSL_get_error(_ssl, result));
@ -350,31 +289,34 @@ private:
public:
/**
* Constructor
* @param connection Parent TCP connection object
* @param socket The socket filedescriptor
* @param state The previous state
* @param ssl The SSL structure
* @param buffer The buffer that was already built
* @param handler User-supplied handler object
*/
SslConnected(TcpConnection *connection, int socket, SslWrapper &&ssl, TcpOutBuffer &&buffer, TcpHandler *handler) :
TcpState(connection, handler),
SslConnected(TcpExtState *state, SslWrapper &&ssl, TcpOutBuffer &&buffer) :
TcpExtState(state),
_ssl(std::move(ssl)),
_socket(socket),
_out(std::move(buffer)),
_in(4096),
_state(_out ? state_sending : state_idle)
{
// tell the handler to monitor the socket if there is an out
_handler->monitor(_connection, _socket, _state == state_sending ? readable | writable : readable);
_parent->onIdle(this, _socket, _state == state_sending ? readable | writable : readable);
}
/**
* Destructor
* Destructor
*/
virtual ~SslConnected() noexcept
{
// no cleanup if socket is gone
if (_socket < 0) return;
// and stop monitoring it
_parent->onIdle(this, _socket, 0);
// close the socket
close();
::close(_socket);
}
/**
@ -426,7 +368,7 @@ public:
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// we are not going to do this is object is busy reading
// we are not going to do this if object is busy reading
if (_state == state_receiving) return this;
// create an object to wait for the filedescriptor to becomes active
@ -484,17 +426,12 @@ public:
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// close the connection right now
if (!close(monitor)) return nullptr;
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "ssl connection terminated");
// tell the connection that it failed (this eventually ends up in our reportError() method)
// @todo to we indeed need this?
//_connection->fail();
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
@ -515,56 +452,59 @@ public:
if (_state != state_idle) return;
// let's wait until the socket becomes writable
_handler->monitor(_connection, _socket, readable | writable);
_parent->onIdle(this, _socket, readable | writable);
}
/**
* Report that heartbeat negotiation is going on
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
* When the AMQP transport layer is closed
* @param monitor Object that can be used if connection is still alive
* @return TcpState New implementation object
*/
virtual uint16_t reportNegotiate(uint16_t heartbeat) override
{
// remember that we have to reallocate (_in member can not be accessed because it is moved away)
_reallocate = _connection->maxFrame();
// pass to base
return TcpState::reportNegotiate(heartbeat);
}
/**
* Report a connection error
* @param error
*/
virtual void reportError(const char *error) override
{
// we want to start the elegant ssl shutdown procedure, so we call reportClosed() here too,
// because that function does exactly what we want to do here too
reportClosed();
// if the user was already notified of an final state, we do not have to proceed
if (_finalized) return;
// remember that this is the final call to user space
_finalized = true;
// pass to handler
_handler->onError(_connection, error);
}
/**
* Report to the handler that the connection was nicely closed
*/
virtual void reportClosed() override
virtual TcpState *onAmqpClosed(const Monitor &monitor) override
{
// remember that the object is going to be closed
_closed = true;
// if the previous operation is still in progress we can wait for that
if (_state != state_idle) return;
if (_state != state_idle) return this;
// wait until the connection is writable so that we can close it then
_handler->monitor(_connection, _socket, readable | writable);
// the connection can be closed right now, move to the next state
return new SslShutdown(this, std::move(_ssl));
}
/**
* When an error occurs in the AMQP protocol
* @param monitor Monitor that can be used to check if the connection is still alive
* @param message The error message
* @return TcpState New implementation object
*/
virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override
{
// tell the user about it
// @todo do we need this here?
//_handler->onError(_connection, message);
// stop if the object was destructed
if (!monitor.valid()) return nullptr;
// remember that the object is going to be closed
_closed = true;
// if the previous operation is still in progress we can wait for that
if (_state != state_idle) return this;
// the connection can be closed right now, move to the next state
return new SslShutdown(this, std::move(_ssl));
}
/**
* Install max-frame size
* @param heartbeat suggested heartbeat
*/
virtual void maxframe(size_t maxframe) override
{
// remember that we have to reallocate (_in member can not be accessed because it is moved away)
_reallocate = maxframe;
}
};

View File

@ -29,7 +29,7 @@ namespace AMQP {
/**
* Class definition
*/
class SslHandshake : public TcpState, private Watchable
class SslHandshake : public TcpExtState
{
private:
/**
@ -38,12 +38,6 @@ private:
*/
SslWrapper _ssl;
/**
* The socket file descriptor
* @var int
*/
int _socket;
/**
* The outgoing buffer
* @var TcpOutBuffer
@ -61,7 +55,7 @@ private:
if (_socket < 0) return false;
// and stop monitoring it
_handler->monitor(_connection, _socket, 0);
_parent->onIdle(this, _socket, 0);
// close the socket
::close(_socket);
@ -81,28 +75,30 @@ private:
TcpState *nextstate(const Monitor &monitor)
{
// check if the handler allows the connection
bool allowed = _handler->onSecured(_connection, _ssl);
bool allowed = _parent->onSecured(this, _ssl);
// leap out if the user space function destructed the object
if (!monitor.valid()) return nullptr;
// copy the socket because we might forget it
auto socket = _socket;
// auto socket = _socket;
// forget the socket member to prevent that it is closed by the destructor
_socket = -1;
// if connection is allowed, we move to the next state
if (allowed) return new SslConnected(_connection, socket, std::move(_ssl), std::move(_out), _handler);
if (allowed) return new SslConnected(this, std::move(_ssl), std::move(_out));
// report that the connection is broken
_handler->onError(_connection, "TLS connection has been rejected");
// @todo do we need this?
//_handler->onError(_connection, "TLS connection has been rejected");
// the onError method could have destructed this object
if (!monitor.valid()) return nullptr;
// shutdown the connection
return new SslShutdown(_connection, socket, std::move(_ssl), true, _handler);
// @todo the onClosed() does not have to be called
return new SslShutdown(this, std::move(_ssl));
}
/**
@ -116,7 +112,8 @@ private:
close();
// we have an error - report this to the user
_handler->onError(_connection, "failed to setup ssl connection");
// @todo do we need this?
//_handler->onError(_connection, "failed to setup ssl connection");
// done, go to the closed state (plus check if connection still exists, because
// after the onError() call the user space program may have destructed that object)
@ -131,7 +128,7 @@ private:
TcpState *proceed(int events)
{
// tell the handler that we want to listen for certain events
_handler->monitor(_connection, _socket, events);
_parent->onIdle(this, _socket, events);
// allow chaining
return this;
@ -143,18 +140,15 @@ public:
*
* @todo catch the exception!
*
* @param connection Parent TCP connection object
* @param socket The socket filedescriptor
* @param state Earlier state
* @param hostname The hostname to connect to
* @param context SSL context
* @param buffer The buffer that was already built
* @param handler User-supplied handler object
* @throws std::runtime_error
*/
SslHandshake(TcpConnection *connection, int socket, const std::string &hostname, TcpOutBuffer &&buffer, TcpHandler *handler) :
TcpState(connection, handler),
SslHandshake(TcpExtState *state, const std::string &hostname, TcpOutBuffer &&buffer) :
TcpExtState(state),
_ssl(SslContext(OpenSSL::TLS_client_method())),
_socket(socket),
_out(std::move(buffer))
{
// we will be using the ssl context as a client
@ -164,10 +158,10 @@ public:
OpenSSL::SSL_ctrl(_ssl, SSL_CTRL_SET_TLSEXT_HOSTNAME, TLSEXT_NAMETYPE_host_name, (void *)hostname.data());
// associate the ssl context with the socket filedescriptor
if (OpenSSL::SSL_set_fd(_ssl, socket) == 0) throw std::runtime_error("failed to associate filedescriptor with ssl socket");
if (OpenSSL::SSL_set_fd(_ssl, _socket) == 0) throw std::runtime_error("failed to associate filedescriptor with ssl socket");
// we are going to wait until the socket becomes writable before we start the handshake
_handler->monitor(_connection, _socket, writable);
_parent->onIdle(this, _socket, writable);
}
/**
@ -238,7 +232,7 @@ public:
// the handshake is still busy, outgoing data must be cached
_out.add(buffer, size);
}
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
@ -286,7 +280,8 @@ public:
close();
// report to the user that the handshake was aborted
_handler->onError(_connection, "ssl handshake aborted");
// @todo do we need this?
//_handler->onError(_connection, "ssl handshake aborted");
// done, go to the closed state (plus check if connection still exists, because
// after the onError() call the user space program may have destructed that object)

View File

@ -20,7 +20,7 @@ namespace AMQP {
/**
* Class definition
*/
class SslShutdown : public TcpState, private Watchable
class SslShutdown : public TcpExtState
{
private:
/**
@ -28,64 +28,7 @@ private:
* @var SslWrapper
*/
SslWrapper _ssl;
/**
* Socket file descriptor
* @var int
*/
int _socket;
/**
* Have we already notified user space of connection end?
* @var bool
*/
bool _finalized;
/**
* Close the socket
* @return bool
*/
bool close()
{
// skip if already closed
if (_socket < 0) return false;
// we're no longer interested in events
_handler->monitor(_connection, _socket, 0);
// close the socket
::close(_socket);
// forget the socket
_socket = -1;
// done
return true;
}
/**
* Report an error
* @param monitor object to check if connection still exists
* @return TcpState*
*/
TcpState *reporterror(const Monitor &monitor)
{
// close the socket
close();
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// object will be finalized now
_finalized = true;
// inform user space that the party is over
_handler->onError(_connection, "ssl shutdown error");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Proceed with the next operation after the previous operation was
@ -93,22 +36,10 @@ private:
* @param monitor object to check if connection still exists
* @return TcpState*
*/
TcpState *proceed(const Monitor &monitor)
virtual TcpState *proceed(const Monitor &monitor)
{
// close the socket
close();
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// object will be finalized now
_finalized = true;
// inform user space that the party is over
_handler->onClosed(_connection);
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
// next state is to shutdown the connection
return new TcpShutdown(this);
}
/**
@ -126,17 +57,18 @@ private:
switch (error) {
case SSL_ERROR_WANT_READ:
// the operation must be repeated when readable
_handler->monitor(_connection, _socket, readable);
_parent->onIdle(this, _socket, readable);
return this;
case SSL_ERROR_WANT_WRITE:
// wait until socket becomes writable again
_handler->monitor(_connection, _socket, readable | writable);
_parent->onIdle(this, _socket, readable | writable);
return this;
default:
// go to the final state (if not yet disconnected)
return reporterror(monitor);
// @todo special treatment for ssl-protocol errors
return proceed(monitor);
}
}
@ -144,37 +76,28 @@ private:
public:
/**
* Constructor
* @param connection Parent TCP connection object
* @param socket The socket filedescriptor
* @param state Previous state
* @param ssl The SSL structure
* @param finalized Is the user already notified of connection end (onError() has been called)
* @param handler User-supplied handler object
*/
SslShutdown(TcpConnection *connection, int socket, SslWrapper &&ssl, bool finalized, TcpHandler *handler) :
TcpState(connection, handler),
_ssl(std::move(ssl)),
_socket(socket),
_finalized(finalized)
SslShutdown(TcpExtState *state, SslWrapper &&ssl) :
TcpExtState(state),
_ssl(std::move(ssl))
{
// wait until the socket is accessible
_handler->monitor(_connection, _socket, readable | writable);
}
_parent->onIdle(this, _socket, readable | writable);
}
/**
* No copying
* @param that
*/
SslShutdown(const SslShutdown &that) = delete;
/**
* Destructor
*/
virtual ~SslShutdown() noexcept
{
// close the socket
close();
}
virtual ~SslShutdown() noexcept = default;
/**
* The filedescriptor of this connection
* @return int
*/
virtual int fileno() const override { return _socket; }
/**
* Process the filedescriptor in the object
* @param monitor Object to check if connection still exists
@ -211,6 +134,8 @@ public:
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// @todo do we even need this? isn't flushing reserved for data?
// create an object to wait for the filedescriptor to becomes active
Poll poll(_socket);
@ -238,29 +163,25 @@ public:
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
// something is wrong, we proceed to the next state
default: return reporterror(monitor);
default: return proceed(monitor);
}
}
}
/**
* Abort the shutdown operation
* Abort the shutdown operation immediately
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// cleanup the connection
// @todo this also calls onClosed()
cleanup();
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "ssl shutdown aborted");
// report to user-space that the ssl shutdown was aborted
// @todo
//_handler->onError(_connection, "ssl shutdown aborted");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;

View File

@ -4,7 +4,7 @@
* Class that is used when the TCP connection ends up in a closed state
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2016 Copernica BV
* @copyright 2015 - 2018 Copernica BV
*/
/**
@ -25,11 +25,10 @@ class TcpClosed : public TcpState
public:
/**
* Constructor
* @param connection The parent TcpConnection object
* @param handler User supplied handler
* @param parent The parent object
*/
TcpClosed(TcpConnection *connection, TcpHandler *handler) :
TcpState(connection, handler) {}
TcpClosed(TcpParent *parent) :
TcpState(parent) {}
/**
* Constructor
@ -42,7 +41,7 @@ public:
* Destructor
*/
virtual ~TcpClosed() noexcept = default;
/**
* Abort the operation
* @param monitor Monitor that can be used to check if the tcp connection is still alive

View File

@ -18,6 +18,7 @@
*/
#include "tcpoutbuffer.h"
#include "tcpinbuffer.h"
#include "tcpshutdown.h"
#include "poll.h"
/**
@ -28,15 +29,9 @@ namespace AMQP {
/**
* Class definition
*/
class TcpConnected : public TcpState, private Watchable
class TcpConnected : public TcpExtState
{
private:
/**
* The socket file descriptor
* @var int
*/
int _socket;
/**
* The outgoing buffer
* @var TcpOutBuffer
@ -63,48 +58,36 @@ private:
/**
* Close the connection
* @return bool
* Start an elegant shutdown
*
* @todo remove this method
*/
bool close()
void shutdown2()
{
// do nothing if already closed
if (_socket < 0) return false;
// we will shutdown the socket in a very elegant way, we notify the peer
// that we will not be sending out more write operations
::shutdown(_socket, SHUT_WR);
// and stop monitoring it
_handler->monitor(_connection, _socket, 0);
// close the socket
::close(_socket);
// forget filedescriptor
_socket = -1;
// done
return true;
// we still monitor the socket for readability to see if our close call was
// confirmed by the peer
_parent->onIdle(this, _socket, readable);
}
/**
* Helper method to report an error
* @param monitor Monitor to check validity of "this"
* @return bool Was an error reported?
*/
bool reportError()
bool reportError(const Monitor &monitor)
{
// some errors are ok and do not (necessarily) mean that we're disconnected
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
// connection can be closed now
close();
// if the user has already been notified, we do not have to do anything else
if (_finalized) return true;
// update the _finalized member before we make the call to user space because
// the user space may destruct this object
_finalized = true;
// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));
// tell the connection that it failed
// @todo we should report an error, but that could be wrong, because it calls back to us
// we're no longer interested in the socket (this also calls onClosed())
cleanup();
// done
return true;
@ -125,14 +108,11 @@ private:
public:
/**
* Constructor
* @param connection Parent TCP connection object
* @param socket The socket filedescriptor
* @param state The previous state
* @param buffer The buffer that was already built
* @param handler User-supplied handler object
*/
TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) :
TcpState(connection, handler),
_socket(socket),
TcpConnected(TcpExtState *state, TcpOutBuffer &&buffer) :
TcpExtState(state),
_out(std::move(buffer)),
_in(4096)
{
@ -140,17 +120,13 @@ public:
if (_out) _out.sendto(_socket);
// tell the handler to monitor the socket, if there is an out
_handler->monitor(_connection, _socket, _out ? readable | writable : readable);
_parent->onIdle(this, _socket, _out ? readable | writable : readable);
}
/**
* Destructor
*/
virtual ~TcpConnected() noexcept
{
// close the socket
close();
}
virtual ~TcpConnected() noexcept = default;
/**
* The filedescriptor of this connection
@ -183,28 +159,30 @@ public:
auto result = _out.sendto(_socket);
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
if (result < 0 && reportError(monitor)) return nextState(monitor);
// if buffer is empty by now, we no longer have to check for
// writability, but only for readability
if (!_out) _handler->monitor(_connection, _socket, readable);
if (!_out) _parent->onIdle(this, _socket, readable);
}
// should we check for readability too?
if (flags & readable)
{
// read data from buffer
ssize_t result = _in.receivefrom(_socket, _connection->expected());
ssize_t result = _in.receivefrom(_socket, _parent->expected());
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
if (result < 0 && reportError(monitor)) return nextState(monitor);
// @todo should we also check for result == 0
// we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method
TcpInBuffer buffer(std::move(_in));
// parse the buffer
auto processed = _connection->parse(buffer);
auto processed = _parent->onReceived(this, buffer);
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
@ -249,7 +227,7 @@ public:
_out.add(buffer + bytes, size - bytes);
// start monitoring the socket to find out when it is writable
_handler->monitor(_connection, _socket, readable | writable);
_parent->onIdle(this, _socket, readable | writable);
}
/**
@ -286,77 +264,55 @@ public:
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// close the connection right now
if (!close(monitor)) return nullptr;
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "tcp connection terminated");
// fail the connection (this ends up in our reportError() method)
// @todo should this not happen
//_connection->fail();
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Report that heartbeat negotiation is going on
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
* When the AMQP transport layer is closed
* @param monitor Object that can be used if connection is still alive
* @return TcpState New implementation object
*/
virtual uint16_t reportNegotiate(uint16_t heartbeat) override
virtual TcpState *onAmqpClosed(const Monitor &monitor) override
{
// move to the tcp shutdown state
return new TcpShutdown(this);
}
/**
* When an error occurs in the AMQP protocol
* @param monitor Monitor that can be used to check if the connection is still alive
* @param message The error message
* @return TcpState New implementation object
*/
virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override
{
// tell the user about it
// @todo do this somewhere else
//_handler->onError(_connection, message);
// stop if the object was destructed
if (!monitor.valid()) return nullptr;
// move to the tcp shutdown state
return new TcpShutdown(this);
}
/**
* Install max-frame size
* @param heartbeat suggested heartbeat
*/
virtual void maxframe(size_t maxframe) override
{
// remember that we have to reallocate (_in member can not be accessed because it is moved away)
_reallocate = _connection->maxFrame();
// pass to base
return TcpState::reportNegotiate(heartbeat);
}
/**
* Report to the handler that the object is in an error state.
* @param error
*/
virtual void reportError(const char *error) override
{
// close the socket
close();
// if the user was already notified of an final state, we do not have to proceed
if (_finalized) return;
// remember that this is the final call to user space
_finalized = true;
// pass to handler
_handler->onError(_connection, error);
}
/**
* Report to the handler that the connection was nicely closed
* This is the counter-part of the connection->close() call.
*/
virtual void reportClosed() override
{
// we will shutdown the socket in a very elegant way, we notify the peer
// that we will not be sending out more write operations
shutdown(_socket, SHUT_WR);
// we still monitor the socket for readability to see if our close call was
// confirmed by the peer
_handler->monitor(_connection, _socket, readable);
// if the user was already notified of an final state, we do not have to proceed
if (_finalized) return;
// remember that this is the final call to user space
_finalized = true;
// pass to handler
_handler->onClosed(_connection);
_reallocate = maxframe;
}
};

View File

@ -25,7 +25,8 @@ namespace AMQP {
* @param hostname The address to connect to
*/
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)),
_handler(handler),
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure())),
_connection(this, address.login(), address.vhost()) {}
/**
@ -65,7 +66,7 @@ void TcpConnection::process(int fd, int flags)
{
// monitor the object for destruction, because you never know what the user
Monitor monitor(this);
// store the old state
auto *oldstate = _state.get();
@ -122,6 +123,8 @@ void TcpConnection::flush()
*/
bool TcpConnection::close(bool immediate)
{
// @todo what if not yet connected / still doing a lookup?
// if no immediate disconnect is needed, we can simply start the closing handshake
if (!immediate) return _connection.close();
@ -134,54 +137,29 @@ bool TcpConnection::close(bool immediate)
// if the user-space code destructed the connection, there is nothing else to do
if (!monitor.valid()) return true;
// store the old state
// remember the old state (this is necessary because _state may be modified by user-code)
auto *oldstate = _state.get();
// abort the operation
// @todo does this call user-space stuff?
auto *newstate = _state->abort(monitor);
// if the state did not change, we do not have to update a member,
// when the newstate is nullptr, the object is (being) destructed
// and we do not have to do anything else either
if (oldstate == newstate || newstate == nullptr) return true;
if (newstate == nullptr || newstate == oldstate) return true;
// in a bizarre set of circumstances, the user may have implemented the
// handler in such a way that the connection object was destructed
if (!monitor.valid())
{
// ok, user code is weird, connection object no longer exist, get rid of the state too
delete newstate;
}
else
{
// replace it with the new implementation
_state.reset(newstate);
}
// replace it with the new implementation
_state.reset(newstate);
// fail the connection / report the error to user-space
// @todo what if channels are not even ready?
_connection.fail("connection prematurely closed by client");
// done, we return true because the connection is closed
return true;
}
/**
* Method that is called after the connection was constructed
* @param connection The connection that was attached to the handler
*/
void TcpConnection::onAttached(Connection *connection)
{
// pass on to the state
_state->reportAttached();
}
/**
* Method that is called when the connection is destructed
* @param connection The connection that was detached from the handler
*/
void TcpConnection::onDetached(Connection *connection)
{
// pass on to the state
_state->reportDetached();
}
/**
* Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval
@ -190,8 +168,11 @@ void TcpConnection::onDetached(Connection *connection)
*/
uint16_t TcpConnection::onNegotiate(Connection *connection, uint16_t interval)
{
// the state object should do this
return _state->reportNegotiate(interval);
// tell the max-frame size
_state->maxframe(connection->maxFrame());
// tell the handler
return _handler->onNegotiate(this, interval);
}
/**
@ -206,16 +187,6 @@ void TcpConnection::onData(Connection *connection, const char *buffer, size_t si
_state->send(buffer, size);
}
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*/
void TcpConnection::onHeartbeat(Connection *connection)
{
// let the state object do this
_state->reportHeartbeat();
}
/**
* Method called when the connection ends up in an error state
* @param connection The connection that entered the error state
@ -223,18 +194,20 @@ void TcpConnection::onHeartbeat(Connection *connection)
*/
void TcpConnection::onError(Connection *connection, const char *message)
{
// tell the implementation to report the error
_state->reportError(message);
}
/**
* Method that is called when the connection is established
* @param connection The connection that can now be used
*/
void TcpConnection::onConnected(Connection *connection)
{
// tell the implementation to report the status
_state->reportConnected();
// monitor to check if "this" is destructed
Monitor monitor(this);
// remember the old state (this is necessary because _state may be modified by user-code)
auto *oldstate = _state.get();
// tell the state that an error occured at the amqp level
auto *newstate = _state->onAmqpError(monitor, message);
// leap out if nothing changes
if (newstate == nullptr || newstate == oldstate) return;
// assign the new state
_state.reset(newstate);
}
/**
@ -243,8 +216,20 @@ void TcpConnection::onConnected(Connection *connection)
*/
void TcpConnection::onClosed(Connection *connection)
{
// tell the implementation to report that connection is closed now
_state->reportClosed();
// monitor to check if "this" is destructed
Monitor monitor(this);
// remember the old state (this is necessary because _state may be modified by user-code)
auto *oldstate = _state.get();
// tell the state that the connection was closed at the amqp level
auto *newstate = _state->onAmqpClosed(monitor);
// leap out if nothing changes
if (newstate == nullptr || newstate == oldstate) return;
// assign the new state
_state.reset(newstate);
}
/**

107
src/linux_tcp/tcpextstate.h Normal file
View File

@ -0,0 +1,107 @@
/**
* TcpExtState.h
*
* Extended state that also contains the socket filedescriptor
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Class definition
*/
class TcpExtState : public TcpState
{
protected:
/**
* The filedescriptor
* @var int
*/
int _socket;
/**
* Clean-up the socket
*/
void cleanup()
{
// do nothing if no longer connected
if (_socket < 0) return;
// tell handler that the socket is idle and we're no longer interested in events
_parent->onIdle(this, _socket, 0);
// close the socket
::close(_socket);
// forget the socket
_socket = -1;
// tell the handler that the connection is now closed
_parent->onClosed(this);
}
protected:
/**
* Constructor
* @param parent
*/
TcpExtState(TcpParent *parent) :
TcpState(parent),
_socket(-1) {}
/**
* Constructor
* @param state
*/
TcpExtState(TcpExtState *state) :
TcpState(state),
_socket(state->_socket)
{
// invalidate the other state
state->_socket = -1;
}
public:
/**
* No copying
* @param that
*/
TcpExtState(const TcpExtState &that) = delete;
/**
* Destructor
*/
virtual ~TcpExtState()
{
// cleanup the socket
cleanup();
}
/**
* The filedescriptor of this connection
* @return int
*/
virtual int fileno() const override
{
// expose the socket
return _socket;
}
};
/**
* End of namespace
*/
}

View File

@ -32,7 +32,7 @@ namespace AMQP {
/**
* Class definition
*/
class TcpResolver : public TcpState
class TcpResolver : public TcpExtState
{
private:
/**
@ -59,12 +59,6 @@ private:
*/
Pipe _pipe;
/**
* Non-blocking socket that is connected to RabbitMQ
* @var int
*/
int _socket = -1;
/**
* Possible error that occured
* @var std::string
@ -150,20 +144,19 @@ private:
public:
/**
* Constructor
* @param connection Parent connection object
* @param parent Parent connection object
* @param hostname The hostname for the lookup
* @param portnumber The portnumber for the lookup
* @param secure Do we need a secure tls connection when ready?
* @param handler User implemented handler object
*/
TcpResolver(TcpConnection *connection, const std::string &hostname, uint16_t port, bool secure, TcpHandler *handler) :
TcpState(connection, handler),
_hostname(hostname),
TcpResolver(TcpParent *parent, std::string hostname, uint16_t port, bool secure) :
TcpExtState(parent),
_hostname(std::move(hostname)),
_secure(secure),
_port(port)
{
// tell the event loop to monitor the filedescriptor of the pipe
handler->monitor(connection, _pipe.in(), readable);
parent->onIdle(this, _pipe.in(), readable);
// we can now start the thread (must be started after filedescriptor is monitored!)
std::thread thread(std::bind(&TcpResolver::run, this));
@ -178,7 +171,7 @@ public:
virtual ~TcpResolver() noexcept
{
// stop monitoring the pipe filedescriptor
_handler->monitor(_connection, _pipe.in(), 0);
_parent->onIdle(this, _pipe.in(), 0);
// wait for the thread to be ready
_thread.join();
@ -201,21 +194,21 @@ public:
{
// if we need a secure connection, we move to the tls handshake
// @todo catch possible exception
if (_secure) return new SslHandshake(_connection, _socket, _hostname, std::move(_buffer), _handler);
if (_secure) return new SslHandshake(this, _hostname, std::move(_buffer));
// otherwise we have a valid regular tcp connection
return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
return new TcpConnected(this, std::move(_buffer));
}
else
{
// report error
_handler->onError(_connection, _error.data());
_parent->onError(this, _error.data());
// handler callback might have destroyed connection
if (!monitor.valid()) return nullptr;
// create dummy implementation
return new TcpClosed(_connection, _handler);
return new TcpClosed(this);
}
}
@ -260,10 +253,10 @@ public:
_thread.join();
// close the socket
if (_socket >= 0) ::close(_socket);
if (_socket >= 0) { ::close(_socket); _socket = -1; }
// inform user space that the connection is cancelled
_handler->onError(_connection, "tcp connect aborted");
_parent->onError(this, "tcp connect aborted");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;

View File

@ -0,0 +1,106 @@
/**
* TcpSecureState.h
*
* Utility class that takes care of setting a new state. It contains
* a number of checks that prevents that the state is overwritten
* if the object is destructed in the meantime
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Class definition
*/
class TcpSecureState
{
private:
/**
* Monitor to check the validity of the connection
* @var Monitor
*/
Monitor _monitor;
/**
* Reference to the pointer to the state that should be updated
* @var std::unique_ptr<TcpState>
*/
std::unique_ptr<TcpState> &_state;
/**
* The old pointer
* @var TcpState*
*/
const TcpState *_old;
public:
/**
* Constructor
* @param watchable the object that can be destructor
* @param state the old state value
*/
TcpSecureState(Watchable *watchable, std::unique_ptr<TcpState> &state) :
_monitor(watchable), _state(state), _old(state.get()) {}
/**
* No copying
* @param that
*/
TcpSecureState(const TcpSecureState &that) = delete;
/**
* Destructor
*/
virtual ~TcpSecureState() = default;
/**
* Expose the monitor
* @return Monitor
*/
const Monitor &monitor() const { return _monitor; }
/**
* Assign a new state
* @param state this is a newly allocated state
* @return bool true if the object is still valid
*/
bool assign(TcpState *state)
{
// do nothing if the state did not change, or if object was destructed
if (_old == state || state == nullptr) return _monitor.valid();
// can we assign a new state?
if (_monitor.valid())
{
// assign the
_state.reset(state);
// object is still valid
return true;
}
else
{
// otherwise the object is destructed and the new state should be destructed too
delete state;
// object is no longer valid
return false;
}
}
};
/**
* End of namespace
*/
}

141
src/linux_tcp/tcpshutdown.h Normal file
View File

@ -0,0 +1,141 @@
/**
* TcpShutdown.h
*
* State in the TCP handshake that is responsible for gracefully
* shutting down the connection by closing our side of the connection,
* and waiting for the server to close the connection on the other
* side too.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "tcpextstate.h"
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Class definition
*/
class TcpShutdown : public TcpExtState
{
protected:
/**
* Method to report the result to the user
*/
virtual void report()
{
// report that the connection was closed
_parent->onClosed(this);
}
public:
/**
* Constructor
* @param state The previous state
*/
TcpShutdown(TcpExtState *state) : TcpExtState(state)
{
// we will shutdown the socket in a very elegant way, we notify the peer
// that we will not be sending out more write operations
shutdown(_socket, SHUT_WR);
// we still monitor the socket for readability to see if our close call was
// confirmed by the peer
_parent->onIdle(this, _socket, readable);
}
/**
* Forbidden to copy
* @param that
*/
TcpShutdown(const TcpShutdown &that) = delete;
/**
* Destructor
*/
virtual ~TcpShutdown() = default;
/**
* Process the filedescriptor in the object
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @param fd The filedescriptor that is active
* @param flags AMQP::readable and/or AMQP::writable
* @return New implementation object
*/
virtual TcpState *process(const Monitor &monitor, int fd, int flags)
{
// must be the right filedescriptor
if (_socket != fd) return this;
// if the socket is not readable, we do not have to check anything
if (!(flags & readable)) return this;
// buffer to read data in
char buffer[64];
// read in data (we only do this to discover if the connection is really closed)
auto result = read(_socket, buffer, sizeof(buffer));
// if we read something, we keep on reading
if (result > 0) return this;
// or should we retry?
if (result < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) return this;
// flush the connection to close the connection and report to the user
return flush(monitor);
}
/**
* Flush the connection, make sure all network operations are finished
* @param monitor Object to check if connection still exists
* @return TcpState New state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// immediately close the socket
cleanup();
// report to the user that the operation is finished
report();
// move to next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Abort the operation, immediately proceed to the final state
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState New implementation object
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// close the socket completely
cleanup();
// report the error to user-space
// @todo do we have to report this?
//_handler->onError(_connection, "tcp shutdown aborted");
// move to next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
};
/**
* End of namespace
*/
}

View File

@ -24,32 +24,26 @@ class TcpState
{
protected:
/**
* Parent TcpConnection object as is seen by the user
* @var TcpConnection
* Parent object that constructed the state
* @var TcpParent
*/
TcpConnection *_connection;
/**
* User-supplied handler
* @var TcpHandler
*/
TcpHandler *_handler;
TcpParent *_parent;
protected:
/**
* Protected constructor
* @param connection Original TCP connection object
* @param parent The parent object
* @param handler User-supplied handler class
*/
TcpState(TcpConnection *connection, TcpHandler *handler) :
_connection(connection), _handler(handler) {}
TcpState(TcpParent *parent) :
_parent(parent) {}
/**
* Protected "copy" constructor
* @param state Original TcpState object
*/
TcpState(const TcpState *state) :
_connection(state->_connection), _handler(state->_handler) {}
_parent(state->_parent) {}
public:
/**
@ -124,81 +118,25 @@ public:
virtual TcpState *abort(const Monitor &monitor) = 0;
/**
* Report to the handler that the connection was constructed
*/
virtual void reportAttached()
{
// pass to the handler
_handler->onAttached(_connection);
}
/**
* Report to the handler that the connection was destructed
*/
virtual void reportDetached()
{
// pass to the handler
_handler->onDetached(_connection);
}
/**
* Report to the handler that heartbeat negotiation is going on
* Install max-frame size
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
*/
virtual uint16_t reportNegotiate(uint16_t heartbeat)
{
// pass to handler
return _handler->onNegotiate(_connection, heartbeat);
}
/**
* Report to the handler that the object is in an error state.
*
* This is the last method to be called on the handler object, from now on
* the handler will no longer be called to report things to user space.
* The state object itself stays active, and further calls to process()
* may be possible.
*
* @param error
*/
virtual void reportError(const char *error)
{
// pass to handler
_handler->onError(_connection, error);
}
virtual void maxframe(size_t maxframe) {}
/**
* Report that a heartbeat frame was received
*/
virtual void reportHeartbeat()
{
// pass to handler
_handler->onHeartbeat(_connection);
}
/**
* Report to the handler that the connection is ready for use
*/
virtual void reportConnected()
{
// pass to handler
_handler->onConnected(_connection);
}
/**
* Report to the handler that the connection was correctly closed, after
* the user has called the Connection::close() method. The underlying TCP
* connection still has to be closed.
* Events that can take place during the AMQP protocol
*
* This is the last method that is called on the object, from now on no
* other methods may be called on the _handler variable.
* Both events also trigger the end of a valid connection, and should
* be used to tear down the TCP connection.
*
* @todo are these appropriate names?
*
* @param monitor
* @param TcpState
*/
virtual void reportClosed()
{
// pass to handler
_handler->onClosed(_connection);
}
virtual TcpState *onAmqpError(const Monitor &monitor, const char *error) { return this; }
virtual TcpState *onAmqpClosed(const Monitor &monitor) { return this; }
};
/**