improved docblocks in tcpstate.h header file, when an error or closed is reported to user space, the _handler variable is now reset to prevent that it will be used to report more than once (we still need to check if this does not trigger other errors), and the state object is no longer destructed after a reportClosed() call, so that it can clean up nicely (which we need to the tls shutdown anyway)

This commit is contained in:
Emiel Bruijntjes 2018-03-08 10:02:42 +01:00
parent caa7277bb1
commit f23bcf19f1
9 changed files with 160 additions and 53 deletions

View File

@ -5,7 +5,7 @@
* IO between the client application and the RabbitMQ server.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2016 Copernica BV
* @copyright 2015 - 2018 Copernica BV
*/
/**

View File

@ -211,6 +211,20 @@ int SSL_do_handshake(SSL *ssl)
return func(ssl);
}
/**
* Obtain shutdown statue for TLS/SSL I/O operation
* @param ssl SSL object
* @return int returns error values
*/
int SSL_get_shutdown(const SSL *ssl)
{
// create a function
static Function<decltype(::SSL_get_shutdown)> func("SSL_get_shutdown");
// call the openssl function
return func(ssl);
}
/**
* Obtain result code for TLS/SSL I/O operation
* @param ssl SSL object

View File

@ -41,6 +41,7 @@ int SSL_read(SSL *ssl, void *buf, int num);
int SSL_write(SSL *ssl, const void *buf, int num);
int SSL_shutdown(SSL *ssl);
int SSL_set_fd(SSL *ssl, int fd);
int SSL_get_shutdown(const SSL *ssl);
int SSL_get_error(const SSL *ssl, int ret);
int SSL_up_ref(SSL *ssl);
void SSL_set_connect_state(SSL *ssl);

View File

@ -149,23 +149,54 @@ private:
{
// error was returned, so we must investigate what is going on
auto error = OpenSSL::SSL_get_error(_ssl, result);
// create a monitor because the handler could make things ugly
Monitor monitor(this);
// check the error
switch (error) {
case SSL_ERROR_WANT_READ:
// the operation must be repeated when readable
_handler->monitor(_connection, _socket, readable);
return this;
// allow chaining
return monitor.valid() ? this : nullptr;
case SSL_ERROR_WANT_WRITE:
// wait until socket becomes writable again
_handler->monitor(_connection, _socket, readable | writable);
return this;
// allow chaining
return monitor.valid() ? this : nullptr;
case SSL_ERROR_NONE:
// turns out no error occured, an no action has to be rescheduled
_handler->monitor(_connection, _socket, _out ? readable | writable : readable);
// we're ready for the next instruction from userspace
_state = state_idle;
// allow chaining
return monitor.valid() ? this : nullptr;
default:
// is the peer trying to shutdown? (we dont expect this)
bool shutdown = OpenSSL::SSL_get_shutdown(_ssl);
// @todo check how to handle this
return this;
// send back a nice shutdown
if (shutdown) OpenSSL::SSL_shutdown(_ssl);
// tell the handler
_handler->onError(_connection, "ssl error");
// no need to chain if object is already destructed
if (!monitor) return nullptr;
// create a new new object
//return shutdown ?
// allow chaining
return nullptr; //monitor.valid() ? new TcpClosed(this) : nullptr;
}
}
@ -258,9 +289,6 @@ public:
// the socket must be the one this connection writes to
if (fd != _socket) return this;
// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// are we busy with sending or receiving data?
if (_state == state_sending)
{
@ -288,14 +316,34 @@ public:
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
* @return TcpState new tcp state
*/
virtual TcpState *flush() override
virtual TcpState *flush(const Monitor &monitor) override
{
// we are not going to do this is object is busy reading
if (_state == state_receiving) return this;
// create an object to wait for the filedescriptor to becomes active
Wait wait(_socket);
// @todo implementation
// keep looping while we have an outgoing buffer
while (_out)
{
// try to send more data from the outgoing buffer
auto result = _out.sendto(_ssl);
// go to the next state
auto *state = result > 0 ? proceed() : repeat(result);
return state;
// if (result > 0) return proceed();
//
// // the operation failed, we may have to repeat our call
// else return repeat(result);
}
return this;
}

