The Connection::parse() method can now also be called with a buffer object, which allows the user to implement the buffer much more efficiently: it no longer has to be an array

This commit is contained in:
Emiel Bruijntjes 2014-08-13 13:01:27 +02:00
parent d99b117a60
commit 945a01f659
10 changed files with 235 additions and 54 deletions

View File

@ -31,14 +31,14 @@
#include <amqpcpp/classes.h>
// utility classes
#include <amqpcpp/buffer.h>
#include <amqpcpp/bytebuffer.h>
#include <amqpcpp/receivedframe.h>
#include <amqpcpp/outbuffer.h>
#include <amqpcpp/watchable.h>
#include <amqpcpp/monitor.h>
// amqp types
#include <amqpcpp/buffer.h>
#include <amqpcpp/bytebuffer.h>
#include <amqpcpp/field.h>
#include <amqpcpp/numericfield.h>
#include <amqpcpp/decimalfield.h>

View File

@ -30,7 +30,44 @@ namespace AMQP {
*/
class Buffer
{
public:
/**
* Total size of the buffer
* @return size_t
*/
virtual size_t size() const = 0;
/**
* Get access to a single byte
*
* No safety checks are necessary: this method will only be called
* for bytes that actually exist
*
* @param pos position in the buffer
* @return char value of the byte in the buffer
*/
virtual char byte(size_t pos) const = 0;
/**
* Get access to the raw data
* @param pos position in the buffer
* @param size number of continuous bytes
* @return char*
*/
virtual const char *data(size_t pos, size_t size) const = 0;
/**
* Copy bytes to a buffer
*
* No safety checks are necessary: this method will only be called
* for bytes that actually exist
*
* @param pos position in the buffer
* @param size number of bytes to copy
* @param buffer buffer to copy into
* @return void* pointer to buffer
*/
virtual void *copy(size_t pos, size_t size, void *buffer) const = 0;
};

View File

@ -49,6 +49,48 @@ public:
*/
virtual ~ByteBuffer() {}
/**
* Total size of the buffer
* @return size_t
*/
virtual size_t size() const override
{
return _size;
}
/**
* Get access to a single byte
* @param pos position in the buffer
* @return char value of the byte in the buffer
*/
virtual char byte(size_t pos) const override
{
return _data[pos];
}
/**
* Get access to the raw data
* @param pos position in the buffer
* @param size number of continuous bytes
* @return char*
*/
virtual const char *data(size_t pos, size_t size) const override
{
return _data + pos;
}
/**
* Copy bytes to a buffer
* @param pos position in the buffer
* @param size number of bytes to copy
* @param buffer buffer to copy into
* @return size_t pointer to buffer
*/
virtual void *copy(size_t pos, size_t size, void *buffer) const override
{
return memcpy(buffer, _data + pos, size);
}
};
/**

View File

@ -77,8 +77,7 @@ public:
*/
size_t parse(const char *buffer, size_t size)
{
//return _implementation.parse(ByteBuffer(buffer, size));
return _implementation.parse(buffer, size);
return _implementation.parse(ByteBuffer(buffer, size));
}
/**
@ -99,10 +98,10 @@ public:
* @param buffer buffer to decode
* @return number of bytes that were processed
*/
//size_t parse(const Buffer &buffer)
//{
// return _implementation.parse(buffer);
//}
size_t parse(const Buffer &buffer)
{
return _implementation.parse(buffer);
}
/**
* Close the connection

View File

@ -231,10 +231,9 @@ public:
* later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
size_t parse(const char *buffer, size_t size);
size_t parse(const Buffer &buffer);
/**
* Close the connection

View File

@ -1,4 +1,3 @@
#pragma once
/**
* ReceivedFrame.h
*
@ -12,6 +11,11 @@
* @documentation public
*/
/**
* Include guard
*/
#pragma once
/**
* Set up namespace
*/
@ -25,15 +29,15 @@ class ReceivedFrame
private:
/**
* The buffer we are reading from
* @var char*
* @var Buffer
*/
const char *_buffer = nullptr;
const Buffer &_buffer;
/**
* Number of bytes left to retrieve
* Number of bytes already processed
* @var uint32_t
*/
uint32_t _left = 0;
uint32_t _skip = 0;
/**
* Type of frame
@ -115,10 +119,9 @@ public:
/**
* Constructor
* @param buffer Binary buffer
* @param size Size of the buffer
* @param max Max buffer size
*/
ReceivedFrame(const char *buffer, uint32_t size, uint32_t max);
ReceivedFrame(const Buffer &buffer, uint32_t max);
/**
* Destructor

View File

@ -9,6 +9,7 @@
#include "protocolheaderframe.h"
#include "connectioncloseokframe.h"
#include "connectioncloseframe.h"
#include "reducedbuffer.h"
/**
* set namespace
@ -101,10 +102,9 @@ void ConnectionImpl::remove(ChannelImpl *channel)
* later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
size_t ConnectionImpl::parse(const char *buffer, size_t size)
size_t ConnectionImpl::parse(const Buffer &buffer)
{
// do not parse if already in an error state
if (_state == state_closed) return 0;
@ -117,13 +117,13 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size)
// keep looping until we have processed all bytes, and the monitor still
// indicates that the connection is in a valid state
while (size > 0 && monitor.valid())
while (processed < buffer.size() && monitor.valid())
{
// prevent protocol exceptions
try
{
// try to recognize the frame
ReceivedFrame receivedFrame(buffer, size, _maxFrame);
ReceivedFrame receivedFrame(ReducedBuffer(buffer, processed), _maxFrame);
if (!receivedFrame.complete()) return processed;
// process the frame
@ -133,7 +133,7 @@ size_t ConnectionImpl::parse(const char *buffer, size_t size)
size_t bytes = receivedFrame.totalSize();
// add bytes
processed += bytes; size -= bytes; buffer += bytes;
processed += bytes;
}
catch (const ProtocolException &exception)
{

View File

@ -38,7 +38,7 @@ public:
FrameCheck(ReceivedFrame *frame, size_t size) : _frame(frame), _size(size)
{
// no problem is there are still enough bytes left
if (frame->_left >= size) return;
if (frame->_buffer.size() - frame->_skip >= size) return;
// frame buffer is too small
throw ProtocolException("frame out of range");
@ -49,9 +49,8 @@ public:
*/
virtual ~FrameCheck()
{
// update the buffer and the number of bytes left
_frame->_buffer += _size;
_frame->_left -= _size;
// update the number of bytes to skip
_frame->_skip += _size;
}
};

