From 945a01f65982ca9bb1c26629b4f23fb48b9c489f Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 13 Aug 2014 13:01:27 +0200 Subject: [PATCH] The Connection::parse() method can now also be called with a buffer object, which allows the user to implement the buffer much more efficiently: it no longer has to be an array --- amqpcpp.h | 4 +- include/buffer.h | 37 ++++++++++++++ include/bytebuffer.h | 42 ++++++++++++++++ include/connection.h | 11 ++--- include/connectionimpl.h | 3 +- include/receivedframe.h | 17 ++++--- src/connectionimpl.cpp | 10 ++-- src/framecheck.h | 7 ++- src/receivedframe.cpp | 55 ++++++++++----------- src/reducedbuffer.h | 103 +++++++++++++++++++++++++++++++++++++++ 10 files changed, 235 insertions(+), 54 deletions(-) create mode 100644 src/reducedbuffer.h diff --git a/amqpcpp.h b/amqpcpp.h index 8940e73..c94a43a 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -31,14 +31,14 @@ #include // utility classes +#include +#include #include #include #include #include // amqp types -#include -#include #include #include #include diff --git a/include/buffer.h b/include/buffer.h index 73eaf13..71bee74 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -30,7 +30,44 @@ namespace AMQP { */ class Buffer { +public: + /** + * Total size of the buffer + * @return size_t + */ + virtual size_t size() const = 0; + /** + * Get access to a single byte + * + * No safety checks are necessary: this method will only be called + * for bytes that actually exist + * + * @param pos position in the buffer + * @return char value of the byte in the buffer + */ + virtual char byte(size_t pos) const = 0; + + /** + * Get access to the raw data + * @param pos position in the buffer + * @param size number of continuous bytes + * @return char* + */ + virtual const char *data(size_t pos, size_t size) const = 0; + + /** + * Copy bytes to a buffer + * + * No safety checks are necessary: this method will only be called + * for bytes that actually exist + * + * @param pos position in the buffer + * @param size number of bytes to copy + * @param buffer buffer to copy into + * @return void* pointer to buffer + */ + virtual void *copy(size_t pos, size_t size, void *buffer) const = 0; }; diff --git a/include/bytebuffer.h b/include/bytebuffer.h index 7074ee0..dd256ab 100644 --- a/include/bytebuffer.h +++ b/include/bytebuffer.h @@ -49,6 +49,48 @@ public: */ virtual ~ByteBuffer() {} + /** + * Total size of the buffer + * @return size_t + */ + virtual size_t size() const override + { + return _size; + } + + /** + * Get access to a single byte + * @param pos position in the buffer + * @return char value of the byte in the buffer + */ + virtual char byte(size_t pos) const override + { + return _data[pos]; + } + + /** + * Get access to the raw data + * @param pos position in the buffer + * @param size number of continuous bytes + * @return char* + */ + virtual const char *data(size_t pos, size_t size) const override + { + return _data + pos; + } + + /** + * Copy bytes to a buffer + * @param pos position in the buffer + * @param size number of bytes to copy + * @param buffer buffer to copy into + * @return size_t pointer to buffer + */ + virtual void *copy(size_t pos, size_t size, void *buffer) const override + { + return memcpy(buffer, _data + pos, size); + } + }; /** diff --git a/include/connection.h b/include/connection.h index c1fccea..99c616d 100644 --- a/include/connection.h +++ b/include/connection.h @@ -77,8 +77,7 @@ public: */ size_t parse(const char *buffer, size_t size) { - //return _implementation.parse(ByteBuffer(buffer, size)); - return _implementation.parse(buffer, size); + return _implementation.parse(ByteBuffer(buffer, size)); } /** @@ -99,10 +98,10 @@ public: * @param buffer buffer to decode * @return number of bytes that were processed */ - //size_t parse(const Buffer &buffer) - //{ - // return _implementation.parse(buffer); - //} + size_t parse(const Buffer &buffer) + { + return _implementation.parse(buffer); + } /** * Close the connection diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 530888a..53adf0c 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -231,10 +231,9 @@ public: * later call. * * @param buffer buffer to decode - * @param size size of the buffer to decode * @return number of bytes that were processed */ - size_t parse(const char *buffer, size_t size); + size_t parse(const Buffer &buffer); /** * Close the connection diff --git a/include/receivedframe.h b/include/receivedframe.h index 03bd0ae..68ed145 100644 --- a/include/receivedframe.h +++ b/include/receivedframe.h @@ -1,4 +1,3 @@ -#pragma once /** * ReceivedFrame.h * @@ -12,6 +11,11 @@ * @documentation public */ +/** + * Include guard + */ +#pragma once + /** * Set up namespace */ @@ -25,15 +29,15 @@ class ReceivedFrame private: /** * The buffer we are reading from - * @var char* + * @var Buffer */ - const char *_buffer = nullptr; + const Buffer &_buffer; /** - * Number of bytes left to retrieve + * Number of bytes already processed * @var uint32_t */ - uint32_t _left = 0; + uint32_t _skip = 0; /** * Type of frame @@ -115,10 +119,9 @@ public: /** * Constructor * @param buffer Binary buffer - * @param size Size of the buffer * @param max Max buffer size */ - ReceivedFrame(const char *buffer, uint32_t size, uint32_t max); + ReceivedFrame(const Buffer &buffer, uint32_t max); /** * Destructor diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index a26f903..6d91c04 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -9,6 +9,7 @@ #include "protocolheaderframe.h" #include "connectioncloseokframe.h" #include "connectioncloseframe.h" +#include "reducedbuffer.h" /** * set namespace @@ -101,10 +102,9 @@ void ConnectionImpl::remove(ChannelImpl *channel) * later call. * * @param buffer buffer to decode - * @param size size of the buffer to decode * @return number of bytes that were processed */ -size_t ConnectionImpl::parse(const char *buffer, size_t size) +size_t ConnectionImpl::parse(const Buffer &buffer) { // do not parse if already in an error state if (_state == state_closed) return 0; @@ -117,13 +117,13 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size) // keep looping until we have processed all bytes, and the monitor still // indicates that the connection is in a valid state - while (size > 0 && monitor.valid()) + while (processed < buffer.size() && monitor.valid()) { // prevent protocol exceptions try { // try to recognize the frame - ReceivedFrame receivedFrame(buffer, size, _maxFrame); + ReceivedFrame receivedFrame(ReducedBuffer(buffer, processed), _maxFrame); if (!receivedFrame.complete()) return processed; // process the frame @@ -133,7 +133,7 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size) size_t bytes = receivedFrame.totalSize(); // add bytes - processed += bytes; size -= bytes; buffer += bytes; + processed += bytes; } catch (const ProtocolException &exception) { diff --git a/src/framecheck.h b/src/framecheck.h index af161db..ab8c959 100644 --- a/src/framecheck.h +++ b/src/framecheck.h @@ -38,7 +38,7 @@ public: FrameCheck(ReceivedFrame *frame, size_t size) : _frame(frame), _size(size) { // no problem is there are still enough bytes left - if (frame->_left >= size) return; + if (frame->_buffer.size() - frame->_skip >= size) return; // frame buffer is too small throw ProtocolException("frame out of range"); @@ -49,9 +49,8 @@ public: */ virtual ~FrameCheck() { - // update the buffer and the number of bytes left - _frame->_buffer += _size; - _frame->_left -= _size; + // update the number of bytes to skip + _frame->_skip += _size; } }; diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index 87aaba0..5010591 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -80,13 +80,12 @@ namespace AMQP { /** * Constructor * @param buffer Binary buffer - * @param size Size of the buffer * @param max Max size for a frame */ -ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) : _buffer(buffer), _left(size) +ReceivedFrame::ReceivedFrame(const Buffer &buffer, uint32_t max) : _buffer(buffer) { // we need enough room for type, channel, the payload size and the end-of-frame byte - if (size < 8) return; + if (buffer.size() < 8) return; // get the information _type = nextUint8(); @@ -97,10 +96,10 @@ ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) : if (max > 0 && _payloadSize > max - 8) throw ProtocolException("frame size exceeded"); // check if the buffer is big enough to contain all data - if (size >= _payloadSize + 8) + if (buffer.size() >= _payloadSize + 8) { // buffer is big enough, check for a valid end-of-frame marker - if ((int)buffer[_payloadSize+7] != -50) throw ProtocolException("invalid end of frame marker"); + if ((int)buffer.byte(_payloadSize+7) != -50) throw ProtocolException("invalid end of frame marker"); } else { @@ -130,7 +129,7 @@ uint8_t ReceivedFrame::nextUint8() FrameCheck check(this, 1); // get a byte - return (uint8_t)_buffer[0]; + return _buffer.byte(_skip); } /** @@ -145,7 +144,7 @@ int8_t ReceivedFrame::nextInt8() FrameCheck check(this, 1); // get a byte - return (int8_t)_buffer[0]; + return (int8_t)_buffer.byte(_skip); } /** @@ -156,11 +155,11 @@ int8_t ReceivedFrame::nextInt8() uint16_t ReceivedFrame::nextUint16() { // check if there is enough size - FrameCheck check(this, 2); + FrameCheck check(this, sizeof(uint16_t)); // get two bytes, and convert to host-byte-order uint16_t value; - memcpy(&value, _buffer, sizeof(uint16_t)); + _buffer.copy(_skip, sizeof(uint16_t), &value); return be16toh(value); } @@ -172,11 +171,11 @@ uint16_t ReceivedFrame::nextUint16() int16_t ReceivedFrame::nextInt16() { // check if there is enough size - FrameCheck check(this, 2); + FrameCheck check(this, sizeof(int16_t)); // get two bytes, and convert to host-byte-order int16_t value; - memcpy(&value, _buffer, sizeof(int16_t)); + _buffer.copy(_skip, sizeof(int16_t), &value); return be16toh(value); } @@ -188,11 +187,11 @@ int16_t ReceivedFrame::nextInt16() uint32_t ReceivedFrame::nextUint32() { // check if there is enough size - FrameCheck check(this, 4); + FrameCheck check(this, sizeof(uint32_t)); // get four bytes, and convert to host-byte-order uint32_t value; - memcpy(&value, _buffer, sizeof(uint32_t)); + _buffer.copy(_skip, sizeof(uint32_t), &value); return be32toh(value); } @@ -204,11 +203,11 @@ uint32_t ReceivedFrame::nextUint32() int32_t ReceivedFrame::nextInt32() { // check if there is enough size - FrameCheck check(this, 4); + FrameCheck check(this, sizeof(int32_t)); // get four bytes, and convert to host-byte-order int32_t value; - memcpy(&value, _buffer, sizeof(int32_t)); + _buffer.copy(_skip, sizeof(int32_t), &value); return be32toh(value); } @@ -220,11 +219,11 @@ int32_t ReceivedFrame::nextInt32() uint64_t ReceivedFrame::nextUint64() { // check if there is enough size - FrameCheck check(this, 8); + FrameCheck check(this, sizeof(uint64_t)); // get eight bytes, and convert to host-byte-order uint64_t value; - memcpy(&value, _buffer, sizeof(uint64_t)); + _buffer.copy(_skip, sizeof(uint64_t), &value); return be64toh(value); } @@ -236,11 +235,11 @@ uint64_t ReceivedFrame::nextUint64() int64_t ReceivedFrame::nextInt64() { // check if there is enough size - FrameCheck check(this, 8); + FrameCheck check(this, sizeof(int64_t)); // get eight bytes, and convert to host-byte-order int64_t value; - memcpy(&value, _buffer, sizeof(int64_t)); + _buffer.copy(_skip, sizeof(int64_t), &value); return be64toh(value); } @@ -252,12 +251,12 @@ int64_t ReceivedFrame::nextInt64() float ReceivedFrame::nextFloat() { // check if there is enough size - FrameCheck check(this, 4); + FrameCheck check(this, sizeof(float)); // get four bytes - float result; - memcpy(&result, _buffer, sizeof(float)); - return result; + float value; + _buffer.copy(_skip, sizeof(float), &value); + return value; } /** @@ -268,12 +267,12 @@ float ReceivedFrame::nextFloat() double ReceivedFrame::nextDouble() { // check if there is enough size - FrameCheck check(this, 8); + FrameCheck check(this, sizeof(double)); // get eight bytes, and convert to host-byte-order - double result; - memcpy(&result, _buffer, sizeof(double)); - return result; + double value; + _buffer.copy(_skip, sizeof(double), &value); + return value; } /** @@ -287,7 +286,7 @@ const char * ReceivedFrame::nextData(uint32_t size) FrameCheck check(this, size); // get the data - return _buffer; + return _buffer.data(_skip, size); } /** diff --git a/src/reducedbuffer.h b/src/reducedbuffer.h new file mode 100644 index 0000000..9e37149 --- /dev/null +++ b/src/reducedbuffer.h @@ -0,0 +1,103 @@ +/** + * ReducedBuffer.h + * + * Wrapper around a buffer with a number of bytes to skip + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Open namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class ReducedBuffer : public Buffer +{ +private: + /** + * Pointer to the original buffer + * @var Buffer + */ + const Buffer &_buffer; + + /** + * Number of bytes to skip + * @var size_t + */ + size_t _skip; + +public: + /** + * Constructor + * @param buffer + * @param skip + */ + ReducedBuffer(const Buffer &buffer, size_t skip) : _buffer(buffer), _skip(skip) {} + + /** + * Destructor + */ + virtual ~ReducedBuffer(); + + /** + * Total size of the buffer + * @return size_t + */ + virtual size_t size() const override + { + return _buffer.size() - _skip; + } + + /** + * Get access to a single byte + * @param pos position in the buffer + * @return char value of the byte in the buffer + */ + virtual char byte(size_t pos) const override + { + return _buffer.byte(pos + _skip); + } + + /** + * Get access to the raw data + * @param pos position in the buffer + * @param size number of continuous bytes + * @return char* + */ + virtual const char *data(size_t pos, size_t size) const override + { + return _buffer.data(pos + _skip, size); + } + + /** + * Copy bytes to a buffer + * + * No safety checks are necessary: this method will only be called + * for bytes that actually exist + * + * @param pos position in the buffer + * @param size number of bytes to copy + * @param buffer buffer to copy into + * @return void* pointer to buffer + */ + virtual void *copy(size_t pos, size_t size, void *buffer) const override + { + return _buffer.copy(pos + _skip, size, buffer); + } +}; + +/** + * End of namespace + */ +} + +