Merge branch 'master' of https://github.com/CopernicaMarketingSoftware/AMQP-CPP
This commit is contained in:
commit
9ec63a1869
|
|
@ -328,9 +328,13 @@ public:
|
||||||
/**
|
/**
|
||||||
* Send the buffer to a socket
|
* Send the buffer to a socket
|
||||||
* @param socket
|
* @param socket
|
||||||
|
* @return ssize_t
|
||||||
*/
|
*/
|
||||||
void sendto(int socket)
|
ssize_t sendto(int socket)
|
||||||
{
|
{
|
||||||
|
// total number of bytes written
|
||||||
|
ssize_t total = 0;
|
||||||
|
|
||||||
// keep looping
|
// keep looping
|
||||||
while (_size > 0)
|
while (_size > 0)
|
||||||
{
|
{
|
||||||
|
|
@ -354,12 +358,18 @@ public:
|
||||||
// send the data
|
// send the data
|
||||||
auto result = writev(socket, (const struct iovec *)&buffer, index);
|
auto result = writev(socket, (const struct iovec *)&buffer, index);
|
||||||
|
|
||||||
// skip on error
|
// skip on error, or when nothing was written
|
||||||
if (result <= 0) return;
|
if (result <= 0) return total > 0 ? total : result;
|
||||||
|
|
||||||
// shrink the buffer
|
// shrink the buffer
|
||||||
if (result > 0) shrink(result);
|
shrink(result);
|
||||||
|
|
||||||
|
// update total number of bytes written
|
||||||
|
total += 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// done
|
||||||
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,35 @@ private:
|
||||||
TcpBuffer _in;
|
TcpBuffer _in;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to report an error
|
||||||
|
* @return bool Was an error reported?
|
||||||
|
*/
|
||||||
|
bool reportError()
|
||||||
|
{
|
||||||
|
// some errors are ok and do not (necessarily) mean that we're disconnected
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
|
||||||
|
|
||||||
|
// we have an error - report this to the user
|
||||||
|
_handler->onError(_connection, strerror(errno));
|
||||||
|
|
||||||
|
// done
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct the next state
|
||||||
|
* @param monitor Object that monitors whether connection still exists
|
||||||
|
* @return TcpState*
|
||||||
|
*/
|
||||||
|
TcpState *nextState(const Monitor &monitor)
|
||||||
|
{
|
||||||
|
// if the object is still in a valid state, we can move to the close-state,
|
||||||
|
// otherwise there is no point in moving to a next state
|
||||||
|
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
|
@ -90,12 +119,18 @@ public:
|
||||||
{
|
{
|
||||||
// must be the socket
|
// must be the socket
|
||||||
if (fd != _socket) return this;
|
if (fd != _socket) return this;
|
||||||
|
|
||||||
|
// because the object might soon be destructed, we create a monitor to check this
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
// can we write more data to the socket?
|
// can we write more data to the socket?
|
||||||
if (flags & writable)
|
if (flags & writable)
|
||||||
{
|
{
|
||||||
// send out the buffered data
|
// send out the buffered data
|
||||||
_out.sendto(_socket);
|
auto result = _out.sendto(_socket);
|
||||||
|
|
||||||
|
// are we in an error state?
|
||||||
|
if (result < 0 && reportError()) return nextState(monitor);
|
||||||
|
|
||||||
// if buffer is empty by now, we no longer have to check for
|
// if buffer is empty by now, we no longer have to check for
|
||||||
// writability, but only for readability
|
// writability, but only for readability
|
||||||
|
|
@ -108,39 +143,24 @@ public:
|
||||||
// read data from buffer
|
// read data from buffer
|
||||||
auto result = _in.receivefrom(_socket);
|
auto result = _in.receivefrom(_socket);
|
||||||
|
|
||||||
// because the object might soon be destructed, we create a monitor to check this
|
// are we in an error state?
|
||||||
Monitor monitor(this);
|
if (result < 0 && reportError()) return nextState(monitor);
|
||||||
|
|
||||||
// is the connection in an error state?
|
// we need a local copy of the buffer - because it is possible that "this"
|
||||||
if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
|
// object gets destructed halfway through the call to the parse() method
|
||||||
{
|
TcpBuffer buffer(std::move(_in));
|
||||||
// we have an error - report this to the user
|
|
||||||
_handler->onError(_connection, strerror(errno));
|
// parse the buffer
|
||||||
|
auto processed = _connection->parse(buffer);
|
||||||
// "this" could be removed by now, check this
|
|
||||||
if (!monitor.valid()) return nullptr;
|
|
||||||
|
|
||||||
// we have a new state
|
|
||||||
return new TcpClosed(this);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// we need a local copy of the buffer - because it is possible that "this"
|
|
||||||
// object gets destructed halfway through the call to the parse() method
|
|
||||||
TcpBuffer buffer(std::move(_in));
|
|
||||||
|
|
||||||
// parse the buffer
|
|
||||||
auto processed = _connection->parse(buffer);
|
|
||||||
|
|
||||||
// "this" could be removed by now, check this
|
// "this" could be removed by now, check this
|
||||||
if (!monitor.valid()) return nullptr;
|
if (!monitor.valid()) return nullptr;
|
||||||
|
|
||||||
// shrink buffer
|
// shrink buffer
|
||||||
buffer.shrink(processed);
|
buffer.shrink(processed);
|
||||||
|
|
||||||
// restore the buffer as member
|
// restore the buffer as member
|
||||||
std::swap(_in, buffer);
|
std::swap(_in, buffer);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// keep same object
|
// keep same object
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue