From 5e96e2d832cf683df48b86e89dff519d96912bba Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 5 Jan 2014 04:08:35 -0800 Subject: [PATCH] it now is possible to publish messages to an exchange with this library --- Makefile | 6 +- include/array.h | 6 + include/channel.h | 14 +- include/connectionimpl.h | 21 +- include/envelope.h | 197 +----------- include/envelopefield.h | 108 ------- include/metadata.h | 302 ++++++++++++++++++ include/numericfield.h | 1 + include/outbuffer.h | 45 ++- include/table.h | 20 ++ libamqp.h | 2 +- src/Makefile | 2 +- src/basicheaderframe.h | 634 +------------------------------------- src/bodyframe.h | 27 +- src/channelimpl.cpp | 56 +++- src/connectionimpl.cpp | 10 +- src/extframe.cpp | 33 ++ src/extframe.h | 8 + src/frame.cpp | 33 ++ src/frame.h | 17 +- src/headerframe.h | 1 + src/methodframe.cpp | 33 ++ src/methodframe.h | 7 + src/protocolheaderframe.h | 8 + src/receivedframe.cpp | 60 ++-- src/table.cpp | 41 +++ 26 files changed, 719 insertions(+), 973 deletions(-) delete mode 100644 include/envelopefield.h create mode 100644 include/metadata.h create mode 100644 src/extframe.cpp create mode 100644 src/frame.cpp create mode 100644 src/methodframe.cpp diff --git a/Makefile b/Makefile index 6c0b0a7..f341d6c 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,6 @@ install: mkdir -p ${INCLUDE_DIR}/libamqp mkdir -p ${LIBRARY_DIR} cp -f libamqp.h ${INCLUDE_DIR} - cp -f include/*.h ${INCLUDE_DIR}/amqp - cp -f src/libamqp.so ${LIBRARY_DIR} - cp -f src/libamqp.a ${LIBRARY_DIR} + cp -f include/*.h ${INCLUDE_DIR}/libamqp + cp -f src/liblibamqp.so ${LIBRARY_DIR} + cp -f src/liblibamqp.a ${LIBRARY_DIR} diff --git a/include/array.h b/include/array.h index d0472c8..befd1d6 100644 --- a/include/array.h +++ b/include/array.h @@ -41,6 +41,12 @@ public: */ Array(const Array &array); + /** + * Move constructor + * @param array + */ + Array(Array &&array) : _fields(std::move(array._fields)) {} + /** * Constructor for an empty Array */ diff --git a/include/channel.h b/include/channel.h index aff013f..8f4ac2e 100644 --- a/include/channel.h +++ b/include/channel.h @@ -199,27 +199,27 @@ public: * * - nowait do not wait on response * - * @param queue the target queue * @param exchange the source exchange + * @param queue the target queue * @param routingkey the routing key * @param flags additional flags * @param arguments additional bind arguments * @return bool */ - bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } - bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } - bool bindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } + bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } + bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } + bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } /** * Unbind a queue from an exchange - * @param queue the target queue * @param exchange the source exchange + * @param queue the target queue * @param routingkey the routing key * @param arguments additional bind arguments * @return bool */ - bool unbindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } - bool unbindQueue(const std::string &queue, const std::string &exchange, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } + bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } + bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } /** * Purge a queue diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 63e8ef7..938bdca 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -84,7 +84,7 @@ protected: * Queued messages that should be sent after the connection has been established * @var queue */ - std::queue _queue; + std::queue _queue; private: @@ -172,6 +172,25 @@ public: _maxFrame = size; } + /** + * The max frame size + * @return uint32_t + */ + uint32_t maxFrame() + { + return _maxFrame; + } + + /** + * The max payload size for body frames + * @return uint32_t + */ + uint32_t maxPayload() + { + // 8 bytes for header and end-of-frame byte + return _maxFrame - 8; + } + /** * Add a channel to the connection, and return the channel ID that it * is allowed to use, or 0 when no more ID's are available diff --git a/include/envelope.h b/include/envelope.h index 7cc4545..eaa33ae 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -15,7 +15,7 @@ namespace AMQP { /** * Class definition */ -class Envelope +class Envelope : public MetaData { private: /** @@ -23,92 +23,13 @@ private: * library!) * @var const char * */ - const char *_data; + const char *_body; /** * Size of the data - * @var size_t + * @var uint64_t */ - size_t _size; - - /** - * The content type - * @var EnvelopeField - */ - EnvelopeField _contentType; - - /** - * The content encoding - * @var EnvelopeField - */ - EnvelopeField _contentEncoding; - - /** - * The priority - * @var EnvelopeField - */ - EnvelopeField _priority; - - /** - * The delivery mode (1=non-persistent, 2=persistent) - * @var EnvelopeField - */ - EnvelopeField _deliveryMode; - - /** - * The correlation ID - * @var EnvelopeField - */ - EnvelopeField _correlationID; - - /** - * Reply-to field - * @var EnvelopeField - */ - EnvelopeField _replyTo; - - /** - * Expiration value - * @var EnvelopeField - */ - EnvelopeField _expiration; - - /** - * The message id - * @var EnvelopeField - */ - EnvelopeField _messageID; - - /** - * Timestamp - * @var EnvelopeField - */ - EnvelopeField _timestamp; - - /** - * The type name - * @var EnvelopeField - */ - EnvelopeField _typeName; - - /** - * The user ID - * @var EnvelopeField - */ - EnvelopeField _userID; - - /** - * The application ID - * @var EnvelopeField - */ - EnvelopeField _appID; - - /** - * Additional custom headers - * @var EnvelopeField - */ - EnvelopeField _headers; - + uint64_t _bodysize; public: /** @@ -117,16 +38,16 @@ public: * The data buffer that you pass to this constructor must be valid during * the lifetime of the Envelope object. * - * @param data + * @param body * @param size */ - Envelope(const char *data, size_t size) : _data(data), _size(size) {} + Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodysize(size) {} /** * Constructor based on a string - * @param message + * @param body */ - Envelope(const std::string &data) : _data(data.data()), _size(data.size()) {} + Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodysize(body.size()) {} /** * Destructor @@ -137,110 +58,18 @@ public: * Access to the full message data * @return buffer */ - const char *body() + const char *body() const { - return _data; + return _body; } /** * Size of the body - * @return size_t + * @return uint64_t */ - size_t size() + uint64_t bodySize() const { - return _size; - } - - /** - * Check if a certain field is set - * @return bool - */ - bool hasPriority () { return _priority.valid(); } - bool hasDeliveryMode () { return _deliveryMode.valid(); } - bool hasTimestamp () { return _timestamp.valid(); } - bool hasContentType () { return _contentType.valid(); } - bool hasContentEncoding () { return _contentEncoding.valid(); } - bool hasCorrelationID () { return _correlationID.valid(); } - bool hasReplyTo () { return _replyTo.valid(); } - bool hasExpiration () { return _expiration.valid(); } - bool hasMessageID () { return _messageID.valid(); } - bool hasTypeName () { return _typeName.valid(); } - bool hasUserID () { return _userID.valid(); } - bool hasAppID () { return _appID.valid(); } - bool hasHeaders () { return _headers.valid(); } - - /** - * Set the various supported fields - * @param value - */ - void setPriority (uint8_t value) { _priority = value; } - void setDeliveryMode (uint8_t value) { _deliveryMode = value; } - void setTimestamp (uint64_t value) { _timestamp = value; } - void setContentType (const std::string &value) { _contentType = value; } - void setContentEncoding (const std::string &value) { _contentEncoding = value; } - void setCorrelationID (const std::string &value) { _correlationID = value; } - void setReplyTo (const std::string &value) { _replyTo = value; } - void setExpiration (const std::string &value) { _expiration = value; } - void setMessageID (const std::string &value) { _messageID = value; } - void setTypeName (const std::string &value) { _typeName = value; } - void setUserID (const std::string &value) { _userID = value; } - void setAppID (const std::string &value) { _appID = value; } - void setHeaders (const Table &value) { _headers = value; } - - /** - * Reset the various supported fields - * @param value - */ - void setPriority (nullptr_t value = nullptr) { _priority = value; } - void setDeliveryMode (nullptr_t value = nullptr) { _deliveryMode = value; } - void setTimestamp (nullptr_t value = nullptr) { _timestamp = value; } - void setContentType (nullptr_t value = nullptr) { _contentType = value; } - void setContentEncoding (nullptr_t value = nullptr) { _contentEncoding = value; } - void setCorrelationID (nullptr_t value = nullptr) { _correlationID = value; } - void setReplyTo (nullptr_t value = nullptr) { _replyTo = value; } - void setExpiration (nullptr_t value = nullptr) { _expiration = value; } - void setMessageID (nullptr_t value = nullptr) { _messageID = value; } - void setTypeName (nullptr_t value = nullptr) { _typeName = value; } - void setUserID (nullptr_t value = nullptr) { _userID = value; } - void setAppID (nullptr_t value = nullptr) { _appID = value; } - void setHeaders (nullptr_t value = nullptr) { _headers = value; } - - /** - * Retrieve the fields - * @return string - */ - uint8_t priority () { return _priority; } - uint8_t deliveryMode () { return _deliveryMode; } - uint64_t timestamp () { return _timestamp; } - std::string &contentType () { return _contentType; } - std::string &contentEncoding() { return _contentEncoding; } - std::string &correlationID () { return _correlationID; } - std::string &replyTo () { return _replyTo; } - std::string &expiration () { return _expiration; } - std::string &messageID () { return _messageID; } - std::string &typeName () { return _typeName; } - std::string &userID () { return _userID; } - std::string &appID () { return _appID; } - Table &headers () { return _headers; } - - /** - * Is this a message with persistent storage - * This is an alias for retrieving the delivery mode and checking if it is set to 2 - * @return bool - */ - bool persistent() - { - return hasDeliveryMode() && deliveryMode() == 2; - } - - /** - * Set whether storage should be persistent or not - * @param bool - */ - void setPersistent(bool value = true) - { - if (value) setDeliveryMode(2); - else setDeliveryMode(nullptr); + return _bodysize; } }; diff --git a/include/envelopefield.h b/include/envelopefield.h deleted file mode 100644 index d362927..0000000 --- a/include/envelopefield.h +++ /dev/null @@ -1,108 +0,0 @@ -/** - * EnvelopeField.h - * - * An envelope field is a field that also keeps track whether it is set - * or not. Used internally by the Envelope class. - * - * @copyright 2014 Copernica BV - */ - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Class definition - */ -template -class EnvelopeField -{ -private: - /** - * The actual value - * @var T - */ - T _value; - - /** - * Is it set or not - * @var bool - */ - bool _isset; - -public: - /** - * Empty constructor - */ - EnvelopeField() : _isset(false) {} - - /** - * Constructor - * @param value - * @param isset - */ - EnvelopeField(const T &value, bool isset = true) : _value(value), _isset(isset) {} - - /** - * Destructor - */ - virtual ~EnvelopeField() {} - - /** - * Assign a new value - * @param value - * @return EnvelopeField - */ - EnvelopeField &operator=(const T &value) - { - _value = value; - _isset = true; - return *this; - } - - /** - * Reset the value - * @param value - * @return EnvelopeField - */ - EnvelopeField &operator=(nullptr_t value) - { - _value = T(); - _isset = false; - return *this; - } - - /** - * Cast to the set value - * @return T - */ - operator T& () - { - return _value; - } - - /** - * Reset the value to not being set - */ - void reset() - { - _isset = false; - _value = T(); - } - - /** - * Is it set? - * @return bool - */ - bool valid() - { - return _isset; - } -}; - -/** - * End of namespace - */ -} - diff --git a/include/metadata.h b/include/metadata.h new file mode 100644 index 0000000..a88cc47 --- /dev/null +++ b/include/metadata.h @@ -0,0 +1,302 @@ +/** + * MetaData.h + * + * With every published message a set of meta data is passed to. This class + * holds all that meta data. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class MetaData +{ +protected: + /** + * First set of booleans + * @var BooleanSet + */ + BooleanSet _bools1; + + /** + * Second set of booleans + * @var BooleanSet + */ + BooleanSet _bools2; + + /** + * MIME content type + * @var ShortString + */ + ShortString _contentType; + + /** + * MIME content encoding + * @var ShortString + */ + ShortString _contentEncoding; + + /** + * message header field table + * @var Table + */ + Table _headers; + + /** + * Delivery mode (non-persistent (1) or persistent (2)) + * @var UOctet + */ + UOctet _deliveryMode = 0; + + /** + * boolean whether field was sent to us + * @var UOctet + */ + UOctet _priority = 0; + + /** + * application correlation identifier + * @var ShortString + */ + ShortString _correlationID; + + /** + * address to reply to + * @var ShortString + */ + ShortString _replyTo; + + /** + * message expiration identifier + * @var ShortString + */ + ShortString _expiration; + + /** + * application message identifier + * @var ShortString + */ + ShortString _messageID; + + /** + * message timestamp + * @var Timestamp + */ + Timestamp _timestamp; + + /** + * message type name + * @var ShortString + */ + ShortString _typeName; + + /** + * creating user id + * @var ShortString + */ + ShortString _userID; + + /** + * creating application id + * @var ShortString + */ + ShortString _appID; + + /** + * Deprecated cluster ID + * @var ShortString + */ + ShortString _clusterID; + + + /** + * Protected constructor to ensure that this class can only be constructed + * in a derived class + */ + MetaData() {} + + +public: + /** + * Read incoming frame + * @param frame + */ + MetaData(ReceivedFrame &frame) : + _bools1(frame), + _bools2(frame) + { + // only copy the properties that were sent + if (hasContentType()) _contentType = ShortString(frame); + if (hasContentEncoding()) _contentEncoding = ShortString(frame); + if (hasHeaders()) _headers = Table(frame); + if (hasDeliveryMode()) _deliveryMode = UOctet(frame); + if (hasPriority()) _priority = UOctet(frame); + if (hasCorrelationID()) _correlationID = ShortString(frame); + if (hasReplyTo()) _replyTo = ShortString(frame); + if (hasExpiration()) _expiration = ShortString(frame); + if (hasMessageID()) _messageID = ShortString(frame); + if (hasTimestamp()) _timestamp = Timestamp(frame); + if (hasTypeName()) _typeName = ShortString(frame); + if (hasUserID()) _userID = ShortString(frame); + if (hasAppID()) _appID = ShortString(frame); + if (hasClusterID()) _clusterID = ShortString(frame); + } + + /** + * Destructor + */ + virtual ~MetaData() {} + + /** + * Check if a certain field is set + * @return bool + */ + bool hasExpiration () const { return _bools1.get(0); } + bool hasReplyTo () const { return _bools1.get(1); } + bool hasCorrelationID () const { return _bools1.get(2); } + bool hasPriority () const { return _bools1.get(3); } + bool hasDeliveryMode () const { return _bools1.get(4); } + bool hasHeaders () const { return _bools1.get(5); } + bool hasContentEncoding () const { return _bools1.get(6); } + bool hasContentType () const { return _bools1.get(7); } + bool hasClusterID () const { return _bools2.get(2); } + bool hasAppID () const { return _bools2.get(3); } + bool hasUserID () const { return _bools2.get(4); } + bool hasTypeName () const { return _bools2.get(5); } + bool hasTimestamp () const { return _bools2.get(6); } + bool hasMessageID () const { return _bools2.get(7); } + + /** + * Set the various supported fields + * @param value + */ + void setExpiration (const std::string &value) { _expiration = value; _bools1.set(0,true); } + void setReplyTo (const std::string &value) { _replyTo = value; _bools1.set(1,true); } + void setCorrelationID (const std::string &value) { _correlationID = value; _bools1.set(2,true); } + void setPriority (uint8_t value) { _priority = value; _bools1.set(3,true); } + void setDeliveryMode (uint8_t value) { _deliveryMode = value; _bools1.set(4,true); } + void setHeaders (const Table &value) { _headers = value; _bools1.set(5,true); } + void setContentEncoding (const std::string &value) { _contentEncoding = value; _bools1.set(6,true); } + void setContentType (const std::string &value) { _contentType = value; _bools1.set(7,true); } + void setClusterID (const std::string &value) { _clusterID = value; _bools2.set(2,true); } + void setAppID (const std::string &value) { _appID = value; _bools2.set(3,true); } + void setUserID (const std::string &value) { _userID = value; _bools2.set(4,true); } + void setTypeName (const std::string &value) { _typeName = value; _bools2.set(5,true); } + void setTimestamp (uint64_t value) { _timestamp = value; _bools2.set(6,true); } + void setMessageID (const std::string &value) { _messageID = value; _bools2.set(7,true); } + + /** + * Retrieve the fields + * @return string + */ + const std::string &expiration () const { return _expiration; } + const std::string &replyTo () const { return _replyTo; } + const std::string &correlationID () const { return _correlationID; } + const uint8_t priority () const { return _priority; } + const uint8_t deliveryMode () const { return _deliveryMode; } + const Table &headers () const { return _headers; } + const std::string &contentEncoding() const { return _contentEncoding; } + const std::string &contentType () const { return _contentType; } + const std::string &clusterID () const { return _clusterID; } + const std::string &appID () const { return _appID; } + const std::string &userID () const { return _userID; } + const std::string &typeName () const { return _typeName; } + const uint64_t timestamp () const { return _timestamp; } + const std::string &messageID () const { return _messageID; } + + /** + * Is this a message with persistent storage + * This is an alias for retrieving the delivery mode and checking if it is set to 2 + * @return bool + */ + bool persistent() + { + return hasDeliveryMode() && deliveryMode() == 2; + } + + /** + * Set whether storage should be persistent or not + * @param bool + */ + void setPersistent(bool value = true) + { + if (value) + { + // simply set the delivery mode + setDeliveryMode(2); + } + else + { + // we remove the field from the header + _deliveryMode = 0; + _bools1.set(4,false); + } + } + + /** + * Total size + * @return uint32_t + */ + uint32_t size() const + { + // the result (2 for the two boolean sets) + uint32_t result = 2; + + if (hasExpiration()) result += _expiration.size(); + if (hasReplyTo()) result += _replyTo.size(); + if (hasCorrelationID()) result += _correlationID.size(); + if (hasPriority()) result += _priority.size(); + if (hasDeliveryMode()) result += _deliveryMode.size(); + if (hasHeaders()) result += _headers.size(); + if (hasContentEncoding()) result += _contentEncoding.size(); + if (hasContentType()) result += _contentType.size(); + if (hasClusterID()) result += _clusterID.size(); + if (hasAppID()) result += _appID.size(); + if (hasUserID()) result += _userID.size(); + if (hasTypeName()) result += _typeName.size(); + if (hasTimestamp()) result += _timestamp.size(); + if (hasMessageID()) result += _messageID.size(); + + // done + return result; + } + + /** + * Fill an output buffer + * @param buffer + */ + void fill(OutBuffer &buffer) const + { + // the two boolean sets are always present + _bools1.fill(buffer); + _bools2.fill(buffer); + + // only copy the properties that were sent + if (hasContentType()) _contentType.fill(buffer); + if (hasContentEncoding()) _contentEncoding.fill(buffer); + if (hasHeaders()) _headers.fill(buffer); + if (hasDeliveryMode()) _deliveryMode.fill(buffer); + if (hasPriority()) _priority.fill(buffer); + if (hasCorrelationID()) _correlationID.fill(buffer); + if (hasReplyTo()) _replyTo.fill(buffer); + if (hasExpiration()) _expiration.fill(buffer); + if (hasMessageID()) _messageID.fill(buffer); + if (hasTimestamp()) _timestamp.fill(buffer); + if (hasTypeName()) _typeName.fill(buffer); + if (hasUserID()) _userID.fill(buffer); + if (hasAppID()) _appID.fill(buffer); + if (hasClusterID()) _clusterID.fill(buffer); + } +}; + +/** + * End of namespace + */ +} + diff --git a/include/numericfield.h b/include/numericfield.h index 35e4236..bcddf58 100644 --- a/include/numericfield.h +++ b/include/numericfield.h @@ -92,6 +92,7 @@ public: NumericField& operator=(T value) { _value = value; + return *this; }; /** diff --git a/include/outbuffer.h b/include/outbuffer.h index 5fce8d4..70d4b56 100644 --- a/include/outbuffer.h +++ b/include/outbuffer.h @@ -36,6 +36,12 @@ private: */ size_t _size; + /** + * The total capacity of the out buffer + * @var size_t + */ + size_t _capacity; + public: /** @@ -44,16 +50,53 @@ public: */ OutBuffer(uint32_t capacity) { + // initialize members _size = 0; + _capacity = capacity; _buffer = _current = new char[capacity]; } + + /** + * Copy constructor + * @param that + */ + OutBuffer(const OutBuffer &that) + { + // initialize members + _size = that._size; + _capacity = that._capacity; + _buffer = new char[_capacity]; + _current = _buffer + _size; + + // copy memory + memcpy(_buffer, that._buffer, _size); + } + + /** + * Move constructor + * @param that + */ + OutBuffer(OutBuffer &&that) + { + // copy all members + _size = that._size; + _capacity = that._capacity; + _buffer = that._buffer; + _current = that._current; + + // reset the other object + that._size = 0; + that._capacity = 0; + that._buffer = nullptr; + that._current = nullptr; + } /** * Destructor */ virtual ~OutBuffer() { - delete[] _buffer; + if (_buffer) delete[] _buffer; } /** diff --git a/include/table.h b/include/table.h index f662d28..c26fb6d 100644 --- a/include/table.h +++ b/include/table.h @@ -46,11 +46,31 @@ public: */ Table(const Table &table); + /** + * Move constructor + * @param table + */ + Table(Table &&table) : _fields(std::move(table._fields)) {} + /** * Destructor */ virtual ~Table() {} + /** + * Assignment operator + * @param table + * @return Table + */ + Table &operator=(const Table &table); + + /** + * Move assignment operator + * @param table + * @return Table + */ + Table &operator=(Table &&table); + /** * Create a new instance on the heap of this object, identical to the object passed * @return Field* diff --git a/libamqp.h b/libamqp.h index 793522d..7726b42 100644 --- a/libamqp.h +++ b/libamqp.h @@ -42,7 +42,7 @@ #include // envelope for publishing and consuming -#include +#include #include // mid level includes diff --git a/src/Makefile b/src/Makefile index ded13c7..d3e4050 100644 --- a/src/Makefile +++ b/src/Makefile @@ -3,7 +3,7 @@ RM = rm -f CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g LD = g++ LD_FLAGS = -Wall -shared -O2 -RESULT = libamqp.so +RESULT = liblibamqp.so STATIC = $(RESULT:%.so=%.a) SOURCES = $(wildcard *.cpp) diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index 411c32d..f57cdd5 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -28,102 +28,10 @@ private: uint64_t _bodySize; /** - * First set of booleans - * @var BooleanSet + * The meta data + * @var MetaData */ - BooleanSet _bools1; - - /** - * Second set of booleans - * @var BooleanSet - */ - BooleanSet _bools2; - - /** - * MIME content type - * @var ShortString - */ - ShortString _contentType; - - /** - * MIME content encoding - * @var ShortString - */ - ShortString _contentEncoding; - - /** - * message header field table - * @var Table - */ - Table _headers; - - /** - * Delivery mode (non-persistent (1) or persistent (2)) - * @var uint8_t - */ - uint8_t _deliveryMode; - - /** - * boolean whether field was sent to us - * @var uint8_t - */ - uint8_t _priority; - - /** - * application correlation identifier - * @var ShortString - */ - ShortString _correlationID; - - /** - * address to reply to - * @var ShortString - */ - ShortString _replyTo; - - /** - * message expiration identifier - * @var ShortString - */ - ShortString _expiration; - - /** - * application message identifier - * @var ShortString - */ - ShortString _messageID; - - /** - * message timestamp - * @var Timestamp - */ - Timestamp _timestamp; - - /** - * message type name - * @var ShortString - */ - ShortString _typeName; - - /** - * creating user id - * @var ShortString - */ - ShortString _userID; - - /** - * creating application id - * @var ShortString - */ - ShortString _appID; - - /** - * Deprecated cluster ID - * @var ShortString - */ - ShortString _clusterID; - - + MetaData _metadata; protected: /** @@ -131,7 +39,7 @@ protected: * * @param buffer buffer to write frame to */ - virtual void fill(OutBuffer& buffer) const override + virtual void fill(OutBuffer &buffer) const override { // call base HeaderFrame::fill(buffer); @@ -139,22 +47,9 @@ protected: // fill own fields. buffer.add(_weight); buffer.add(_bodySize); - _bools1.fill(buffer); - _bools2.fill(buffer); - if (contentTypeSent() ) { _contentType.fill(buffer); } - if (contentEncodingSent() ) { _contentEncoding.fill(buffer); } - if (headersSent() ) { _headers.fill(buffer); } - if (deliveryModeSent() ) { buffer.add(_deliveryMode); } - if (prioritySent() ) { buffer.add(_priority); } - if (correlationIDSent() ) { _correlationID.fill(buffer); } - if (replyToSent() ) { _replyTo.fill(buffer); } - if (expirationSent() ) { _expiration.fill(buffer); } - if (messageIDSent() ) { _messageID.fill(buffer); } - if (timestampSent() ) { _timestamp.fill(buffer); } - if (typeNameSent() ) { _typeName.fill(buffer); } - if (userIDSent() ) { _userID.fill(buffer); } - if (appIDSent() ) { _appID.fill(buffer); } + // the meta data + _metadata.fill(buffer); } public: @@ -164,13 +59,13 @@ public: * All options are set using setter functions. * * @param channel channel we're working on + * @param envelope the envelope */ - BasicHeaderFrame(uint16_t channel, uint64_t bodySize) : - HeaderFrame(channel, 12), // there are at least 12 bytes sent, weight (2), bodySize (8), property flags (2) + BasicHeaderFrame(uint16_t channel, const Envelope &envelope) : + HeaderFrame(channel, 10 + envelope.size()), // there are at least 10 bytes sent, weight (2), bodySize (8), plus the size of the meta data _weight(0), - _bodySize(bodySize), - _deliveryMode(0), - _priority(0) + _bodySize(envelope.bodySize()), + _metadata(envelope) {} /** @@ -181,27 +76,9 @@ public: HeaderFrame(frame), _weight(frame.nextUint16()), _bodySize(frame.nextUint64()), - _bools1(frame), - _bools2(frame), - _deliveryMode(0), - _priority(0) - { - if (contentTypeSent()) _contentType = ShortString(frame); - if (contentEncodingSent()) _contentEncoding = ShortString(frame); - if (headersSent()) _headers = Table(frame); - if (deliveryModeSent()) _deliveryMode = frame.nextUint8(); - if (prioritySent()) _priority = frame.nextUint8(); - if (correlationIDSent()) _correlationID = ShortString(frame); - if (replyToSent()) _replyTo = ShortString(frame); - if (expirationSent()) _expiration = ShortString(frame); - if (messageIDSent()) _messageID = ShortString(frame); - if (timestampSent()) _timestamp = Timestamp(frame); - if (typeNameSent()) _typeName = ShortString(frame); - if (userIDSent()) _userID = ShortString(frame); - if (appIDSent()) _appID = ShortString(frame); - if (clusterIDSent()) _clusterID = ShortString(frame); - } - + _metadata(frame) + {} + /** * Destructor */ @@ -224,489 +101,6 @@ public: { return 60; } - - /** - * Set the body size - * @param uint64_t sum of all body-sizes sent after this headerframe - */ - void setBodySize(uint64_t size) - { - _bodySize = size; - } - - /** - * return the MIME content type - * @return string - */ - const std::string& contentType() const - { - return _contentType; - } - - /** - * Set the content type - * @param string - */ - void setContentType(std::string& string) - { - // was there already a content type - if (contentTypeSent()) modifySize(-_contentType.size()); - - // set the new content type - setContentTypeSent(string.size() > 0); - _contentType = ShortString(string); - - // modify the size to include the new content type - if (contentTypeSent()) modifySize(_contentType.size()); - } - - /** - * Set the bool for content type sent - * @param bool - */ - void setContentTypeSent(bool b) - { - _bools1.set(7, b); - } - - /** - * return the MIME content encoding - * @return string - */ - const std::string& contentEncoding() const - { - return _contentEncoding; - } - - /** - * Set content encoding - * @param string - */ - void setContentEncoding(std::string& string) - { - // was there already a content encoding? - if(contentEncodingSent()) modifySize(-_contentEncoding.size()); - - // set new content encoding - setContentEncodingSent(string.size() > 0); - _contentEncoding = ShortString(string); - - // modify size to include the new content type - modifySize(_contentEncoding.size()); - } - - /** - * set contentencoding sent - * @param bool - */ - void setContentEncodingSent(bool b) - { - _bools1.set(6, b); - } - - /** - * return the message header field table - * @return Table - */ - const Table& headers() const - { - return _headers; - } - - /** - * Set headers - * @param Table - */ - void setHeaders(Table& t) - { - // were the headers already set - if(headersSent()) modifySize(-_headers.size()); - - // set new headers - setHeadersSent(true); - _headers = t; - - // modify size to include the new headers - modifySize(_headers.size()); - } - - /** - * Set headers sent - * @param bool - */ - void setHeadersSent(bool b) - { - _bools1.set(5, b); - } - - /** - * return whether non-persistent (1) or persistent (2) - * @return uint8_t - */ - uint8_t deliveryMode() const - { - return _deliveryMode; - } - - /** - * Set deliverymode - * @param uint8_t - */ - void setDeliveryMode(uint8_t val) - { - // was the delivery mode already set - if(deliveryModeSent()) modifySize(-1); - - // set delivery mode - setDeliverModeSent(true); - _deliveryMode = Octet(val); - - // add new size - modifySize(1); - } - - /** - * set delivermode sent - * @param bool - */ - void setDeliverModeSent(bool b) - { - _bools1.set(4, b); - } - - /** - * return the message priority (0-9) - * @return uint8_t - */ - uint8_t priority() const - { - return _priority; - } - - /** - * Set priority - * @param uint8_t - */ - void setPriority(uint8_t val) - { - // was the priority already sent - if(prioritySent()) modifySize(-1); - - // set priority - setPrioritySent(true); - _priority = Octet(val); - - // add new size - modifySize(1); - } - - /** - * Set priority sent - * @param bool - */ - void setPrioritySent(bool b) - { - _bools1.set(3, b); - } - - /** - * return the application correlation identifier - * @return string - */ - const std::string& correlationID() const - { - return _correlationID; - } - - /** - * set correlation ID - * @param string - */ - void setCorrelationID(std::string &s) - { - // was the correlation ID sent - if(correlationIDSent()) modifySize(-_correlationID.size()); - - // set new correlation ID - setCorrelationIDSent(true); - _correlationID = ShortString(s); - - // add new size - modifySize(_correlationID.size()); - } - - /** - * Set correlationIDSent - * @param bool - */ - void setCorrelationIDSent(bool b) - { - _bools1.set(2, b); - } - - /** - * return the address to reply to - * @return string - */ - const std::string& replyTo() const - { - return _replyTo; - } - - /** - * Set reply to - * @param string - */ - void setReplyTo(std::string &s) - { - // was replyTo set? - if(replyToSent()) modifySize(-_replyTo.size()); - - // add new replyTo - setReplyToSent(true); - _replyTo = ShortString(s); - - modifySize(_replyTo.size()); - } - - /** - * set reply to sent - * @param bool - */ - void setReplyToSent(bool b) - { - _bools1.set(1, b); - } - - /** - * return the message expiration identifier - * @return string - */ - const std::string& expiration() const - { - return _expiration; - } - - /** - * Set expiration - * @param string - */ - void setExpiration(std::string &s) - { - // was expiration set? - if(expirationSent()) modifySize(-_expiration.size()); - - // set expiration - setExpirationSent(true); - _expiration = ShortString(s); - - // add new size - modifySize(_expiration.size()); - } - - /** - * set expiration sent - * @param bool - */ - void setExpirationSent(bool b) - { - _bools1.set(0, b); - } - - /** - * return the application message identifier - * @return string - */ - const std::string& messageID() const - { - return _messageID; - } - - /** - * set message ID - * @param string - */ - void setMessageID(std::string &s) - { - // was message ID sent? - if(messageIDSent()) modifySize(-_messageID.size()); - - // set messageID - setMessageIDSent(true); - _messageID = ShortString(s); - - // add size - modifySize(_messageID.size()); - } - - /** - * set messageID sent - * @param bool - */ - void setMessageIDSent(bool b) - { - _bools2.set(7, b); - } - - /** - * return the message timestamp - * @return uint64_t - */ - Timestamp timestamp() const - { - return _timestamp; - } - - /** - * set timestamp - * @param uint64_t - */ - void setTimestamp(uint64_t val) - { - // was timestamp sent? - if(timestampSent()) modifySize(-_timestamp.size()); - - // set timestamp - setTimestampSent(true); - _timestamp = Timestamp(val); - - // add new size - modifySize(_timestamp.size()); - } - - /** - * set timestamp sent - * @param bool - */ - void setTimestampSent(bool b) - { - _bools2.set(6, b); - } - - /** - * return the message type name - * @return string - */ - const std::string& typeName() const - { - return _typeName; - } - - /** - * set typename - * @param string - */ - void setTypeName(std::string &s) - { - // was typename sent? - if(typeNameSent()) modifySize(-_typeName.size()); - - // add typename - setTypeNameSent(true); - _typeName = ShortString(s); - - // add new size - modifySize(_typeName.size()); - } - - /** - * set typename sent - * @param bool - */ - void setTypeNameSent(bool b) - { - _bools2.set(5, b); - } - - /** - * return the creating user id - * @return string - */ - const std::string& userID() const - { - return _userID; - } - - /** - * set User ID - * @param string - */ - void setUserID(std::string &s) - { - // was user id sent? - if(userIDSent()) modifySize(-_userID.size()); - - // set new userID - setUserIDSent(true); - _userID = ShortString(s); - - // add size - modifySize(_userID.size()); - } - - /** - * set user id sent - * @param bool - */ - void setUserIDSent(bool b) - { - _bools2.set(4, b); - } - - /** - * return the application id - * @return string - */ - const std::string& appID() const - { - return _appID; - } - - /** - * set appID - * @param string - */ - void setAppID(std::string &s) - { - // was app id sent? - if(appIDSent()) modifySize(-_appID.size()); - - // add new app id - setAppIDSent(true); - _appID = ShortString(s); - - // add size - modifySize(_appID.size()); - } - - /** - * set app id sent - * @param bool - */ - void setAppIDSent(bool b) - { - _bools2.set(3, b); - } - - /** - * Return whether a field was sent - * @return bool - */ - bool expirationSent() const { return _bools1.get(0); } - bool replyToSent() const { return _bools1.get(1); } - bool correlationIDSent() const { return _bools1.get(2); } - bool prioritySent() const { return _bools1.get(3); } - bool deliveryModeSent() const { return _bools1.get(4); } - bool headersSent() const { return _bools1.get(5); } - bool contentEncodingSent() const { return _bools1.get(6); } - bool contentTypeSent() const { return _bools1.get(7); } - bool clusterIDSent() const { return _bools2.get(2); } - bool appIDSent() const { return _bools2.get(3); } - bool userIDSent() const { return _bools2.get(4); } - bool typeNameSent() const { return _bools2.get(5); } - bool timestampSent() const { return _bools2.get(6); } - bool messageIDSent() const { return _bools2.get(7); } - }; /** diff --git a/src/bodyframe.h b/src/bodyframe.h index ecd143b..811a278 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -18,9 +18,15 @@ private: /** * Payload of the frame * Payload can be any number of octets - * @var vector + * @var const char * */ - std::string _payload; + const char *_payload; + + /** + * Size of the payload + * @var uint64_t + */ + uint32_t _size; protected: /** @@ -34,7 +40,7 @@ protected: ExtFrame::fill(buffer); // add payload to buffer - buffer.add(_payload); + buffer.add(_payload, _size); } public: @@ -43,10 +49,12 @@ public: * * @param channel channel identifier * @param payload payload of the body + * @param size size of the payload */ - BodyFrame(uint16_t channel, const std::string &payload) : - ExtFrame(channel, payload.size()), - _payload(payload) + BodyFrame(uint16_t channel, const char *payload, uint32_t size) : + ExtFrame(channel, size), + _payload(payload), + _size(size) {} /** @@ -57,7 +65,8 @@ public: */ BodyFrame(ReceivedFrame& frame) : ExtFrame(frame), - _payload(frame.nextData(frame.payloadSize()), frame.payloadSize()) + _payload(frame.nextData(frame.payloadSize())), + _size(frame.payloadSize()) {} /** @@ -76,9 +85,9 @@ public: /** * Return the payload of the body - * @return vector + * @return const char * */ - const std::string& payload() const + const char *payload() const { return _payload; } diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 8af4389..62865e2 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -21,6 +21,9 @@ #include "queueunbindframe.h" #include "queuepurgeframe.h" #include "queuedeleteframe.h" +#include "basicpublishframe.h" +#include "basicheaderframe.h" +#include "bodyframe.h" #include @@ -155,7 +158,6 @@ bool ChannelImpl::commitTransaction() */ bool ChannelImpl::rollbackTransaction() { - std::cout << "send rollback frame" << std::endl; // must be connected if (!connected()) return false; @@ -365,7 +367,59 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags) // done return true; } + +/** + * Publish a message to an exchange + * + * The following flags can be used + * + * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method + * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method + * + * @todo implement to onReturned() method + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param flags optional flags (see above) + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + */ +bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) +{ + // @todo prevent crash when connection is destructed + // send the publish frame + send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate)); + + // send header + send(BasicHeaderFrame(_id, envelope)); + + // the max payload size is the max frame size minus the bytes for headers and trailer + uint32_t maxpayload = _connection->_implementation.maxPayload(); + uint32_t bytessent = 0; + + // the buffer + const char *data = envelope.body(); + uint32_t bytesleft = envelope.bodySize(); + + // split up the body in multiple frames depending on the max frame size + while (bytesleft > 0) + { + // size of this chunk + uint32_t chunksize = std::min(maxpayload, bytesleft); + + // send out a body frame + send(BodyFrame(_id, data + bytessent, chunksize)); + + // update counters + bytessent += chunksize; + bytesleft -= chunksize; + } + + // done + return true; +} /** * Send a frame over the channel diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 73d02b8..0e56fa2 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -196,13 +196,13 @@ void ConnectionImpl::setConnected() while (!_queue.empty()) { // get the next message - std::string message(_queue.front()); - + OutBuffer buffer(std::move(_queue.front())); + // remove it from the queue _queue.pop(); // send it - _handler->onData(_parent, message.data(), message.size()); + _handler->onData(_parent, buffer.data(), buffer.size()); // leap out if the connection was destructed if (!monitor.valid()) return; @@ -223,13 +223,13 @@ size_t ConnectionImpl::send(const Frame &frame) frame.fill(buffer); // append an end of frame byte (but not when still negotiating the protocol) - if (_state != state_protocol) buffer.add((uint8_t)206); + if (frame.needsSeparator()) buffer.add((uint8_t)206); // are we still setting up the connection? if ((_state == state_protocol || _state == state_handshake) && !frame.partOfHandshake()) { // the connection is still being set up, so we need to delay the message sending - _queue.push(std::string(buffer.data(), buffer.size())); + _queue.push(std::move(buffer)); } else { diff --git a/src/extframe.cpp b/src/extframe.cpp new file mode 100644 index 0000000..7d215b3 --- /dev/null +++ b/src/extframe.cpp @@ -0,0 +1,33 @@ +/** + * ExtFrame.cpp + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" +#include "exception.h" +#include "protocolexception.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ +bool ExtFrame::process(ConnectionImpl *connection) +{ + // this is an exception + throw ProtocolException("unimplemented frame type " + std::to_string(type())); + + // unreachable + return false; +} + +/** + * End of namespace + */ +} + diff --git a/src/extframe.h b/src/extframe.h index 56004c0..c37d4ed 100644 --- a/src/extframe.h +++ b/src/extframe.h @@ -131,6 +131,14 @@ public: * Get the message type */ virtual uint8_t type() const = 0; + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection); + }; /** diff --git a/src/frame.cpp b/src/frame.cpp new file mode 100644 index 0000000..e7cc89f --- /dev/null +++ b/src/frame.cpp @@ -0,0 +1,33 @@ +/** + * Frame.cpp + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" +#include "exception.h" +#include "protocolexception.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ +bool Frame::process(ConnectionImpl *connection) +{ + // this is an exception + throw ProtocolException("unimplemented frame"); + + // unreachable + return false; +} + +/** + * End of namespace + */ +} + diff --git a/src/frame.h b/src/frame.h index 1966e98..90298a3 100644 --- a/src/frame.h +++ b/src/frame.h @@ -46,21 +46,20 @@ public: * Is this a frame that is part of the connection setup? * @return bool */ - virtual bool partOfHandshake() const - { - return false; - } + virtual bool partOfHandshake() const { return false; } + + /** + * Does this frame need an end-of-frame seperator? + * @return bool + */ + virtual bool needsSeparator() const { return true; } /** * Process the frame * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) - { - // no process was implemented - return false; - } + virtual bool process(ConnectionImpl *connection); }; /** diff --git a/src/headerframe.h b/src/headerframe.h index b6fa0e2..46c4997 100644 --- a/src/headerframe.h +++ b/src/headerframe.h @@ -35,6 +35,7 @@ protected: */ virtual void fill(OutBuffer& buffer) const { + // call base ExtFrame::fill(buffer); // add type diff --git a/src/methodframe.cpp b/src/methodframe.cpp new file mode 100644 index 0000000..6752658 --- /dev/null +++ b/src/methodframe.cpp @@ -0,0 +1,33 @@ +/** + * MethodFrame.cpp + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" +#include "exception.h" +#include "protocolexception.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ +bool MethodFrame::process(ConnectionImpl *connection) +{ + // this is an exception + throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID())); + + // unreachable + return false; +} + +/** + * End of namespace + */ +} + diff --git a/src/methodframe.h b/src/methodframe.h index ee888aa..029ed04 100644 --- a/src/methodframe.h +++ b/src/methodframe.h @@ -69,6 +69,13 @@ public: * @return uint16_t */ virtual uint16_t methodID() const = 0; + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection); }; /** diff --git a/src/protocolheaderframe.h b/src/protocolheaderframe.h index 1d89847..20fcdd5 100644 --- a/src/protocolheaderframe.h +++ b/src/protocolheaderframe.h @@ -133,6 +133,14 @@ public: return true; } + /** + * Does this frame need an end-of-frame seperator? + * @return bool + */ + virtual bool needsSeparator() const override + { + return false; + } }; /** diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index ed825a5..c55ad29 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -96,10 +96,16 @@ ReceivedFrame::ReceivedFrame(const char *buffer, uint32_t size, uint32_t max) : if (max > 0 && _payloadSize > max - 8) throw ProtocolException("frame size exceeded"); // check if the buffer is big enough to contain all data - if (size >= _payloadSize + 8) return; - - // frame is not yet valid - _type = _channel = _payloadSize = 0; + if (size >= _payloadSize + 8) + { + // buffer is big enough, check for a valid end-of-frame marker + if ((int)buffer[_payloadSize+7] != -50) throw ProtocolException("invalid end of frame marker"); + } + else + { + // frame is not yet valid + _type = _channel = _payloadSize = 0; + } } /** @@ -290,8 +296,6 @@ const char * ReceivedFrame::nextData(uint32_t size) */ bool ReceivedFrame::process(ConnectionImpl *connection) { - std::cout << "_type = " << (int)_type << std::endl; - // check the type switch (_type) { @@ -300,8 +304,10 @@ bool ReceivedFrame::process(ConnectionImpl *connection) case 3: return BodyFrame(*this).process(connection); case 4: return HeartbeatFrame(*this).process(connection); case 8: return HeartbeatFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized frame type " + std::to_string(_type)); } /** @@ -314,8 +320,6 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection) // read the class id from the method uint16_t classID = nextUint16(); - std::cout << "classID = " << (int)classID << std::endl; - // construct frame based on method id switch (classID) { @@ -325,8 +329,10 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection) case 50: return processQueueFrame(connection); case 60: return processBasicFrame(connection); case 90: return processTransactionFrame(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized method frame class " + std::to_string(classID)); } /** @@ -339,8 +345,6 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection) // read the method id from the method uint16_t methodID = nextUint16(); - std::cout << "methodID = " << (int)methodID << std::endl; - // construct frame based on method id switch (methodID) { @@ -354,8 +358,10 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection) case 41: return ConnectionOpenOKFrame(*this).process(connection); case 50: return ConnectionCloseFrame(*this).process(connection); case 51: return ConnectionCloseOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized connection frame method " + std::to_string(methodID)); } /** @@ -368,8 +374,6 @@ bool ReceivedFrame::processChannelFrame(ConnectionImpl *connection) // read the method id from the method uint16_t methodID = nextUint16(); - std::cout << "methodID = " << methodID << std::endl; - // construct frame based on method id switch (methodID) { @@ -379,8 +383,10 @@ bool ReceivedFrame::processChannelFrame(ConnectionImpl *connection) case 21: return ChannelFlowOKFrame(*this).process(connection); case 40: return ChannelCloseFrame(*this).process(connection); case 41: return ChannelCloseOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized channel frame method " + std::to_string(methodID)); } /** @@ -393,8 +399,6 @@ bool ReceivedFrame::processExchangeFrame(ConnectionImpl *connection) // read the method id from the method uint16_t methodID = nextUint16(); - std::cout << "methodID = " << (int)methodID << std::endl; - // construct frame based on method id switch(methodID) { @@ -409,8 +413,10 @@ bool ReceivedFrame::processExchangeFrame(ConnectionImpl *connection) // has method ID 51, instead of (the expected) 41. This is tested // and it really has ID 51. case 51: return ExchangeUnbindOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized exchange frame method " + std::to_string(methodID)); } /** @@ -436,8 +442,10 @@ bool ReceivedFrame::processQueueFrame(ConnectionImpl *connection) case 41: return QueueDeleteOKFrame(*this).process(connection); case 50: return QueueUnbindFrame(*this).process(connection); case 51: return QueueUnbindOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized queue frame method " + std::to_string(methodID)); } /** @@ -470,8 +478,10 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection) case 100: return BasicRecoverAsyncFrame(*this).process(connection); case 110: return BasicRecoverFrame(*this).process(connection); case 111: return BasicRecoverOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized basic frame method " + std::to_string(methodID)); } /** @@ -493,8 +503,10 @@ bool ReceivedFrame::processTransactionFrame(ConnectionImpl *connection) case 21: return TransactionCommitOKFrame(*this).process(connection); case 30: return TransactionRollbackFrame(*this).process(connection); case 31: return TransactionRollbackOKFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized transaction frame method " + std::to_string(methodID)); } /** @@ -511,8 +523,10 @@ bool ReceivedFrame::processHeaderFrame(ConnectionImpl *connection) switch (classID) { case 60: return BasicHeaderFrame(*this).process(connection); - default: return false; } + + // this is a problem + throw ProtocolException("unrecognized header frame class " + std::to_string(classID)); } /** diff --git a/src/table.cpp b/src/table.cpp index 3d28491..ba41983 100644 --- a/src/table.cpp +++ b/src/table.cpp @@ -48,6 +48,47 @@ Table::Table(const Table &table) } } +/** + * Assignment operator + * @param table + * @return Table + */ +Table &Table::operator=(const Table &table) +{ + // skip self assignment + if (this == &table) return *this; + + // empty current fields + _fields.clear(); + + // loop through the table records + for (auto iter = table._fields.begin(); iter != table._fields.end(); iter++) + { + // add the field + _fields[iter->first] = std::shared_ptr(iter->second->clone()); + } + + // done + return *this; +} + +/** + * Move assignment operator + * @param table + * @return Table + */ +Table &Table::operator=(Table &&table) +{ + // skip self assignment + if (this == &table) return *this; + + // copy fields + _fields = std::move(table._fields); + + // done + return *this; +} + /** * Get a field *