fixed that tcp connection ended up in an infinite loop when write operation failed because of broken pipe

This commit is contained in:
Emiel Bruijntjes 2015-11-23 09:40:54 +01:00
parent d5d5d41401
commit 38e4b97eed
2 changed files with 66 additions and 36 deletions

View File

@ -328,9 +328,13 @@ public:
/**
* Send the buffer to a socket
* @param socket
* @return ssize_t
*/
void sendto(int socket)
ssize_t sendto(int socket)
{
// total number of bytes written
ssize_t total = 0;
// keep looping
while (_size > 0)
{
@ -354,12 +358,18 @@ public:
// send the data
auto result = writev(socket, (const struct iovec *)&buffer, index);
// skip on error
if (result <= 0) return;
// skip on error, or when nothing was written
if (result <= 0) return total > 0 ? total : result;
// shrink the buffer
if (result > 0) shrink(result);
shrink(result);
// update total number of bytes written
total += 0;
}
// done
return total;
}
/**

View File

@ -48,6 +48,35 @@ private:
TcpBuffer _in;
/**
* Helper method to report an error
* @return bool Was an error reported?
*/
bool reportError()
{
// some errors are ok and do not (necessarily) mean that we're disconnected
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));
// done
return true;
}
/**
* Construct the next state
* @param monitor Object that monitors whether connection still exists
* @return TcpState*
*/
TcpState *nextState(const Monitor &monitor)
{
// if the object is still in a valid state, we can move to the close-state,
// otherwise there is no point in moving to a next state
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
public:
/**
* Constructor
@ -90,12 +119,18 @@ public:
{
// must be the socket
if (fd != _socket) return this;
// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// can we write more data to the socket?
if (flags & writable)
{
// send out the buffered data
_out.sendto(_socket);
auto result = _out.sendto(_socket);
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
// if buffer is empty by now, we no longer have to check for
// writability, but only for readability
@ -108,39 +143,24 @@ public:
// read data from buffer
auto result = _in.receivefrom(_socket);
// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// are we in an error state?
if (result < 0 && reportError()) return nextState(monitor);
// is the connection in an error state?
if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
{
// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
// we have a new state
return new TcpClosed(this);
}
else
{
// we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method
TcpBuffer buffer(std::move(_in));
// parse the buffer
auto processed = _connection->parse(buffer);
// we need a local copy of the buffer - because it is possible that "this"
// object gets destructed halfway through the call to the parse() method
TcpBuffer buffer(std::move(_in));
// parse the buffer
auto processed = _connection->parse(buffer);
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
// shrink buffer
buffer.shrink(processed);
// restore the buffer as member
std::swap(_in, buffer);
}
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
// shrink buffer
buffer.shrink(processed);
// restore the buffer as member
std::swap(_in, buffer);
}
// keep same object