it now is possible to publish messages to an exchange with this library

This commit is contained in:
Emiel Bruijntjes 2014-01-05 04:08:35 -08:00
parent 9c1e44f512
commit 5e96e2d832
26 changed files with 719 additions and 973 deletions

View File

@ -12,6 +12,6 @@ install:
mkdir -p ${INCLUDE_DIR}/libamqp
mkdir -p ${LIBRARY_DIR}
cp -f libamqp.h ${INCLUDE_DIR}
cp -f include/*.h ${INCLUDE_DIR}/amqp
cp -f src/libamqp.so ${LIBRARY_DIR}
cp -f src/libamqp.a ${LIBRARY_DIR}
cp -f include/*.h ${INCLUDE_DIR}/libamqp
cp -f src/liblibamqp.so ${LIBRARY_DIR}
cp -f src/liblibamqp.a ${LIBRARY_DIR}

View File

@ -41,6 +41,12 @@ public:
*/
Array(const Array &array);
/**
* Move constructor
* @param array
*/
Array(Array &&array) : _fields(std::move(array._fields)) {}
/**
* Constructor for an empty Array
*/

View File

@ -199,27 +199,27 @@ public:
*
* - nowait do not wait on response
*
* @param queue the target queue
* @param exchange the source exchange
* @param queue the target queue
* @param routingkey the routing key
* @param flags additional flags
* @param arguments additional bind arguments
* @return bool
*/
bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); }
bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); }
bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); }
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); }
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
/**
* Unbind a queue from an exchange
* @param queue the target queue
* @param exchange the source exchange
* @param queue the target queue
* @param routingkey the routing key
* @param arguments additional bind arguments
* @return bool
*/
bool unbindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
bool unbindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
/**
* Purge a queue

View File

@ -84,7 +84,7 @@ protected:
* Queued messages that should be sent after the connection has been established
* @var queue
*/
std::queue<std::string> _queue;
std::queue<OutBuffer> _queue;
private:
@ -172,6 +172,25 @@ public:
_maxFrame = size;
}
/**
* The max frame size
* @return uint32_t
*/
uint32_t maxFrame()
{
return _maxFrame;
}
/**
* The max payload size for body frames
* @return uint32_t
*/
uint32_t maxPayload()
{
// 8 bytes for header and end-of-frame byte
return _maxFrame - 8;
}
/**
* Add a channel to the connection, and return the channel ID that it
* is allowed to use, or 0 when no more ID's are available

View File

@ -15,7 +15,7 @@ namespace AMQP {
/**
* Class definition
*/
class Envelope
class Envelope : public MetaData
{
private:
/**
@ -23,92 +23,13 @@ private:
* library!)
* @var const char *
*/
const char *_data;
const char *_body;
/**
* Size of the data
* @var size_t
* @var uint64_t
*/
size_t _size;
/**
* The content type
* @var EnvelopeField
*/
EnvelopeField<std::string> _contentType;
/**
* The content encoding
* @var EnvelopeField
*/
EnvelopeField<std::string> _contentEncoding;
/**
* The priority
* @var EnvelopeField
*/
EnvelopeField<uint8_t> _priority;
/**
* The delivery mode (1=non-persistent, 2=persistent)
* @var EnvelopeField
*/
EnvelopeField<uint8_t> _deliveryMode;
/**
* The correlation ID
* @var EnvelopeField
*/
EnvelopeField<std::string> _correlationID;
/**
* Reply-to field
* @var EnvelopeField
*/
EnvelopeField<std::string> _replyTo;
/**
* Expiration value
* @var EnvelopeField
*/
EnvelopeField<std::string> _expiration;
/**
* The message id
* @var EnvelopeField
*/
EnvelopeField<std::string> _messageID;
/**
* Timestamp
* @var EnvelopeField
*/
EnvelopeField<uint64_t> _timestamp;
/**
* The type name
* @var EnvelopeField
*/
EnvelopeField<std::string> _typeName;
/**
* The user ID
* @var EnvelopeField
*/
EnvelopeField<std::string> _userID;
/**
* The application ID
* @var EnvelopeField
*/
EnvelopeField<std::string> _appID;
/**
* Additional custom headers
* @var EnvelopeField
*/
EnvelopeField<Table> _headers;
uint64_t _bodysize;
public:
/**
@ -117,16 +38,16 @@ public:
* The data buffer that you pass to this constructor must be valid during
* the lifetime of the Envelope object.
*
* @param data
* @param body
* @param size
*/
Envelope(const char *data, size_t size) : _data(data), _size(size) {}
Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodysize(size) {}
/**
* Constructor based on a string
* @param message
* @param body
*/
Envelope(const std::string &data) : _data(data.data()), _size(data.size()) {}
Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodysize(body.size()) {}
/**
* Destructor
@ -137,110 +58,18 @@ public:
* Access to the full message data
* @return buffer
*/
const char *body()
const char *body() const
{
return _data;
return _body;
}
/**
* Size of the body
* @return size_t
* @return uint64_t
*/
size_t size()
uint64_t bodySize() const
{
return _size;
}
/**
* Check if a certain field is set
* @return bool
*/
bool hasPriority () { return _priority.valid(); }
bool hasDeliveryMode () { return _deliveryMode.valid(); }
bool hasTimestamp () { return _timestamp.valid(); }
bool hasContentType () { return _contentType.valid(); }
bool hasContentEncoding () { return _contentEncoding.valid(); }
bool hasCorrelationID () { return _correlationID.valid(); }
bool hasReplyTo () { return _replyTo.valid(); }
bool hasExpiration () { return _expiration.valid(); }
bool hasMessageID () { return _messageID.valid(); }
bool hasTypeName () { return _typeName.valid(); }
bool hasUserID () { return _userID.valid(); }
bool hasAppID () { return _appID.valid(); }
bool hasHeaders () { return _headers.valid(); }
/**
* Set the various supported fields
* @param value
*/
void setPriority (uint8_t value) { _priority = value; }
void setDeliveryMode (uint8_t value) { _deliveryMode = value; }
void setTimestamp (uint64_t value) { _timestamp = value; }
void setContentType (const std::string &value) { _contentType = value; }
void setContentEncoding (const std::string &value) { _contentEncoding = value; }
void setCorrelationID (const std::string &value) { _correlationID = value; }
void setReplyTo (const std::string &value) { _replyTo = value; }
void setExpiration (const std::string &value) { _expiration = value; }
void setMessageID (const std::string &value) { _messageID = value; }
void setTypeName (const std::string &value) { _typeName = value; }
void setUserID (const std::string &value) { _userID = value; }
void setAppID (const std::string &value) { _appID = value; }
void setHeaders (const Table &value) { _headers = value; }
/**
* Reset the various supported fields
* @param value
*/
void setPriority (nullptr_t value = nullptr) { _priority = value; }
void setDeliveryMode (nullptr_t value = nullptr) { _deliveryMode = value; }
void setTimestamp (nullptr_t value = nullptr) { _timestamp = value; }
void setContentType (nullptr_t value = nullptr) { _contentType = value; }
void setContentEncoding (nullptr_t value = nullptr) { _contentEncoding = value; }
void setCorrelationID (nullptr_t value = nullptr) { _correlationID = value; }
void setReplyTo (nullptr_t value = nullptr) { _replyTo = value; }
void setExpiration (nullptr_t value = nullptr) { _expiration = value; }
void setMessageID (nullptr_t value = nullptr) { _messageID = value; }
void setTypeName (nullptr_t value = nullptr) { _typeName = value; }
void setUserID (nullptr_t value = nullptr) { _userID = value; }
void setAppID (nullptr_t value = nullptr) { _appID = value; }
void setHeaders (nullptr_t value = nullptr) { _headers = value; }
/**
* Retrieve the fields
* @return string
*/
uint8_t priority () { return _priority; }
uint8_t deliveryMode () { return _deliveryMode; }
uint64_t timestamp () { return _timestamp; }
std::string &contentType () { return _contentType; }
std::string &contentEncoding() { return _contentEncoding; }
std::string &correlationID () { return _correlationID; }
std::string &replyTo () { return _replyTo; }
std::string &expiration () { return _expiration; }
std::string &messageID () { return _messageID; }
std::string &typeName () { return _typeName; }
std::string &userID () { return _userID; }
std::string &appID () { return _appID; }
Table &headers () { return _headers; }
/**
* Is this a message with persistent storage
* This is an alias for retrieving the delivery mode and checking if it is set to 2
* @return bool
*/
bool persistent()
{
return hasDeliveryMode() && deliveryMode() == 2;
}
/**
* Set whether storage should be persistent or not
* @param bool
*/
void setPersistent(bool value = true)
{
if (value) setDeliveryMode(2);
else setDeliveryMode(nullptr);
return _bodysize;
}
};

