From b4bd61580bed2eaa0c5cb060ee7217a851dd8fc0 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 1 Nov 2015 09:43:17 +0100 Subject: [PATCH] fixed to parsing incoming tcp data --- amqpcpp.h | 2 +- include/watchable.h | 13 +++++++++---- src/includes.h | 2 +- src/tcpbuffer.h | 19 ++++++++++++++++--- src/tcpconnected.h | 11 +++++++++-- src/tcpconnection.cpp | 6 ++++-- 6 files changed, 40 insertions(+), 13 deletions(-) diff --git a/amqpcpp.h b/amqpcpp.h index de21bf9..008a2c1 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -15,13 +15,13 @@ #include #include #include -#include #include #include #include #include #include #include +#include // base C include files #include diff --git a/include/watchable.h b/include/watchable.h index a81e71d..1b46e42 100644 --- a/include/watchable.h +++ b/include/watchable.h @@ -21,9 +21,9 @@ class Watchable private: /** * The monitors - * @var set + * @var std::vector */ - std::set _monitors; + std::vector _monitors; /** * Add a monitor @@ -31,7 +31,8 @@ private: */ void add(Monitor *monitor) { - _monitors.insert(monitor); + // add to the vector + _monitors.push_back(monitor); } /** @@ -40,7 +41,11 @@ private: */ void remove(Monitor *monitor) { - _monitors.erase(monitor); + // put the monitor at the end of the vector + auto iter = std::remove(_monitors.begin(), _monitors.end(), monitor); + + // make the vector smaller + _monitors.erase(iter, _monitors.end()); } public: diff --git a/src/includes.h b/src/includes.h index 5f3f53d..5759233 100644 --- a/src/includes.h +++ b/src/includes.h @@ -12,12 +12,12 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include #include diff --git a/src/tcpbuffer.h b/src/tcpbuffer.h index 48cf70b..4320e99 100644 --- a/src/tcpbuffer.h +++ b/src/tcpbuffer.h @@ -179,6 +179,7 @@ public: { // prepare for next iteration pos -= buffer.size(); + togo -= buffer.size(); } } @@ -186,7 +187,7 @@ public: if (empty > 0) _buffers.resize(_buffers.size() - empty); // done - return result->data(); + return result->data() + pos; } /** @@ -353,7 +354,10 @@ public: // check the number of bytes that are available - in case of an error or // when the buffer is very small, we use a lower limit of 512 bytes - if (ioctl(socket, FIONREAD, &available) != 0 || available < 512) return available = 512; + if (ioctl(socket, FIONREAD, &available) != 0) return -1; + + // no need to read anything if there is no input + if (available == 0) return 0; // add a new buffer _buffers.emplace_back(available); @@ -362,7 +366,16 @@ public: auto &buffer = _buffers.back(); // read data into the buffer - return read(socket, buffer.data(), available); + auto result = read(socket, buffer.data(), available); + + // update total buffer size + if (result > 0) _size += result; + + // if buffer is not full + if (result < available) buffer.resize(std::max(0L, result)); + + // done + return result; } }; diff --git a/src/tcpconnected.h b/src/tcpconnected.h index 41ce26c..02db0ae 100644 --- a/src/tcpconnected.h +++ b/src/tcpconnected.h @@ -26,7 +26,7 @@ namespace AMQP { /** * Class definition */ -class TcpConnected : public TcpState +class TcpConnected : public TcpState, private Watchable { private: /** @@ -108,13 +108,17 @@ public: // read data from buffer auto result = _in.receivefrom(_socket); + // because the object might soon be destructed, we create a monitor to check this + Monitor monitor(this); + // is the connection in an error state? if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { // we have an error - report this to the user _handler->onError(_connection, strerror(errno)); - // @todo object might no longer exist after this error + // "this" could be removed by now, check this + if (!monitor.valid()) return nullptr; // we have a new state return new TcpClosed(this); @@ -123,6 +127,9 @@ public: { // parse the buffer auto processed = _connection->parse(_in); + + // "this" could be removed by now, check this + if (!monitor.valid()) return nullptr; // shrink buffer _in.shrink(processed); diff --git a/src/tcpconnection.cpp b/src/tcpconnection.cpp index 8c92865..f383473 100644 --- a/src/tcpconnection.cpp +++ b/src/tcpconnection.cpp @@ -50,8 +50,10 @@ void TcpConnection::process(int fd, int flags) // pass on the the state, that returns a new impl auto *result = _state->process(fd, flags); - // skip if the same state is continued to be used - if (result == _state) return; + // skip if the same state is continued to be used, or when the process() + // method returns nullptr (which only happens when the object is destructed, + // and "this" is no longer valid) + if (!result || result == _state) return; // remove old state delete _state;