fixed to parsing incoming tcp data

This commit is contained in:
Emiel Bruijntjes 2015-11-01 09:43:17 +01:00
parent df801953a1
commit b4bd61580b
6 changed files with 40 additions and 13 deletions

View File

@ -15,13 +15,13 @@
#include <map>
#include <unordered_map>
#include <queue>
#include <set>
#include <limits>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <iostream>
#include <algorithm>
// base C include files
#include <stdint.h>

View File

@ -21,9 +21,9 @@ class Watchable
private:
/**
* The monitors
* @var set
* @var std::vector
*/
std::set<Monitor*> _monitors;
std::vector<Monitor*> _monitors;
/**
* Add a monitor
@ -31,7 +31,8 @@ private:
*/
void add(Monitor *monitor)
{
_monitors.insert(monitor);
// add to the vector
_monitors.push_back(monitor);
}
/**
@ -40,7 +41,11 @@ private:
*/
void remove(Monitor *monitor)
{
_monitors.erase(monitor);
// put the monitor at the end of the vector
auto iter = std::remove(_monitors.begin(), _monitors.end(), monitor);
// make the vector smaller
_monitors.erase(iter, _monitors.end());
}
public:

View File

@ -12,12 +12,12 @@
#include <string.h>
#include <stdint.h>
#include <string>
#include <set>
#include <memory>
#include <limits>
#include <ostream>
#include <math.h>
#include <map>
#include <algorithm>
#include <unordered_map>
#include <vector>
#include <queue>

View File

@ -179,6 +179,7 @@ public:
{
// prepare for next iteration
pos -= buffer.size();
togo -= buffer.size();
}
}
@ -186,7 +187,7 @@ public:
if (empty > 0) _buffers.resize(_buffers.size() - empty);
// done
return result->data();
return result->data() + pos;
}
/**
@ -353,7 +354,10 @@ public:
// check the number of bytes that are available - in case of an error or
// when the buffer is very small, we use a lower limit of 512 bytes
if (ioctl(socket, FIONREAD, &available) != 0 || available < 512) return available = 512;
if (ioctl(socket, FIONREAD, &available) != 0) return -1;
// no need to read anything if there is no input
if (available == 0) return 0;
// add a new buffer
_buffers.emplace_back(available);
@ -362,7 +366,16 @@ public:
auto &buffer = _buffers.back();
// read data into the buffer
return read(socket, buffer.data(), available);
auto result = read(socket, buffer.data(), available);
// update total buffer size
if (result > 0) _size += result;
// if buffer is not full
if (result < available) buffer.resize(std::max(0L, result));
// done
return result;
}
};

View File

@ -26,7 +26,7 @@ namespace AMQP {
/**
* Class definition
*/
class TcpConnected : public TcpState
class TcpConnected : public TcpState, private Watchable
{
private:
/**
@ -108,13 +108,17 @@ public:
// read data from buffer
auto result = _in.receivefrom(_socket);
// because the object might soon be destructed, we create a monitor to check this
Monitor monitor(this);
// is the connection in an error state?
if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
{
// we have an error - report this to the user
_handler->onError(_connection, strerror(errno));
// @todo object might no longer exist after this error
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
// we have a new state
return new TcpClosed(this);
@ -123,6 +127,9 @@ public:
{
// parse the buffer
auto processed = _connection->parse(_in);
// "this" could be removed by now, check this
if (!monitor.valid()) return nullptr;
// shrink buffer
_in.shrink(processed);

View File

@ -50,8 +50,10 @@ void TcpConnection::process(int fd, int flags)
// pass on the the state, that returns a new impl
auto *result = _state->process(fd, flags);
// skip if the same state is continued to be used
if (result == _state) return;
// skip if the same state is continued to be used, or when the process()
// method returns nullptr (which only happens when the object is destructed,
// and "this" is no longer valid)
if (!result || result == _state) return;
// remove old state
delete _state;