work in progress, simplified the tcp and ssl states
This commit is contained in:
parent
e617161c8c
commit
359eec189f
|
|
@ -65,7 +65,7 @@ public:
|
|||
virtual void onIdle(TcpState *state, int socket, int events) = 0;
|
||||
|
||||
/**
|
||||
* Method that is called when an error occurs (the connection is lost then)
|
||||
* Method that is called when an error occurs (the connection is lost)
|
||||
* @param state
|
||||
* @param error
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -307,24 +307,8 @@ public:
|
|||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~SslConnected() noexcept
|
||||
{
|
||||
// no cleanup if socket is gone
|
||||
if (_socket < 0) return;
|
||||
|
||||
// and stop monitoring it
|
||||
_parent->onIdle(this, _socket, 0);
|
||||
|
||||
// close the socket
|
||||
::close(_socket);
|
||||
}
|
||||
virtual ~SslConnected() noexcept = default;
|
||||
|
||||
/**
|
||||
* The filedescriptor of this connection
|
||||
* @return int
|
||||
*/
|
||||
virtual int fileno() const override { return _socket; }
|
||||
|
||||
/**
|
||||
* Number of bytes in the outgoing buffer
|
||||
* @return std::size_t
|
||||
|
|
@ -438,37 +422,11 @@ public:
|
|||
}
|
||||
|
||||
/**
|
||||
* When the AMQP transport layer is closed
|
||||
* @param monitor Object that can be used if connection is still alive
|
||||
* @return TcpState New implementation object
|
||||
* Gracefully close the connection
|
||||
* @return TcpState The next state
|
||||
*/
|
||||
virtual TcpState *onAmqpClosed(const Monitor &monitor) override
|
||||
{
|
||||
// remember that the object is going to be closed
|
||||
_closed = true;
|
||||
|
||||
// if the previous operation is still in progress we can wait for that
|
||||
if (_state != state_idle) return this;
|
||||
|
||||
// the connection can be closed right now, move to the next state
|
||||
return new SslShutdown(this, std::move(_ssl));
|
||||
}
|
||||
|
||||
/**
|
||||
* When an error occurs in the AMQP protocol
|
||||
* @param monitor Monitor that can be used to check if the connection is still alive
|
||||
* @param message The error message
|
||||
* @return TcpState New implementation object
|
||||
*/
|
||||
virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override
|
||||
{
|
||||
// tell the user about it
|
||||
// @todo do we need this here?
|
||||
//_handler->onError(_connection, message);
|
||||
|
||||
// stop if the object was destructed
|
||||
if (!monitor.valid()) return nullptr;
|
||||
|
||||
virtual TcpState *close() override
|
||||
{
|
||||
// remember that the object is going to be closed
|
||||
_closed = true;
|
||||
|
||||
|
|
|
|||
|
|
@ -45,28 +45,6 @@ private:
|
|||
TcpOutBuffer _out;
|
||||
|
||||
|
||||
/**
|
||||
* Close the socket
|
||||
* @return bool
|
||||
*/
|
||||
bool close()
|
||||
{
|
||||
// do nothing if already closed
|
||||
if (_socket < 0) return false;
|
||||
|
||||
// and stop monitoring it
|
||||
_parent->onIdle(this, _socket, 0);
|
||||
|
||||
// close the socket
|
||||
::close(_socket);
|
||||
|
||||
// forget filedescriptor
|
||||
_socket = -1;
|
||||
|
||||
// done
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Report a new state
|
||||
* @param monitor
|
||||
|
|
@ -80,24 +58,16 @@ private:
|
|||
// leap out if the user space function destructed the object
|
||||
if (!monitor.valid()) return nullptr;
|
||||
|
||||
// copy the socket because we might forget it
|
||||
// auto socket = _socket;
|
||||
|
||||
// forget the socket member to prevent that it is closed by the destructor
|
||||
_socket = -1;
|
||||
|
||||
// if connection is allowed, we move to the next state
|
||||
if (allowed) return new SslConnected(this, std::move(_ssl), std::move(_out));
|
||||
|
||||
// report that the connection is broken
|
||||
// @todo do we need this?
|
||||
//_handler->onError(_connection, "TLS connection has been rejected");
|
||||
_parent->onError(this, "TLS connection has been rejected");
|
||||
|
||||
// the onError method could have destructed this object
|
||||
if (!monitor.valid()) return nullptr;
|
||||
|
||||
// shutdown the connection
|
||||
// @todo the onClosed() does not have to be called
|
||||
return new SslShutdown(this, std::move(_ssl));
|
||||
}
|
||||
|
||||
|
|
@ -108,16 +78,14 @@ private:
|
|||
*/
|
||||
TcpState *reportError(const Monitor &monitor)
|
||||
{
|
||||
// close the socket
|
||||
close();
|
||||
|
||||
// we have an error - report this to the user
|
||||
// @todo do we need this?
|
||||
//_handler->onError(_connection, "failed to setup ssl connection");
|
||||
_parent->onError(this, "failed to setup ssl connection");
|
||||
|
||||
// done, go to the closed state (plus check if connection still exists, because
|
||||
// after the onError() call the user space program may have destructed that object)
|
||||
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||
// stop if connection is gone
|
||||
if (!monitor.valid()) return nullptr;
|
||||
|
||||
// done, shutdown the tcp connection
|
||||
return new TcpShutdown(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -167,21 +135,7 @@ public:
|
|||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~SslHandshake() noexcept
|
||||
{
|
||||
// leap out if socket is invalidated
|
||||
if (_socket < 0) return;
|
||||
|
||||
// the object got destructed without moving to a new state, this
|
||||
// situation should normally not occur
|
||||
::close(_socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* The filedescriptor of this connection
|
||||
* @return int
|
||||
*/
|
||||
virtual int fileno() const override { return _socket; }
|
||||
virtual ~SslHandshake() noexcept = default;
|
||||
|
||||
/**
|
||||
* Number of bytes in the outgoing buffer
|
||||
|
|
|
|||
|
|
@ -67,7 +67,6 @@ private:
|
|||
|
||||
default:
|
||||
// go to the final state (if not yet disconnected)
|
||||
// @todo special treatment for ssl-protocol errors
|
||||
return proceed(monitor);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,51 +57,31 @@ private:
|
|||
bool _finalized = false;
|
||||
|
||||
|
||||
/**
|
||||
* Start an elegant shutdown
|
||||
*
|
||||
* @todo remove this method
|
||||
*/
|
||||
void shutdown2()
|
||||
{
|
||||
// we will shutdown the socket in a very elegant way, we notify the peer
|
||||
// that we will not be sending out more write operations
|
||||
::shutdown(_socket, SHUT_WR);
|
||||
|
||||
// we still monitor the socket for readability to see if our close call was
|
||||
// confirmed by the peer
|
||||
_parent->onIdle(this, _socket, readable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to report an error
|
||||
* @param monitor Monitor to check validity of "this"
|
||||
* @return bool Was an error reported?
|
||||
*/
|
||||
bool reportError(const Monitor &monitor)
|
||||
bool reportError()
|
||||
{
|
||||
// some errors are ok and do not (necessarily) mean that we're disconnected
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
|
||||
|
||||
// tell the connection that it failed
|
||||
// @todo we should report an error, but that could be wrong, because it calls back to us
|
||||
// tell the parent that it failed
|
||||
_parent->onError(this, "connection lost");
|
||||
|
||||
// we're no longer interested in the socket (this also calls onClosed())
|
||||
cleanup();
|
||||
|
||||
// done
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct the next state
|
||||
* Construct the shutdown state
|
||||
* @param monitor Object that monitors whether connection still exists
|
||||
* @return TcpState*
|
||||
*/
|
||||
TcpState *nextState(const Monitor &monitor)
|
||||
{
|
||||
// if the object is still in a valid state, we can move to the close-state,
|
||||
// otherwise there is no point in moving to a next state
|
||||
// if the object is still in a valid state, we can treat the connection
|
||||
// as closed otherwise there is no point in moving to a next state
|
||||
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||
}
|
||||
|
||||
|
|
@ -128,12 +108,6 @@ public:
|
|||
*/
|
||||
virtual ~TcpConnected() noexcept = default;
|
||||
|
||||
/**
|
||||
* The filedescriptor of this connection
|
||||
* @return int
|
||||
*/
|
||||
virtual int fileno() const override { return _socket; }
|
||||
|
||||
/**
|
||||
* Number of bytes in the outgoing buffer
|
||||
* @return std::size_t
|
||||
|
|
@ -159,7 +133,7 @@ public:
|
|||
auto result = _out.sendto(_socket);
|
||||
|
||||
// are we in an error state?
|
||||
if (result < 0 && reportError(monitor)) return nextState(monitor);
|
||||
if (result < 0 && reportError()) return nextState(monitor);
|
||||
|
||||
// if buffer is empty by now, we no longer have to check for
|
||||
// writability, but only for readability
|
||||
|
|
@ -173,7 +147,7 @@ public:
|
|||
ssize_t result = _in.receivefrom(_socket, _parent->expected());
|
||||
|
||||
// are we in an error state?
|
||||
if (result < 0 && reportError(monitor)) return nextState(monitor);
|
||||
if (result < 0 && reportError()) return nextState(monitor);
|
||||
|
||||
// @todo should we also check for result == 0
|
||||
|
||||
|
|
@ -256,34 +230,16 @@ public:
|
|||
// all has been sent
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* When the AMQP transport layer is closed
|
||||
* @param monitor Object that can be used if connection is still alive
|
||||
* @return TcpState New implementation object
|
||||
*/
|
||||
virtual TcpState *onAmqpClosed(const Monitor &monitor) override
|
||||
{
|
||||
// move to the tcp shutdown state
|
||||
return new TcpShutdown(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* When an error occurs in the AMQP protocol
|
||||
* @param monitor Monitor that can be used to check if the connection is still alive
|
||||
* @param message The error message
|
||||
* @return TcpState New implementation object
|
||||
* Gracefully close the connection
|
||||
* @return TcpState The next state
|
||||
*/
|
||||
virtual TcpState *onAmqpError(const Monitor &monitor, const char *message) override
|
||||
virtual TcpState *close() override
|
||||
{
|
||||
// tell the user about it
|
||||
// @todo do this somewhere else
|
||||
//_handler->onError(_connection, message);
|
||||
// @todo what if we're still busy receiving data?
|
||||
|
||||
// stop if the object was destructed
|
||||
if (!monitor.valid()) return nullptr;
|
||||
|
||||
// move to the tcp shutdown state
|
||||
// start the tcp shutdown
|
||||
return new TcpShutdown(this);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -88,6 +88,7 @@ void TcpConnection::process(int fd, int flags)
|
|||
else
|
||||
{
|
||||
// replace it with the new implementation
|
||||
// @todo destructing the existing _state may destruct the entire object
|
||||
_state.reset(newstate);
|
||||
}
|
||||
}
|
||||
|
|
@ -178,8 +179,8 @@ void TcpConnection::onError(Connection *connection, const char *message)
|
|||
// remember the old state (this is necessary because _state may be modified by user-code)
|
||||
auto *oldstate = _state.get();
|
||||
|
||||
// tell the state that an error occured at the amqp level
|
||||
auto *newstate = _state->onAmqpError(monitor, message);
|
||||
// tell the state that the connection should be closed asap
|
||||
auto *newstate = _state->close();
|
||||
|
||||
// leap out if nothing changes
|
||||
if (newstate == nullptr || newstate == oldstate) return;
|
||||
|
|
@ -200,8 +201,8 @@ void TcpConnection::onClosed(Connection *connection)
|
|||
// remember the old state (this is necessary because _state may be modified by user-code)
|
||||
auto *oldstate = _state.get();
|
||||
|
||||
// tell the state that the connection was closed at the amqp level
|
||||
auto *newstate = _state->onAmqpClosed(monitor);
|
||||
// tell the state that the connection should be closed asap
|
||||
auto *newstate = _state->close();
|
||||
|
||||
// leap out if nothing changes
|
||||
if (newstate == nullptr || newstate == oldstate) return;
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ protected:
|
|||
int _socket;
|
||||
|
||||
/**
|
||||
* Clean-up the socket
|
||||
* Clean-up the socket, and call the onClosed() method
|
||||
*/
|
||||
void cleanup()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ private:
|
|||
_error = strerror(errno);
|
||||
|
||||
// close socket because connect failed
|
||||
close(_socket);
|
||||
::close(_socket);
|
||||
|
||||
// socket no longer is valid
|
||||
_socket = -1;
|
||||
|
|
|
|||
|
|
@ -30,16 +30,6 @@ namespace AMQP {
|
|||
*/
|
||||
class TcpShutdown : public TcpExtState
|
||||
{
|
||||
protected:
|
||||
/**
|
||||
* Method to report the result to the user
|
||||
*/
|
||||
virtual void report()
|
||||
{
|
||||
// report that the connection was closed
|
||||
_parent->onClosed(this);
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
|
|
@ -108,9 +98,6 @@ public:
|
|||
// immediately close the socket
|
||||
cleanup();
|
||||
|
||||
// report to the user that the operation is finished
|
||||
report();
|
||||
|
||||
// move to next state
|
||||
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,6 +92,12 @@ public:
|
|||
// default does nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefully close the connection
|
||||
* @return TcpState The next state
|
||||
*/
|
||||
virtual TcpState *close() { return this; }
|
||||
|
||||
/**
|
||||
* Flush the connection, all outgoing operations should be completed.
|
||||
*
|
||||
|
|
@ -115,21 +121,6 @@ public:
|
|||
* @param heartbeat suggested heartbeat
|
||||
*/
|
||||
virtual void maxframe(size_t maxframe) {}
|
||||
|
||||
/**
|
||||
* Events that can take place during the AMQP protocol
|
||||
*
|
||||
* Both events also trigger the end of a valid connection, and should
|
||||
* be used to tear down the TCP connection.
|
||||
*
|
||||
* @todo are these appropriate names?
|
||||
*
|
||||
* @param monitor
|
||||
* @param TcpState
|
||||
*/
|
||||
virtual TcpState *onAmqpError(const Monitor &monitor, const char *error) { return this; }
|
||||
virtual TcpState *onAmqpClosed(const Monitor &monitor) { return this; }
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Reference in New Issue