View File

@ -1,108 +0,0 @@
/**
* EnvelopeField.h
*
* An envelope field is a field that also keeps track whether it is set
* or not. Used internally by the Envelope class.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
template <typename T>
class EnvelopeField
{
private:
/**
* The actual value
* @var T
*/
T _value;
/**
* Is it set or not
* @var bool
*/
bool _isset;
public:
/**
* Empty constructor
*/
EnvelopeField() : _isset(false) {}
/**
* Constructor
* @param value
* @param isset
*/
EnvelopeField(const T &value, bool isset = true) : _value(value), _isset(isset) {}
/**
* Destructor
*/
virtual ~EnvelopeField() {}
/**
* Assign a new value
* @param value
* @return EnvelopeField
*/
EnvelopeField &operator=(const T &value)
{
_value = value;
_isset = true;
return *this;
}
/**
* Reset the value
* @param value
* @return EnvelopeField
*/
EnvelopeField &operator=(nullptr_t value)
{
_value = T();
_isset = false;
return *this;
}
/**
* Cast to the set value
* @return T
*/
operator T& ()
{
return _value;
}
/**
* Reset the value to not being set
*/
void reset()
{
_isset = false;
_value = T();
}
/**
* Is it set?
* @return bool
*/
bool valid()
{
return _isset;
}
};
/**
* End of namespace
*/
}

302
include/metadata.h Normal file
View File