View File

@ -148,11 +148,12 @@ public:
/**
* Process the filedescriptor in the object
* @param monitor Object to check if connection still exists
* @param fd Filedescriptor that is active
* @param flags AMQP::readable and/or AMQP::writable
* @return New state object
*/
virtual TcpState *process(int fd, int flags) override
virtual TcpState *process(const Monitor &monitor, int fd, int flags) override
{
// must be the socket
if (fd != _socket) return this;
@ -187,9 +188,10 @@ public:
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
* @return TcpState new tcp state
*/
virtual TcpState *flush() override
virtual TcpState *flush(const Monitor &monitor) override
{
// create an object to wait for the filedescriptor to becomes active
Wait wait(_socket);

View File

@ -128,18 +128,16 @@ public:
/**
* Process the filedescriptor in the object
* @param monitor Monitor to check if the object is still alive
* @param fd Filedescriptor that is active
* @param flags AMQP::readable and/or AMQP::writable
* @return New state object
*/
virtual TcpState *process(int fd, int flags) override
virtual TcpState *process(const Monitor &monitor, int fd, int flags) override
{
// must be the socket
if (fd != _socket) return this;
// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// can we write more data to the socket?
if (flags & writable)
{
@ -218,9 +216,10 @@ public:
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still lives
* @return TcpState new tcp state
*/
virtual TcpState *flush() override
virtual TcpState *flush(const Monitor &monitor) override
{
// create an object to wait for the filedescriptor to becomes active
Wait wait(_socket);
@ -232,7 +231,7 @@ public:
if (!wait.writable()) return this;
// socket is writable, send as much data as possible
auto *newstate = process(_socket, writable);
auto *newstate = process(monitor, _socket, writable);
// are we done
if (newstate != this) return newstate;

View File

@ -4,7 +4,7 @@
* Implementation file for the TCP connection
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2016 Copernica BV
* @copyright 2015 - 2018 Copernica BV
*/
/**
@ -12,6 +12,7 @@
*/
#include "includes.h"
#include "tcpresolver.h"
#include "tcpstate.h"
/**
* Set up namespace
@ -24,7 +25,7 @@ namespace AMQP {
* @param hostname The address to connect to
*/
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
_state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)),
// _state(new TcpResolver(this, address.hostname(), address.port(), address.secure(), handler)),
_connection(this, address.login(), address.vhost()) {}
/**
@ -48,11 +49,11 @@ int TcpConnection::fileno() const
*/
void TcpConnection::process(int fd, int flags)
{
// monitor the object for destruction
Monitor monitor{ this };
// monitor the object for destruction, because you never know what the user
Monitor monitor(this);
// pass on the the state, that returns a new impl
auto *result = _state->process(fd, flags);
auto *result = _state->process(monitor, fd, flags);
// are we still valid
if (!monitor.valid()) return;
@ -78,7 +79,7 @@ void TcpConnection::flush()
while (true)
{
// flush the object
auto *newstate = _state->flush();
auto *newstate = _state->flush(monitor);
// done if object no longer exists
if (!monitor.valid()) return;
@ -136,7 +137,7 @@ void TcpConnection::onError(Connection *connection, const char *message)
auto ptr = std::move(_state);
// object is now in a closed state
_state.reset(new TcpClosed(ptr.get()));
//_state.reset(new TcpClosed(ptr.get()));
// tell the implementation to report the error
ptr->reportError(message);
@ -158,14 +159,8 @@ void TcpConnection::onConnected(Connection *connection)
*/
void TcpConnection::onClosed(Connection *connection)
{
// current object is going to be removed, but we have to keep it for a while
auto ptr = std::move(_state);
// object is now in a closed state
_state.reset(new TcpClosed(ptr.get()));
// tell the implementation to report that connection is closed now
ptr->reportClosed();
_state->reportClosed();
}
/**

View File

@ -212,11 +212,12 @@ public:
/**
* Wait for the resolver to be ready
* @param monitor Object to check if connection still exists
* @param fd The filedescriptor that is active
* @param flags Flags to indicate that fd is readable and/or writable
* @return New implementation object
*/
virtual TcpState *process(int fd, int flags) override
virtual TcpState *process(const Monitor &monitor, int fd, int flags) override
{
// only works if the incoming pipe is readable
if (fd != _pipe.in() || !(flags & readable)) return this;
@ -227,9 +228,10 @@ public:
/**
* Flush state / wait for the connection to complete
* @param monitor Object to check if connection still exists
* @return New implementation object
*/
virtual TcpState *flush() override
virtual TcpState *flush(const Monitor &monitor) override
{
// just wait for the other thread to be ready
_thread.join();

View File

@ -36,6 +36,22 @@ protected:
TcpHandler *_handler;
protected:
/**
* Helper function to reset the handler, and to return the old handler object
* @return TcpHandler* User-supplied handler that was just reset
*/
TcpHandler *reset(TcpHandler *handler)
{
// remember old handler
auto *oldhandler = _handler;
// install the new handler
_handler = handler;
// return the old handler
return oldhandler;
}
/**
* Protected constructor
* @param connection Original TCP connection object
@ -64,12 +80,19 @@ public:
virtual int fileno() const { return -1; }
/**
* Process the filedescriptor in the object
* Process the filedescriptor in the object
*
* This method should return the handler object that will be responsible for
* all future readable/writable events for the file descriptor, or nullptr
* if the underlying connection object has already been destructed by the
* user and it would be pointless to set up a new handler.
*
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @param fd The filedescriptor that is active
* @param flags AMQP::readable and/or AMQP::writable
* @return New implementation object
*/
virtual TcpState *process(int fd, int flags)
virtual TcpState *process(const Monitor &monitor, int fd, int flags)
{
// default implementation does nothing and preserves same implementation
return this;
@ -77,8 +100,8 @@ public:
/**
* Send data over the connection
* @param buffer buffer to send
* @param size size of the buffer
* @param buffer Buffer to send
* @param size Size of the buffer
*/
virtual void send(const char *buffer, size_t size)
{
@ -86,7 +109,25 @@ public:
}
/**
* Report that heartbeat negotiation is going on
* Flush the connection, all outgoing operations should be completed.
*
* If the state changes during the operation, the new state object should
* be returned instead, or nullptr if the user has closed the connection
* in the meantime. If the connection object got destructed by a user space
* call, this method should return nullptr. A monitor object is pass in to
* allow the flush() method to check if the connection still exists.
*
* If this object returns a new state object (instead of "this"), the
* connection object will immediately proceed with calling flush() on that
* new state object too.
*
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState New implementation object
*/
virtual TcpState *flush(const Monitor &monitor) { return this; }
/**
* Report to the handler that heartbeat negotiation is going on
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
*/
@ -97,25 +138,25 @@ public:
}
/**
* Flush the connection
* @return TcpState new implementation object
*/
virtual TcpState *flush() { return this; }
/**
* Report to the handler that the object is in an error state
* Report to the handler that the object is in an error state.
*
* This is the last method to be called on the handler object, from now on
* the handler will no longer be called to report things to user space.
* The state object itself stays active, and further calls to process()
* may be possible.
*
* @param error
*/
void reportError(const char *error)
virtual void reportError(const char *error)
{
// pass to handler
_handler->onError(_connection, error);
reset(nullptr)->onError(_connection, error);
}
/**
* Report that a heartbeat frame was received
*/
void reportHeartbeat()
virtual void reportHeartbeat()
{
// pass to handler
_handler->onHeartbeat(_connection);
@ -124,19 +165,24 @@ public:
/**
* Report to the handler that the connection is ready for use
*/
void reportConnected()
virtual void reportConnected()
{
// pass to handler
_handler->onConnected(_connection);
}
/**
* Report to the handler that the connection was nicely closed
* Report to the handler that the connection was correctly closed, after
* the user has called the Connection::close() method. The underlying TCP
* connection still has to be closed.
*
* This is the last method that is called on the object, from now on no
* other methods may be called on the _handler variable.
*/
virtual void reportClosed()
{
// pass to handler
_handler->onClosed(_connection);
reset(nullptr)->onClosed(_connection);
}
};