added method tcpconnection::flush()

This commit is contained in:
Emiel Bruijntjes 2016-09-19 10:19:06 +02:00
parent 5bb7b1a36f
commit ed2ffd3f03
6 changed files with 111 additions and 7 deletions

View File

@ -130,6 +130,14 @@ public:
*/
void process(int fd, int flags);
/**
* Flush the connection - all unsent bytes are sent to the socket rigth away
* This is a blocking operation. The connection object normally only sends data
* when the socket is known to be writable, but with this method you can force
* the outgoing buffer to be fushed
*/
void flush();
/**
* Close the connection
* This closes all channels and the TCP connection

View File

@ -4,7 +4,7 @@
* Class that is used when the TCP connection ends up in a closed state
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/
/**

View File

@ -83,6 +83,29 @@ private:
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Wait until the socket is writable
* @return bool
*/
bool wait4writable()
{
// we need the fd-sets
fd_set readables, writables, exceptions;
// initialize all the sets
FD_ZERO(&readables);
FD_ZERO(&writables);
FD_ZERO(&exceptions);
// add the one socket
FD_SET(_socket, &writables);
// wait for the socket
auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr);
// check for success
return result == 0;
}
public:
/**
@ -203,6 +226,29 @@ public:
// start monitoring the socket to find out when it is writable
_handler->monitor(_connection, _socket, readable | writable);
}
/**
* Flush the connection, sent all buffered data to the socket
* @return TcpState new tcp state
*/
virtual TcpState *flush() override
{
// keep running until the out buffer is empty
while (_out)
{
// poll the socket, is it already writable?
if (!wait4writable()) return this;
// socket is writable, send as much data as possible
auto *newstate = process(_socket, writable);
// are we done
if (newstate != this) return newstate;
}
// all has been sent
return this;
}
/**
* Report that heartbeat negotiation is going on

View File

@ -4,7 +4,7 @@
* Implementation file for the TCP connection
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/
/**
@ -56,6 +56,31 @@ void TcpConnection::process(int fd, int flags)
_state.reset(result);
}
/**
* Flush the tcp connection
*/
void TcpConnection::flush()
{
// monitor the object for destruction
Monitor monitor(this);
// keep looping
while (true)
{
// flush the object
auto *newstate = _state->flush();
// done if object no longer exists
if (!monitor.valid()) return;
// also done if the object is still in the same state
if (newstate == _state.get()) return;
// replace the new state
_state.reset(newstate);
}
}
/**
* Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval

View File

@ -5,7 +5,7 @@
* server, and to make the initial connection
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/
/**
@ -173,9 +173,9 @@ public:
/**
* Wait for the resolver to be ready
* @param fd The filedescriptor that is active
* @param flags Flags to indicate that fd is readable and/or writable
* @return New implementation object
* @param fd The filedescriptor that is active
* @param flags Flags to indicate that fd is readable and/or writable
* @return New implementation object
*/
virtual TcpState *process(int fd, int flags) override
{
@ -191,6 +191,25 @@ public:
// create dummy implementation
return new TcpClosed(_connection, _handler);
}
/**
* Flush state / wait for the connection to complete
* @return New implementation object
*/
virtual TcpState *flush() override
{
// just wait for the other thread to be ready
_thread.join();
// do we have a valid socket?
if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
// report error
_handler->onError(_connection, _error.data());
// create dummy implementation
return new TcpClosed(_connection, _handler);
}
/**
* Send data over the connection

View File

@ -4,7 +4,7 @@
* Base class / interface of the various states of the TCP connection
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/
/**
@ -90,6 +90,12 @@ public:
return _handler->onNegotiate(_connection, heartbeat);
}
/**
* Flush the connection
* @return TcpState new implementation object
*/
virtual TcpState *flush() { return this; }
/**
* Report to the handler that the object is in an error state
* @param error