View File

@ -80,13 +80,12 @@ namespace AMQP {
/**
* Constructor
* @param buffer Binary buffer
* @param size Size of the buffer
* @param max Max size for a frame
*/
ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) : _buffer(buffer), _left(size)
ReceivedFrame::ReceivedFrame(const Buffer &buffer, uint32_t max) : _buffer(buffer)
{
// we need enough room for type, channel, the payload size and the end-of-frame byte
if (size < 8) return;
if (buffer.size() < 8) return;
// get the information
_type = nextUint8();
@ -97,10 +96,10 @@ ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) :
if (max > 0 && _payloadSize > max - 8) throw ProtocolException("frame size exceeded");
// check if the buffer is big enough to contain all data
if (size >= _payloadSize + 8)
if (buffer.size() >= _payloadSize + 8)
{
// buffer is big enough, check for a valid end-of-frame marker
if ((int)buffer[_payloadSize+7] != -50) throw ProtocolException("invalid end of frame marker");
if ((int)buffer.byte(_payloadSize+7) != -50) throw ProtocolException("invalid end of frame marker");
}
else
{
@ -130,7 +129,7 @@ uint8_t ReceivedFrame::nextUint8()
FrameCheck check(this, 1);
// get a byte
return (uint8_t)_buffer[0];
return _buffer.byte(_skip);
}
/**
@ -145,7 +144,7 @@ int8_t ReceivedFrame::nextInt8()
FrameCheck check(this, 1);
// get a byte
return (int8_t)_buffer[0];
return (int8_t)_buffer.byte(_skip);
}
/**
@ -156,11 +155,11 @@ int8_t ReceivedFrame::nextInt8()
uint16_t ReceivedFrame::nextUint16()
{
// check if there is enough size
FrameCheck check(this, 2);
FrameCheck check(this, sizeof(uint16_t));
// get two bytes, and convert to host-byte-order
uint16_t value;
memcpy(&value, _buffer, sizeof(uint16_t));
_buffer.copy(_skip, sizeof(uint16_t), &value);
return be16toh(value);
}
@ -172,11 +171,11 @@ uint16_t ReceivedFrame::nextUint16()
int16_t ReceivedFrame::nextInt16()
{
// check if there is enough size
FrameCheck check(this, 2);
FrameCheck check(this, sizeof(int16_t));
// get two bytes, and convert to host-byte-order
int16_t value;
memcpy(&value, _buffer, sizeof(int16_t));
_buffer.copy(_skip, sizeof(int16_t), &value);
return be16toh(value);
}
@ -188,11 +187,11 @@ int16_t ReceivedFrame::nextInt16()
uint32_t ReceivedFrame::nextUint32()
{
// check if there is enough size
FrameCheck check(this, 4);
FrameCheck check(this, sizeof(uint32_t));
// get four bytes, and convert to host-byte-order
uint32_t value;
memcpy(&value, _buffer, sizeof(uint32_t));
_buffer.copy(_skip, sizeof(uint32_t), &value);
return be32toh(value);
}
@ -204,11 +203,11 @@ uint32_t ReceivedFrame::nextUint32()
int32_t ReceivedFrame::nextInt32()
{
// check if there is enough size
FrameCheck check(this, 4);
FrameCheck check(this, sizeof(int32_t));
// get four bytes, and convert to host-byte-order
int32_t value;
memcpy(&value, _buffer, sizeof(int32_t));
_buffer.copy(_skip, sizeof(int32_t), &value);
return be32toh(value);
}
@ -220,11 +219,11 @@ int32_t ReceivedFrame::nextInt32()
uint64_t ReceivedFrame::nextUint64()
{
// check if there is enough size
FrameCheck check(this, 8);
FrameCheck check(this, sizeof(uint64_t));
// get eight bytes, and convert to host-byte-order
uint64_t value;
memcpy(&value, _buffer, sizeof(uint64_t));
_buffer.copy(_skip, sizeof(uint64_t), &value);
return be64toh(value);
}
@ -236,11 +235,11 @@ uint64_t ReceivedFrame::nextUint64()
int64_t ReceivedFrame::nextInt64()
{
// check if there is enough size
FrameCheck check(this, 8);
FrameCheck check(this, sizeof(int64_t));
// get eight bytes, and convert to host-byte-order
int64_t value;
memcpy(&value, _buffer, sizeof(int64_t));
_buffer.copy(_skip, sizeof(int64_t), &value);
return be64toh(value);
}
@ -252,12 +251,12 @@ int64_t ReceivedFrame::nextInt64()
float ReceivedFrame::nextFloat()
{
// check if there is enough size
FrameCheck check(this, 4);
FrameCheck check(this, sizeof(float));
// get four bytes
float result;
memcpy(&result, _buffer, sizeof(float));
return result;
float value;
_buffer.copy(_skip, sizeof(float), &value);
return value;
}
/**
@ -268,12 +267,12 @@ float ReceivedFrame::nextFloat()
double ReceivedFrame::nextDouble()
{
// check if there is enough size
FrameCheck check(this, 8);
FrameCheck check(this, sizeof(double));
// get eight bytes, and convert to host-byte-order
double result;
memcpy(&result, _buffer, sizeof(double));
return result;
double value;
_buffer.copy(_skip, sizeof(double), &value);
return value;
}
/**
@ -287,7 +286,7 @@ const char * ReceivedFrame::nextData(uint32_t size)
FrameCheck check(this, size);
// get the data
return _buffer;
return _buffer.data(_skip, size);
}
/**

103
src/reducedbuffer.h Normal file
View File

@ -0,0 +1,103 @@
/**
* ReducedBuffer.h
*
* Wrapper around a buffer with a number of bytes to skip
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Open namespace
*/
namespace AMQP {
/**
* Class definition
*/
class ReducedBuffer : public Buffer
{
private:
/**
* Pointer to the original buffer
* @var Buffer
*/
const Buffer &_buffer;
/**
* Number of bytes to skip
* @var size_t
*/
size_t _skip;
public:
/**
* Constructor
* @param buffer
* @param skip
*/
ReducedBuffer(const Buffer &buffer, size_t skip) : _buffer(buffer), _skip(skip) {}
/**
* Destructor
*/
virtual ~ReducedBuffer();
/**
* Total size of the buffer
* @return size_t
*/
virtual size_t size() const override
{
return _buffer.size() - _skip;
}
/**
* Get access to a single byte
* @param pos position in the buffer
* @return char value of the byte in the buffer
*/
virtual char byte(size_t pos) const override
{
return _buffer.byte(pos + _skip);
}
/**
* Get access to the raw data
* @param pos position in the buffer
* @param size number of continuous bytes
* @return char*
*/
virtual const char *data(size_t pos, size_t size) const override
{
return _buffer.data(pos + _skip, size);
}
/**
* Copy bytes to a buffer
*
* No safety checks are necessary: this method will only be called
* for bytes that actually exist
*
* @param pos position in the buffer
* @param size number of bytes to copy
* @param buffer buffer to copy into
* @return void* pointer to buffer
*/
virtual void *copy(size_t pos, size_t size, void *buffer) const override
{
return _buffer.copy(pos + _skip, size, buffer);
}
};
/**
* End of namespace
*/
}