removed support for TcpConnection::flush() and removed internal TcpShutdown state

This commit is contained in:
Emiel Bruijntjes 2018-11-06 18:11:27 +01:00
parent 6ea2d8dffd
commit 64c876e65a
9 changed files with 29 additions and 358 deletions

View File

@ -217,14 +217,6 @@ public:
*/
void process(int fd, int flags);
/**
* Flush the connection - all unsent bytes are sent to the socket rigth away
* This is a blocking operation. The connection object normally only sends data
* when the socket is known to be writable, but with this method you can force
* the outgoing buffer to be fushed
*/
void flush();
/**
* Close the connection in an elegant fashion. This closes all channels and the
* TCP connection. Note that the connection is not immediately closed: first all

View File

@ -142,9 +142,11 @@ private:
return monitor.valid() ? this : nullptr;
default:
// report an error to user-space
_parent->onError(this, "ssl protocol error");
// ssl level error, we have to tear down the tcp connection
return monitor.valid() ? new TcpShutdown(this) : nullptr;
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
}
@ -343,64 +345,6 @@ public:
return proceed();
}
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
* @return TcpState new tcp state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// we are not going to do this if object is busy reading
if (_state == state_receiving) return this;
// create an object to wait for the filedescriptor to becomes active
Poll poll(_socket);
// we are going to check for errors after the openssl operations, so we make
// sure that the error queue is currently completely empty
OpenSSL::ERR_clear_error();
// keep looping while we have an outgoing buffer
while (_out)
{
// move to the idle-state
_state = state_idle;
// try to send more data from the outgoing buffer
auto result = _out.sendto(_ssl);
// was this a success?
if (result > 0)
{
// proceed to the next state
auto *nextstate = proceed();
// leap out if we move to a different state
if (nextstate != this) return nextstate;
}
else
{
// error was returned, so we must investigate what is going on
auto error = OpenSSL::SSL_get_error(_ssl, result);
// get the next state given the error
auto *nextstate = repeat(monitor, state_sending, error);
// leap out if we move to a different state
if (nextstate != this) return nextstate;
// check the type of error, and wait now
switch (error) {
case SSL_ERROR_WANT_READ: poll.readable(true); break;
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
}
}
}
// done
return this;
}
/**
* Send data over the connection
* @param buffer buffer to send

View File

@ -85,7 +85,7 @@ private:
if (!monitor.valid()) return nullptr;
// done, shutdown the tcp connection
return new TcpShutdown(this);
return new TcpClosed(this);
}
/**
@ -186,42 +186,6 @@ public:
// the handshake is still busy, outgoing data must be cached
_out.add(buffer, size);
}
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
* @return TcpState new tcp state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// create an object to wait for the filedescriptor to becomes active
Poll poll(_socket);
// keep looping
while (true)
{
// start the ssl handshake
int result = OpenSSL::SSL_do_handshake(_ssl);
// if the connection succeeds, we can move to the ssl-connected state
if (result == 1) return nextstate(monitor);
// error was returned, so we must investigate what is going on
auto error = OpenSSL::SSL_get_error(_ssl, result);
// check the error
switch (error) {
// if openssl reports that socket readability or writability is needed,
// we wait for that until this situation is reached
case SSL_ERROR_WANT_READ: poll.readable(true); break;
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
// something is wrong, we proceed to the next state
default: return reportError(monitor);
}
}
}
};
/**

View File

@ -38,8 +38,8 @@ private:
*/
virtual TcpState *proceed(const Monitor &monitor)
{
// next state is to shutdown the connection
return new TcpShutdown(this);
// next state is to close the connection
return new TcpClosed(this);
}
/**
@ -125,47 +125,6 @@ public:
// the operation failed, we may have to repeat our call
else return repeat(monitor, result);
}
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still exists
* @return TcpState new tcp state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// @todo do we even need this? isn't flushing reserved for data?
// create an object to wait for the filedescriptor to becomes active
Poll poll(_socket);
// keep looping
while (true)
{
// close the connection
auto result = OpenSSL::SSL_shutdown(_ssl);
// on result==0 we need an additional call
while (result == 0) result = OpenSSL::SSL_shutdown(_ssl);
// if this is a success, we can proceed with the event loop
if (result > 0) return proceed(monitor);
// error was returned, so we must investigate what is going on
auto error = OpenSSL::SSL_get_error(_ssl, result);
// check the error
switch (error) {
// if openssl reports that socket readability or writability is needed,
// we wait for that until this situation is reached
case SSL_ERROR_WANT_READ: poll.readable(true); break;
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
// something is wrong, we proceed to the next state
default: return proceed(monitor);
}
}
}
};
/**

View File

@ -18,7 +18,7 @@
*/
#include "tcpoutbuffer.h"
#include "tcpinbuffer.h"
#include "tcpshutdown.h"
#include "tcpextstate.h"
#include "poll.h"
/**
@ -51,10 +51,10 @@ private:
size_t _reallocate = 0;
/**
* Have we already made the last report to the user (about an error or closed connection?)
* Did the user ask to elegantly close the connection?
* @var bool
*/
bool _finalized = false;
bool _closed = false;
/**
@ -66,19 +66,19 @@ private:
// some errors are ok and do not (necessarily) mean that we're disconnected
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
// tell the parent that it failed
_parent->onError(this, "connection lost");
// tell the parent that it failed (but not if the connection was elegantly closed)
if (!_closed) _parent->onError(this, "connection lost");
// done
return true;
}
/**
* Construct the shutdown state
* Construct the final state
* @param monitor Object that monitors whether connection still exists
* @return TcpState*
*/
TcpState *nextState(const Monitor &monitor)
TcpState *finalState(const Monitor &monitor)
{
// if the object is still in a valid state, we can treat the connection
// as closed otherwise there is no point in moving to a next state
@ -133,7 +133,7 @@ public:
auto result = _out.sendto(_socket);
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
if (result < 0 && reportError()) return finalState(monitor);
// if buffer is empty by now, we no longer have to check for
// writability, but only for readability
@ -147,7 +147,7 @@ public:
ssize_t result = _in.receivefrom(_socket, _parent->expected());
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
if (result < 0 && reportError()) return finalState(monitor);
// @todo should we also check for result == 0
@ -204,43 +204,28 @@ public:
_parent->onIdle(this, _socket, readable | writable);
}
/**
* Flush the connection, sent all buffered data to the socket
* @param monitor Object to check if connection still lives
* @return TcpState new tcp state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// create an object to wait for the filedescriptor to becomes active
Poll poll(_socket);
// keep running until the out buffer is not empty
while (_out)
{
// poll the socket, is it already writable?
if (!poll.writable(true)) return this;
// socket is writable, send as much data as possible
auto *newstate = process(monitor, _socket, writable);
// are we done
if (newstate != this) return newstate;
}
// all has been sent
return this;
}
/**
* Gracefully close the connection
* @return TcpState The next state
*/
virtual TcpState *close() override
{
// @todo what if we're still busy receiving data?
// do nothing if already closed
if (_closed) return this;
// remember that the connection is closed
_closed = true;
// we will shutdown the socket in a very elegant way, we notify the peer
// that we will not be sending out more write operations
shutdown(_socket, SHUT_WR);
// we still monitor the socket for readability to see if our close call was
// confirmed by the peer
_parent->onIdle(this, _socket, readable);
// start the tcp shutdown
return new TcpShutdown(this);
return this;
}
/**

View File

@ -111,37 +111,6 @@ void TcpConnection::process(int fd, int flags)
if (!assign(monitor, newstate)) delete newstate;
}
/**
* Flush the tcp connection
*/
void TcpConnection::flush()
{
// monitor the object for destruction
Monitor monitor(this);
// keep looping
while (true)
{
// get the old state
auto *oldstate = _state.get();
// flush the object
auto *newstate = _state->flush(monitor);
// done if object no longer exists
if (newstate == nullptr || newstate == oldstate || !monitor.valid()) return;
// replace the new state
if (assign(monitor, newstate)) continue;
// the "this" object was destructed
delete newstate;
// leap out because there is nothing left to do
return;
}
}
/**
* Close the connection
* @return bool

View File

@ -234,20 +234,6 @@ public:
return proceed(monitor);
}
/**
* Flush state / wait for the connection to complete
* @param monitor Object to check if connection still exists
* @return New implementation object
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// just wait for the other thread to be ready
_thread.join();
// proceed to the next state
return proceed(monitor);
}
/**
* Send data over the connection
* @param buffer buffer to send

View File

@ -1,110 +0,0 @@
/**
* TcpShutdown.h
*
* State in the TCP handshake that is responsible for gracefully
* shutting down the connection by closing our side of the connection,
* and waiting for the server to close the connection on the other
* side too.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "tcpextstate.h"
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Class definition
*/
class TcpShutdown : public TcpExtState
{
public:
/**
* Constructor
* @param state The previous state
*/
TcpShutdown(TcpExtState *state) : TcpExtState(state)
{
// we will shutdown the socket in a very elegant way, we notify the peer
// that we will not be sending out more write operations
shutdown(_socket, SHUT_WR);
// we still monitor the socket for readability to see if our close call was
// confirmed by the peer
_parent->onIdle(this, _socket, readable);
}
/**
* Forbidden to copy
* @param that
*/
TcpShutdown(const TcpShutdown &that) = delete;
/**
* Destructor
*/
virtual ~TcpShutdown() = default;
/**
* Process the filedescriptor in the object
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @param fd The filedescriptor that is active
* @param flags AMQP::readable and/or AMQP::writable
* @return New implementation object
*/
virtual TcpState *process(const Monitor &monitor, int fd, int flags)
{
// must be the right filedescriptor
if (_socket != fd) return this;
// if the socket is not readable, we do not have to check anything
if (!(flags & readable)) return this;
// buffer to read data in
char buffer[64];
// read in data (we only do this to discover if the connection is really closed)
auto result = read(_socket, buffer, sizeof(buffer));
// if we read something, we keep on reading
if (result > 0) return this;
// or should we retry?
if (result < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) return this;
// flush the connection to close the connection and report to the user
return flush(monitor);
}
/**
* Flush the connection, make sure all network operations are finished
* @param monitor Object to check if connection still exists
* @return TcpState New state
*/
virtual TcpState *flush(const Monitor &monitor) override
{
// immediately close the socket
cleanup();
// move to next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
};
/**
* End of namespace
*/
}

View File

@ -98,24 +98,6 @@ public:
*/
virtual TcpState *close() { return this; }
/**
* Flush the connection, all outgoing operations should be completed.
*
* If the state changes during the operation, the new state object should
* be returned instead, or nullptr if the user has closed the connection
* in the meantime. If the connection object got destructed by a user space
* call, this method should return nullptr. A monitor object is pass in to
* allow the flush() method to check if the connection still exists.
*
* If this object returns a new state object (instead of "this"), the
* connection object will immediately proceed with calling flush() on that
* new state object too.
*
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState New implementation object
*/
virtual TcpState *flush(const Monitor &monitor) { return this; }
/**
* Install max-frame size
* @param heartbeat suggested heartbeat