@ -0,0 +1,302 @@
/**
* MetaData.h
*
* With every published message a set of meta data is passed to. This class
* holds all that meta data.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class MetaData
{
protected:
/**
* First set of booleans
* @var BooleanSet
*/
BooleanSet _bools1;
/**
* Second set of booleans
* @var BooleanSet
*/
BooleanSet _bools2;
/**
* MIME content type
* @var ShortString
*/
ShortString _contentType;
/**
* MIME content encoding
* @var ShortString
*/
ShortString _contentEncoding;
/**
* message header field table
* @var Table
*/
Table _headers;
/**
* Delivery mode (non-persistent (1) or persistent (2))
* @var UOctet
*/
UOctet _deliveryMode = 0;
/**
* boolean whether field was sent to us
* @var UOctet
*/
UOctet _priority = 0;
/**
* application correlation identifier
* @var ShortString
*/
ShortString _correlationID;
/**
* address to reply to
* @var ShortString
*/
ShortString _replyTo;
/**
* message expiration identifier
* @var ShortString
*/
ShortString _expiration;
/**
* application message identifier
* @var ShortString
*/
ShortString _messageID;
/**
* message timestamp
* @var Timestamp
*/
Timestamp _timestamp;
/**
* message type name
* @var ShortString
*/
ShortString _typeName;
/**
* creating user id
* @var ShortString
*/
ShortString _userID;
/**
* creating application id
* @var ShortString
*/
ShortString _appID;
/**
* Deprecated cluster ID
* @var ShortString
*/
ShortString _clusterID;
/**
* Protected constructor to ensure that this class can only be constructed
* in a derived class
*/
MetaData() {}
public:
/**
* Read incoming frame
* @param frame
*/
MetaData(ReceivedFrame &frame) :
_bools1(frame),
_bools2(frame)
{
// only copy the properties that were sent
if (hasContentType()) _contentType = ShortString(frame);
if (hasContentEncoding()) _contentEncoding = ShortString(frame);
if (hasHeaders()) _headers = Table(frame);
if (hasDeliveryMode()) _deliveryMode = UOctet(frame);
if (hasPriority()) _priority = UOctet(frame);
if (hasCorrelationID()) _correlationID = ShortString(frame);
if (hasReplyTo()) _replyTo = ShortString(frame);
if (hasExpiration()) _expiration = ShortString(frame);
if (hasMessageID()) _messageID = ShortString(frame);
if (hasTimestamp()) _timestamp = Timestamp(frame);
if (hasTypeName()) _typeName = ShortString(frame);
if (hasUserID()) _userID = ShortString(frame);
if (hasAppID()) _appID = ShortString(frame);
if (hasClusterID()) _clusterID = ShortString(frame);
}
/**
* Destructor
*/
virtual ~MetaData() {}
/**
* Check if a certain field is set
* @return bool
*/
bool hasExpiration () const { return _bools1.get(0); }
bool hasReplyTo () const { return _bools1.get(1); }
bool hasCorrelationID () const { return _bools1.get(2); }
bool hasPriority () const { return _bools1.get(3); }
bool hasDeliveryMode () const { return _bools1.get(4); }
bool hasHeaders () const { return _bools1.get(5); }
bool hasContentEncoding () const { return _bools1.get(6); }
bool hasContentType () const { return _bools1.get(7); }
bool hasClusterID () const { return _bools2.get(2); }
bool hasAppID () const { return _bools2.get(3); }
bool hasUserID () const { return _bools2.get(4); }
bool hasTypeName () const { return _bools2.get(5); }
bool hasTimestamp () const { return _bools2.get(6); }
bool hasMessageID () const { return _bools2.get(7); }
/**
* Set the various supported fields
* @param value
*/
void setExpiration (const std::string &value) { _expiration = value; _bools1.set(0,true); }
void setReplyTo (const std::string &value) { _replyTo = value; _bools1.set(1,true); }
void setCorrelationID (const std::string &value) { _correlationID = value; _bools1.set(2,true); }
void setPriority (uint8_t value) { _priority = value; _bools1.set(3,true); }
void setDeliveryMode (uint8_t value) { _deliveryMode = value; _bools1.set(4,true); }
void setHeaders (const Table &value) { _headers = value; _bools1.set(5,true); }
void setContentEncoding (const std::string &value) { _contentEncoding = value; _bools1.set(6,true); }
void setContentType (const std::string &value) { _contentType = value; _bools1.set(7,true); }
void setClusterID (const std::string &value) { _clusterID = value; _bools2.set(2,true); }
void setAppID (const std::string &value) { _appID = value; _bools2.set(3,true); }
void setUserID (const std::string &value) { _userID = value; _bools2.set(4,true); }
void setTypeName (const std::string &value) { _typeName = value; _bools2.set(5,true); }
void setTimestamp (uint64_t value) { _timestamp = value; _bools2.set(6,true); }
void setMessageID (const std::string &value) { _messageID = value; _bools2.set(7,true); }
/**
* Retrieve the fields
* @return string
*/
const std::string &expiration () const { return _expiration; }
const std::string &replyTo () const { return _replyTo; }
const std::string &correlationID () const { return _correlationID; }
const uint8_t priority () const { return _priority; }
const uint8_t deliveryMode () const { return _deliveryMode; }
const Table &headers () const { return _headers; }
const std::string &contentEncoding() const { return _contentEncoding; }
const std::string &contentType () const { return _contentType; }
const std::string &clusterID () const { return _clusterID; }
const std::string &appID () const { return _appID; }
const std::string &userID () const { return _userID; }
const std::string &typeName () const { return _typeName; }
const uint64_t timestamp () const { return _timestamp; }
const std::string &messageID () const { return _messageID; }
/**
* Is this a message with persistent storage
* This is an alias for retrieving the delivery mode and checking if it is set to 2
* @return bool
*/
bool persistent()
{
return hasDeliveryMode() && deliveryMode() == 2;
}
/**
* Set whether storage should be persistent or not
* @param bool
*/
void setPersistent(bool value = true)
{
if (value)
{
// simply set the delivery mode
setDeliveryMode(2);
}
else
{
// we remove the field from the header
_deliveryMode = 0;
_bools1.set(4,false);
}
}
/**
* Total size
* @return uint32_t
*/
uint32_t size() const
{
// the result (2 for the two boolean sets)
uint32_t result = 2;
if (hasExpiration()) result += _expiration.size();
if (hasReplyTo()) result += _replyTo.size();
if (hasCorrelationID()) result += _correlationID.size();
if (hasPriority()) result += _priority.size();
if (hasDeliveryMode()) result += _deliveryMode.size();
if (hasHeaders()) result += _headers.size();
if (hasContentEncoding()) result += _contentEncoding.size();
if (hasContentType()) result += _contentType.size();
if (hasClusterID()) result += _clusterID.size();
if (hasAppID()) result += _appID.size();
if (hasUserID()) result += _userID.size();
if (hasTypeName()) result += _typeName.size();
if (hasTimestamp()) result += _timestamp.size();
if (hasMessageID()) result += _messageID.size();
// done
return result;
}
/**
* Fill an output buffer
* @param buffer
*/
void fill(OutBuffer &buffer) const
{
// the two boolean sets are always present
_bools1.fill(buffer);
_bools2.fill(buffer);
// only copy the properties that were sent
if (hasContentType()) _contentType.fill(buffer);
if (hasContentEncoding()) _contentEncoding.fill(buffer);
if (hasHeaders()) _headers.fill(buffer);
if (hasDeliveryMode()) _deliveryMode.fill(buffer);
if (hasPriority()) _priority.fill(buffer);
if (hasCorrelationID()) _correlationID.fill(buffer);
if (hasReplyTo()) _replyTo.fill(buffer);
if (hasExpiration()) _expiration.fill(buffer);
if (hasMessageID()) _messageID.fill(buffer);
if (hasTimestamp()) _timestamp.fill(buffer);
if (hasTypeName()) _typeName.fill(buffer);
if (hasUserID()) _userID.fill(buffer);
if (hasAppID()) _appID.fill(buffer);
if (hasClusterID()) _clusterID.fill(buffer);
}
};
/**
* End of namespace
*/
}

View File

@ -92,6 +92,7 @@ public:
NumericField& operator=(T value)
{
_value = value;
return *this;
};
/**

View File

@ -36,6 +36,12 @@ private:
*/
size_t _size;
/**
* The total capacity of the out buffer
* @var size_t
*/
size_t _capacity;
public:
/**
@ -44,16 +50,53 @@ public:
*/
OutBuffer(uint32_t capacity)
{
// initialize members
_size = 0;
_capacity = capacity;
_buffer = _current = new char[capacity];
}
/**
* Copy constructor
* @param that
*/
OutBuffer(const OutBuffer &that)
{
// initialize members
_size = that._size;
_capacity = that._capacity;
_buffer = new char[_capacity];
_current = _buffer + _size;
// copy memory
memcpy(_buffer, that._buffer, _size);
}
/**
* Move constructor
* @param that
*/
OutBuffer(OutBuffer &&that)
{
// copy all members
_size = that._size;
_capacity = that._capacity;
_buffer = that._buffer;
_current = that._current;
// reset the other object
that._size = 0;
that._capacity = 0;
that._buffer = nullptr;
that._current = nullptr;
}
/**
* Destructor
*/
virtual ~OutBuffer()
{
delete[] _buffer;
if (_buffer) delete[] _buffer;
}
/**

View File

@ -46,11 +46,31 @@ public:
*/
Table(const Table &table);
/**
* Move constructor
* @param table
*/
Table(Table &&table) : _fields(std::move(table._fields)) {}
/**
* Destructor
*/
virtual ~Table() {}
/**
* Assignment operator
* @param table
* @return Table
*/
Table &operator=(const Table &table);
/**
* Move assignment operator
* @param table
* @return Table
*/
Table &operator=(Table &&table);
/**
* Create a new instance on the heap of this object, identical to the object passed
* @return Field*

View File

@ -42,7 +42,7 @@
#include <libamqp/array.h>
// envelope for publishing and consuming
#include <libamqp/envelopefield.h>
#include <libamqp/metadata.h>
#include <libamqp/envelope.h>
// mid level includes

View File

@ -3,7 +3,7 @@ RM = rm -f
CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g
LD = g++
LD_FLAGS = -Wall -shared -O2
RESULT = libamqp.so
RESULT = liblibamqp.so
STATIC = $(RESULT:%.so=%.a)
SOURCES = $(wildcard *.cpp)

View File

@ -28,102 +28,10 @@ private:
uint64_t _bodySize;
/**
* First set of booleans
* @var BooleanSet
* The meta data
* @var MetaData
*/
BooleanSet _bools1;
/**
* Second set of booleans
* @var BooleanSet
*/
BooleanSet _bools2;
/**
* MIME content type
* @var ShortString
*/
ShortString _contentType;
/**
* MIME content encoding
* @var ShortString
*/
ShortString _contentEncoding;
/**
* message header field table
* @var Table
*/
Table _headers;
/**
* Delivery mode (non-persistent (1) or persistent (2))
* @var uint8_t
*/
uint8_t _deliveryMode;
/**
* boolean whether field was sent to us
* @var uint8_t
*/
uint8_t _priority;
/**
* application correlation identifier
* @var ShortString
*/
ShortString _correlationID;
/**
* address to reply to
* @var ShortString
*/
ShortString _replyTo;
/**
* message expiration identifier
* @var ShortString
*/
ShortString _expiration;
/**
* application message identifier
* @var ShortString
*/
ShortString _messageID;
/**
* message timestamp
* @var Timestamp
*/
Timestamp _timestamp;
/**
* message type name
* @var ShortString
*/
ShortString _typeName;
/**
* creating user id
* @var ShortString
*/
ShortString _userID;
/**
* creating application id
* @var ShortString
*/
ShortString _appID;
/**
* Deprecated cluster ID
* @var ShortString
*/
ShortString _clusterID;
MetaData _metadata;
protected:
/**
@ -131,7 +39,7 @@ protected:
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
virtual void fill(OutBuffer &buffer) const override
{
// call base
HeaderFrame::fill(buffer);
@ -139,22 +47,9 @@ protected:
// fill own fields.
buffer.add(_weight);
buffer.add(_bodySize);
_bools1.fill(buffer);
_bools2.fill(buffer);
if (contentTypeSent() ) { _contentType.fill(buffer); }
if (contentEncodingSent() ) { _contentEncoding.fill(buffer); }
if (headersSent() ) { _headers.fill(buffer); }
if (deliveryModeSent() ) { buffer.add(_deliveryMode); }
if (prioritySent() ) { buffer.add(_priority); }
if (correlationIDSent() ) { _correlationID.fill(buffer); }
if (replyToSent() ) { _replyTo.fill(buffer); }
if (expirationSent() ) { _expiration.fill(buffer); }
if (messageIDSent() ) { _messageID.fill(buffer); }
if (timestampSent() ) { _timestamp.fill(buffer); }
if (typeNameSent() ) { _typeName.fill(buffer); }
if (userIDSent() ) { _userID.fill(buffer); }
if (appIDSent() ) { _appID.fill(buffer); }
// the meta data
_metadata.fill(buffer);
}
public:
@ -164,13 +59,13 @@ public:
* All options are set using setter functions.
*
* @param channel channel we're working on
* @param envelope the envelope
*/
BasicHeaderFrame(uint16_t channel, uint64_t bodySize) :
HeaderFrame(channel, 12), // there are at least 12 bytes sent, weight (2), bodySize (8), property flags (2)
BasicHeaderFrame(uint16_t channel, const Envelope &envelope) :
HeaderFrame(channel, 10 + envelope.size()), // there are at least 10 bytes sent, weight (2), bodySize (8), plus the size of the meta data
_weight(0),
_bodySize(bodySize),
_deliveryMode(0),
_priority(0)
_bodySize(envelope.bodySize()),
_metadata(envelope)
{}
/**
@ -181,27 +76,9 @@ public:
HeaderFrame(frame),
_weight(frame.nextUint16()),
_bodySize(frame.nextUint64()),
_bools1(frame),
_bools2(frame),
_deliveryMode(0),
_priority(0)
{
if (contentTypeSent()) _contentType = ShortString(frame);
if (contentEncodingSent()) _contentEncoding = ShortString(frame);
if (headersSent()) _headers = Table(frame);
if (deliveryModeSent()) _deliveryMode = frame.nextUint8();
if (prioritySent()) _priority = frame.nextUint8();
if (correlationIDSent()) _correlationID = ShortString(frame);
if (replyToSent()) _replyTo = ShortString(frame);
if (expirationSent()) _expiration = ShortString(frame);
if (messageIDSent()) _messageID = ShortString(frame);
if (timestampSent()) _timestamp = Timestamp(frame);
if (typeNameSent()) _typeName = ShortString(frame);
if (userIDSent()) _userID = ShortString(frame);
if (appIDSent()) _appID = ShortString(frame);
if (clusterIDSent()) _clusterID = ShortString(frame);
}
_metadata(frame)
{}
/**
* Destructor
*/
@ -224,489 +101,6 @@ public:
{
return 60;
}
/**
* Set the body size
* @param uint64_t sum of all body-sizes sent after this headerframe
*/
void setBodySize(uint64_t size)
{
_bodySize = size;
}
/**
* return the MIME content type
* @return string
*/
const std::string& contentType() const
{
return _contentType;
}
/**
* Set the content type
* @param string
*/
void setContentType(std::string& string)
{
// was there already a content type
if (contentTypeSent()) modifySize(-_contentType.size());
// set the new content type
setContentTypeSent(string.size() > 0);
_contentType = ShortString(string);
// modify the size to include the new content type
if (contentTypeSent()) modifySize(_contentType.size());
}
/**
* Set the bool for content type sent
* @param bool
*/
void setContentTypeSent(bool b)
{
_bools1.set(7, b);
}
/**
* return the MIME content encoding
* @return string
*/
const std::string& contentEncoding() const
{
return _contentEncoding;
}
/**
* Set content encoding
* @param string
*/
void setContentEncoding(std::string& string)
{
// was there already a content encoding?
if(contentEncodingSent()) modifySize(-_contentEncoding.size());
// set new content encoding
setContentEncodingSent(string.size() > 0);
_contentEncoding = ShortString(string);
// modify size to include the new content type
modifySize(_contentEncoding.size());
}
/**
* set contentencoding sent
* @param bool
*/
void setContentEncodingSent(bool b)
{
_bools1.set(6, b);
}
/**
* return the message header field table
* @return Table
*/
const Table& headers() const
{
return _headers;
}
/**
* Set headers
* @param Table
*/
void setHeaders(Table& t)
{
// were the headers already set
if(headersSent()) modifySize(-_headers.size());
// set new headers
setHeadersSent(true);
_headers = t;
// modify size to include the new headers
modifySize(_headers.size());
}
/**
* Set headers sent
* @param bool
*/
void setHeadersSent(bool b)
{
_bools1.set(5, b);
}
/**
* return whether non-persistent (1) or persistent (2)
* @return uint8_t
*/
uint8_t deliveryMode() const
{
return _deliveryMode;
}
/**
* Set deliverymode
* @param uint8_t
*/
void setDeliveryMode(uint8_t val)
{
// was the delivery mode already set
if(deliveryModeSent()) modifySize(-1);
// set delivery mode
setDeliverModeSent(true);
_deliveryMode = Octet(val);
// add new size
modifySize(1);
}
/**
* set delivermode sent
* @param bool
*/
void setDeliverModeSent(bool b)
{
_bools1.set(4, b);
}
/**
* return the message priority (0-9)
* @return uint8_t
*/
uint8_t priority() const
{
return _priority;
}
/**
* Set priority
* @param uint8_t
*/
void setPriority(uint8_t val)
{
// was the priority already sent
if(prioritySent()) modifySize(-1);
// set priority
setPrioritySent(true);
_priority = Octet(val);
// add new size
modifySize(1);
}
/**
* Set priority sent
* @param bool
*/
void setPrioritySent(bool b)
{
_bools1.set(3, b);
}
/**
* return the application correlation identifier
* @return string
*/
const std::string& correlationID() const
{
return _correlationID;
}
/**
* set correlation ID
* @param string
*/
void setCorrelationID(std::string &s)
{
// was the correlation ID sent
if(correlationIDSent()) modifySize(-_correlationID.size());
// set new correlation ID
setCorrelationIDSent(true);
_correlationID = ShortString(s);
// add new size
modifySize(_correlationID.size());
}
/**
* Set correlationIDSent
* @param bool
*/
void setCorrelationIDSent(bool b)
{
_bools1.set(2, b);
}
/**
* return the address to reply to
* @return string
*/
const std::string& replyTo() const
{
return _replyTo;
}
/**
* Set reply to
* @param string
*/
void setReplyTo(std::string &s)
{
// was replyTo set?
if(replyToSent()) modifySize(-_replyTo.size());
// add new replyTo
setReplyToSent(true);
_replyTo = ShortString(s);
modifySize(_replyTo.size());
}
/**
* set reply to sent
* @param bool
*/
void setReplyToSent(bool b)
{
_bools1.set(1, b);
}
/**
* return the message expiration identifier
* @return string
*/
const std::string& expiration() const
{
return _expiration;
}
/**
* Set expiration
* @param string
*/
void setExpiration(std::string &s)
{
// was expiration set?
if(expirationSent()) modifySize(-_expiration.size());
// set expiration
setExpirationSent(true);
_expiration = ShortString(s);
// add new size
modifySize(_expiration.size());
}
/**
* set expiration sent
* @param bool
*/
void setExpirationSent(bool b)
{
_bools1.set(0, b);
}
/**
* return the application message identifier
* @return string
*/
const std::string& messageID() const
{
return _messageID;
}
/**
* set message ID
* @param string
*/
void setMessageID(std::string &s)
{
// was message ID sent?
if(messageIDSent()) modifySize(-_messageID.size());
// set messageID
setMessageIDSent(true);
_messageID = ShortString(s);
// add size
modifySize(_messageID.size());
}
/**
* set messageID sent
* @param bool
*/
void setMessageIDSent(bool b)
{
_bools2.set(7, b);
}
/**
* return the message timestamp
* @return uint64_t
*/
Timestamp timestamp() const
{
return _timestamp;
}
/**
* set timestamp
* @param uint64_t
*/
void setTimestamp(uint64_t val)
{
// was timestamp sent?
if(timestampSent()) modifySize(-_timestamp.size());
// set timestamp
setTimestampSent(true);
_timestamp = Timestamp(val);
// add new size
modifySize(_timestamp.size());
}
/**
* set timestamp sent
* @param bool
*/
void setTimestampSent(bool b)
{
_bools2.set(6, b);
}
/**
* return the message type name
* @return string
*/
const std::string& typeName() const
{
return _typeName;
}
/**
* set typename
* @param string
*/
void setTypeName(std::string &s)
{
// was typename sent?
if(typeNameSent()) modifySize(-_typeName.size());
// add typename
setTypeNameSent(true);
_typeName = ShortString(s);
// add new size
modifySize(_typeName.size());
}
/**
* set typename sent
* @param bool
*/
void setTypeNameSent(bool b)
{
_bools2.set(5, b);
}
/**
* return the creating user id
* @return string
*/
const std::string& userID() const
{
return _userID;
}
/**
* set User ID
* @param string
*/
void setUserID(std::string &s)
{
// was user id sent?
if(userIDSent()) modifySize(-_userID.size());
// set new userID
setUserIDSent(true);
_userID = ShortString(s);
// add size
modifySize(_userID.size());
}
/**
* set user id sent
* @param bool
*/
void setUserIDSent(bool b)
{
_bools2.set(4, b);
}
/**
* return the application id
* @return string
*/
const std::string& appID() const
{
return _appID;
}
/**
* set appID
* @param string
*/
void setAppID(std::string &s)
{
// was app id sent?
if(appIDSent()) modifySize(-_appID.size());
// add new app id
setAppIDSent(true);
_appID = ShortString(s);
// add size
modifySize(_appID.size());
}
/**
* set app id sent
* @param bool
*/
void setAppIDSent(bool b)
{
_bools2.set(3, b);
}
/**
* Return whether a field was sent
* @return bool
*/
bool expirationSent() const { return _bools1.get(0); }
bool replyToSent() const { return _bools1.get(1); }
bool correlationIDSent() const { return _bools1.get(2); }
bool prioritySent() const { return _bools1.get(3); }
bool deliveryModeSent() const { return _bools1.get(4); }
bool headersSent() const { return _bools1.get(5); }
bool contentEncodingSent() const { return _bools1.get(6); }
bool contentTypeSent() const { return _bools1.get(7); }
bool clusterIDSent() const { return _bools2.get(2); }
bool appIDSent() const { return _bools2.get(3); }
bool userIDSent() const { return _bools2.get(4); }
bool typeNameSent() const { return _bools2.get(5); }
bool timestampSent() const { return _bools2.get(6); }
bool messageIDSent() const { return _bools2.get(7); }
};
/**

View File

@ -18,9 +18,15 @@ private:
/**
* Payload of the frame
* Payload can be any number of octets
* @var vector<uint8_t>
* @var const char *
*/
std::string _payload;
const char *_payload;
/**
* Size of the payload
* @var uint64_t
*/
uint32_t _size;
protected:
/**
@ -34,7 +40,7 @@ protected:
ExtFrame::fill(buffer);
// add payload to buffer
buffer.add(_payload);
buffer.add(_payload, _size);
}
public:
@ -43,10 +49,12 @@ public:
*
* @param channel channel identifier
* @param payload payload of the body
* @param size size of the payload
*/
BodyFrame(uint16_t channel, const std::string &payload) :
ExtFrame(channel, payload.size()),
_payload(payload)
BodyFrame(uint16_t channel, const char *payload, uint32_t size) :
ExtFrame(channel, size),
_payload(payload),
_size(size)
{}
/**
@ -57,7 +65,8 @@ public:
*/
BodyFrame(ReceivedFrame& frame) :
ExtFrame(frame),
_payload(frame.nextData(frame.payloadSize()), frame.payloadSize())
_payload(frame.nextData(frame.payloadSize())),
_size(frame.payloadSize())
{}
/**
@ -76,9 +85,9 @@ public:
/**
* Return the payload of the body
* @return vector<UOctet>
* @return const char *
*/
const std::string& payload() const
const char *payload() const
{
return _payload;
}

View File

@ -21,6 +21,9 @@
#include "queueunbindframe.h"
#include "queuepurgeframe.h"
#include "queuedeleteframe.h"
#include "basicpublishframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
#include <iostream>
@ -155,7 +158,6 @@ bool ChannelImpl::commitTransaction()
*/
bool ChannelImpl::rollbackTransaction()
{
std::cout << "send rollback frame" << std::endl;
// must be connected
if (!connected()) return false;
@ -365,7 +367,59 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags)
// done
return true;
}
/**
* Publish a message to an exchange
*
* The following flags can be used
*
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
*
* @todo implement to onReturned() method
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param flags optional flags (see above)
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
*/
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope)
{
// @todo prevent crash when connection is destructed
// send the publish frame
send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate));
// send header
send(BasicHeaderFrame(_id, envelope));
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->_implementation.maxPayload();
uint32_t bytessent = 0;
// the buffer
const char *data = envelope.body();
uint32_t bytesleft = envelope.bodySize();
// split up the body in multiple frames depending on the max frame size
while (bytesleft > 0)
{
// size of this chunk
uint32_t chunksize = std::min(maxpayload, bytesleft);
// send out a body frame
send(BodyFrame(_id, data + bytessent, chunksize));
// update counters
bytessent += chunksize;
bytesleft -= chunksize;
}
// done
return true;
}
/**
* Send a frame over the channel

View File

@ -196,13 +196,13 @@ void ConnectionImpl::setConnected()
while (!_queue.empty())
{
// get the next message
std::string message(_queue.front());
OutBuffer buffer(std::move(_queue.front()));
// remove it from the queue
_queue.pop();
// send it
_handler->onData(_parent, message.data(), message.size());
_handler->onData(_parent, buffer.data(), buffer.size());
// leap out if the connection was destructed
if (!monitor.valid()) return;
@ -223,13 +223,13 @@ size_t ConnectionImpl::send(const Frame &frame)
frame.fill(buffer);
// append an end of frame byte (but not when still negotiating the protocol)
if (_state != state_protocol) buffer.add((uint8_t)206);
if (frame.needsSeparator()) buffer.add((uint8_t)206);
// are we still setting up the connection?
if ((_state == state_protocol || _state == state_handshake) && !frame.partOfHandshake())
{
// the connection is still being set up, so we need to delay the message sending
_queue.push(std::string(buffer.data(), buffer.size()));
_queue.push(std::move(buffer));
}
else
{

33
src/extframe.cpp Normal file
View File

@ -0,0 +1,33 @@
/**
* ExtFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExtFrame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()));
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -131,6 +131,14 @@ public:
* Get the message type
*/
virtual uint8_t type() const = 0;
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
};
/**

33
src/frame.cpp Normal file
View File

@ -0,0 +1,33 @@
/**
* Frame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool Frame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame");
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -46,21 +46,20 @@ public:
* Is this a frame that is part of the connection setup?
* @return bool
*/
virtual bool partOfHandshake() const
{
return false;
}
virtual bool partOfHandshake() const { return false; }
/**
* Does this frame need an end-of-frame seperator?
* @return bool
*/
virtual bool needsSeparator() const { return true; }
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection)
{
// no process was implemented
return false;
}
virtual bool process(ConnectionImpl *connection);
};
/**

View File

@ -35,6 +35,7 @@ protected:
*/
virtual void fill(OutBuffer& buffer) const
{
// call base
ExtFrame::fill(buffer);
// add type

33
src/methodframe.cpp Normal file
View File

@ -0,0 +1,33 @@
/**
* MethodFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool MethodFrame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID()));
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -69,6 +69,13 @@ public:
* @return uint16_t
*/
virtual uint16_t methodID() const = 0;
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
};
/**

View File

@ -133,6 +133,14 @@ public:
return true;
}
/**
* Does this frame need an end-of-frame seperator?
* @return bool
*/
virtual bool needsSeparator() const override
{
return false;
}
};
/**

View File

@ -96,10 +96,16 @@ ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) :
if (max > 0 && _payloadSize > max - 8) throw ProtocolException("frame size exceeded");
// check if the buffer is big enough to contain all data
if (size >= _payloadSize + 8) return;
// frame is not yet valid
_type = _channel = _payloadSize = 0;
if (size >= _payloadSize + 8)
{
// buffer is big enough, check for a valid end-of-frame marker
if ((int)buffer[_payloadSize+7] != -50) throw ProtocolException("invalid end of frame marker");
}
else
{
// frame is not yet valid
_type = _channel = _payloadSize = 0;
}
}
/**
@ -290,8 +296,6 @@ const char * ReceivedFrame::nextData(uint32_t size)
*/
bool ReceivedFrame::process(ConnectionImpl *connection)
{
std::cout << "_type = " << (int)_type << std::endl;
// check the type
switch (_type)
{
@ -300,8 +304,10 @@ bool ReceivedFrame::process(ConnectionImpl *connection)
case 3: return BodyFrame(*this).process(connection);
case 4: return HeartbeatFrame(*this).process(connection);
case 8: return HeartbeatFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized frame type " + std::to_string(_type));
}
/**
@ -314,8 +320,6 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection)
// read the class id from the method
uint16_t classID = nextUint16();
std::cout << "classID = " << (int)classID << std::endl;
// construct frame based on method id
switch (classID)
{
@ -325,8 +329,10 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection)
case 50: return processQueueFrame(connection);
case 60: return processBasicFrame(connection);
case 90: return processTransactionFrame(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized method frame class " + std::to_string(classID));
}
/**
@ -339,8 +345,6 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection)
// read the method id from the method
uint16_t methodID = nextUint16();
std::cout << "methodID = " << (int)methodID << std::endl;
// construct frame based on method id
switch (methodID)
{
@ -354,8 +358,10 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection)
case 41: return ConnectionOpenOKFrame(*this).process(connection);
case 50: return ConnectionCloseFrame(*this).process(connection);
case 51: return ConnectionCloseOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized connection frame method " + std::to_string(methodID));
}
/**
@ -368,8 +374,6 @@ bool ReceivedFrame::processChannelFrame(ConnectionImpl *connection)
// read the method id from the method
uint16_t methodID = nextUint16();
std::cout << "methodID = " << methodID << std::endl;
// construct frame based on method id
switch (methodID)
{
@ -379,8 +383,10 @@ bool ReceivedFrame::processChannelFrame(ConnectionImpl *connection)
case 21: return ChannelFlowOKFrame(*this).process(connection);
case 40: return ChannelCloseFrame(*this).process(connection);
case 41: return ChannelCloseOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized channel frame method " + std::to_string(methodID));
}
/**
@ -393,8 +399,6 @@ bool ReceivedFrame::processExchangeFrame(ConnectionImpl *connection)
// read the method id from the method
uint16_t methodID = nextUint16();
std::cout << "methodID = " << (int)methodID << std::endl;
// construct frame based on method id
switch(methodID)
{
@ -409,8 +413,10 @@ bool ReceivedFrame::processExchangeFrame(ConnectionImpl *connection)
// has method ID 51, instead of (the expected) 41. This is tested
// and it really has ID 51.
case 51: return ExchangeUnbindOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized exchange frame method " + std::to_string(methodID));
}
/**
@ -436,8 +442,10 @@ bool ReceivedFrame::processQueueFrame(ConnectionImpl *connection)
case 41: return QueueDeleteOKFrame(*this).process(connection);
case 50: return QueueUnbindFrame(*this).process(connection);
case 51: return QueueUnbindOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized queue frame method " + std::to_string(methodID));
}
/**
@ -470,8 +478,10 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection)
case 100: return BasicRecoverAsyncFrame(*this).process(connection);
case 110: return BasicRecoverFrame(*this).process(connection);
case 111: return BasicRecoverOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized basic frame method " + std::to_string(methodID));
}
/**
@ -493,8 +503,10 @@ bool ReceivedFrame::processTransactionFrame(ConnectionImpl *connection)
case 21: return TransactionCommitOKFrame(*this).process(connection);
case 30: return TransactionRollbackFrame(*this).process(connection);
case 31: return TransactionRollbackOKFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized transaction frame method " + std::to_string(methodID));
}
/**
@ -511,8 +523,10 @@ bool ReceivedFrame::processHeaderFrame(ConnectionImpl *connection)
switch (classID)
{
case 60: return BasicHeaderFrame(*this).process(connection);
default: return false;
}
// this is a problem
throw ProtocolException("unrecognized header frame class " + std::to_string(classID));
}
/**

View File

@ -48,6 +48,47 @@ Table::Table(const Table &table)
}
}
/**
* Assignment operator
* @param table
* @return Table
*/
Table &Table::operator=(const Table &table)
{
// skip self assignment
if (this == &table) return *this;
// empty current fields
_fields.clear();
// loop through the table records
for (auto iter = table._fields.begin(); iter != table._fields.end(); iter++)
{
// add the field
_fields[iter->first] = std::shared_ptr<Field>(iter->second->clone());
}
// done
return *this;
}
/**
* Move assignment operator
* @param table
* @return Table
*/
Table &Table::operator=(Table &&table)
{
// skip self assignment
if (this == &table) return *this;
// copy fields
_fields = std::move(table._fields);
// done
return *this;
}
/**
* Get a field
*