From 59830c659c4ca6e3d335d738fb0f8f3572aae2bd Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 15 Jun 2016 04:57:46 -0400 Subject: [PATCH] added connection::maxFrame() and connection::expected() methods to give hints about the amount of data that the connection::parse() method ideally expects --- include/connection.h | 28 ++++++++++++++++++++++++++++ include/connectionimpl.h | 16 ++++++++++++++++ include/receivedframe.h | 9 ++++++++- src/connectionimpl.cpp | 37 +++++++++++++++++++++++++++++-------- src/receivedframe.cpp | 10 ++++++++++ 5 files changed, 91 insertions(+), 9 deletions(-) diff --git a/include/connection.h b/include/connection.h index eb65919..abe19e2 100644 --- a/include/connection.h +++ b/include/connection.h @@ -137,6 +137,34 @@ public: { return _implementation.parse(buffer); } + + /** + * Max frame size + * + * If you allocate memory to receive data that you are going to pass to the parse() method, + * it might be useful to have an insight in the max frame size. The parse() method process + * one frame at a time, so you must at least be able to read in buffers of this specific + * frame size. + * + * @return size_t + */ + uint32_t maxFrame() const + { + return _implementation.maxFrame(); + } + + /** + * Expected number of bytes for the next parse() call. + * + * This method returns the number of bytes that the next call to parse() at least expects to + * do something meaningful with it. + * + * @return size_t + */ + uint32_t expected() const + { + return _implementation.expected(); + } /** * Close the connection diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 564586b..13a8df0 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -78,6 +78,13 @@ protected: * @var uint32_t */ uint32_t _maxFrame = 10000; + + /** + * Number of expected bytes that will hold the next incoming frame + * We start with seven because that is the header of a frame + * @var uint32_t + */ + uint32_t _expected = 7; /** * The login for the server (login, password) @@ -238,6 +245,15 @@ public: // 8 bytes for header and end-of-frame byte return _maxFrame - 8; } + + /** + * The number of bytes that can best be passed to the next call to the parse() method + * @return uint32_t + */ + uint32_t expected() const + { + return _expected; + } /** * Add a channel to the connection, and return the channel ID that it diff --git a/include/receivedframe.h b/include/receivedframe.h index 7302a52..b2565b7 100644 --- a/include/receivedframe.h +++ b/include/receivedframe.h @@ -128,9 +128,16 @@ public: */ virtual ~ReceivedFrame() {} + /** + * Have we at least received the full frame header? + * The header contains the frame type, the channel ID and the payload size + * @return bool + */ + bool header() const; + /** * Is this a complete frame? - * @return integer + * @return bool */ bool complete() const; diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index f3f78a5..6541744 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -124,16 +124,30 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) { // try to recognize the frame ReceivedFrame receivedFrame(ReducedBuffer(buffer, processed), _maxFrame); - if (!receivedFrame.complete()) return processed; + + // do we have the full frame? + if (receivedFrame.complete()) + { + // process the frame + receivedFrame.process(this); - // process the frame - receivedFrame.process(this); + // number of bytes processed + uint64_t bytes = receivedFrame.totalSize(); - // number of bytes processed - uint64_t bytes = receivedFrame.totalSize(); + // add bytes + processed += bytes; + } + else + { + // we do not yet have the complete frame, but if we do at least + // have the initial bytes of the header, we already know how much + // data we need for the next frame, otherwise we need at least 7 + // bytes for processing the header of the next frame + _expected = receivedFrame.header() ? receivedFrame.totalSize() : 7; - // add bytes - processed += bytes; + // we're ready for now + return processed; + } } catch (const ProtocolException &exception) { @@ -146,7 +160,14 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) } // leap out if the connection object no longer exists - if (!monitor.valid() || !_closed || _state != state_connected) return processed; + if (!monitor.valid()) return processed; + + // the entire buffer has been processed, the next call to parse() should at least + // contain the size of the frame header to be meaningful for the amqp-cpp library + _expected = 7; + + // if the connection is being closed, we have to do more stuff, otherwise we're ready now + if (!_closed || _state != state_connected) return processed; // the close() function was called, but if the close frame was not yet sent // if there are no waiting channels, we can do that right now diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index eb7af5d..2c08d12 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -78,6 +78,7 @@ * Set up namespace */ namespace AMQP { + /** * Constructor * @param buffer Binary buffer @@ -111,6 +112,15 @@ ReceivedFrame::ReceivedFrame(const Buffer &buffer, uint32_t max) : _buffer(buffe } } +/** + * Have we received the header of the frame + * @return bool + */ +bool ReceivedFrame::header() const +{ + return _buffer.size() >= 7; +} + /** * Is this a complete frame? * @return integer