From 00b81949d3736470362e9f5b5324b8e1973867bb Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Wed, 8 Mar 2017 13:32:51 +0100 Subject: [PATCH] the AMQP::Message and AMQP::Envelope objects can now longer be copied or moved, and the signatures for creating an envelope and publishing a message have been made much more strict. This could be a API breaking change, to the version number should be upped --- include/callbacks.h | 4 +- include/channel.h | 5 +- include/channelimpl.h | 2 +- include/classes.h | 2 +- include/deferredconsumer.h | 4 +- include/deferredconsumerbase.h | 4 +- include/deferredget.h | 4 +- include/envelope.h | 129 ++++-------------------------- include/message.h | 142 +++++++++++---------------------- include/metadata.h | 2 +- include/receivedframe.h | 2 +- src/basicheaderframe.h | 2 +- src/channelimpl.cpp | 3 +- src/consumedmessage.h | 10 +-- src/deferredconsumer.cpp | 6 +- src/deferredconsumerbase.cpp | 4 +- src/deferredget.cpp | 4 +- src/messageimpl.h | 104 ------------------------ src/receivedframe.cpp | 3 +- src/returnedmessage.h | 8 +- 20 files changed, 94 insertions(+), 350 deletions(-) delete mode 100644 src/messageimpl.h diff --git a/include/callbacks.h b/include/callbacks.h index 46179b4..a7c85b9 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -3,7 +3,7 @@ * * Class storing deferred callbacks of different type. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -41,7 +41,7 @@ using EmptyCallback = std::function; using BeginCallback = std::function; using HeaderCallback = std::function; using DataCallback = std::function; -using MessageCallback = std::function; +using MessageCallback = std::function; using CompleteCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; diff --git a/include/channel.h b/include/channel.h index 71b74ca..c75041d 100644 --- a/include/channel.h +++ b/include/channel.h @@ -1,7 +1,7 @@ /** * Class describing a (mid-level) AMQP channel implementation * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -344,10 +344,7 @@ public: * @param size size of the message */ bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message)); } - bool publish(const std::string &exchange, const std::string &routingKey, std::string &&message) { return _implementation->publish(exchange, routingKey, Envelope(std::move(message))); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/channelimpl.h b/include/channelimpl.h index 6c02163..22a0835 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -5,7 +5,7 @@ * that has a private constructor so that it can not be used from outside * the AMQP library * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** diff --git a/include/classes.h b/include/classes.h index eec0f8d..e89e22e 100644 --- a/include/classes.h +++ b/include/classes.h @@ -3,6 +3,7 @@ * * List of all declared classes * + * @copyright 2014 - 2017 Copernica BV */ /** @@ -31,7 +32,6 @@ class ConnectionImpl; class Exchange; class Frame; class Login; -class MessageImpl; class Monitor; class OutBuffer; class ReceivedFrame; diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index f38161b..d5dd161 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -3,7 +3,7 @@ * * Deferred callback for consumers * - * @copyright 2014 - 2016 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -46,7 +46,7 @@ private: * @param deliveryTag The delivery tag (for ack()ing) * @param redelivered Is this a redelivered message */ - virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const override; + virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override; /** * The channel implementation may call our diff --git a/include/deferredconsumerbase.h b/include/deferredconsumerbase.h index 0d3b379..5b3bbdb 100644 --- a/include/deferredconsumerbase.h +++ b/include/deferredconsumerbase.h @@ -4,7 +4,7 @@ * Base class for the deferred consumer and the * deferred get. * - * @copyright 2016 Copernica B.V. + * @copyright 2016 - 2017 Copernica B.V. */ /** @@ -85,7 +85,7 @@ private: * @param deliveryTag The delivery tag (for ack()ing) * @param redelivered Is this a redelivered message */ - virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const = 0; + virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0; /** * Frames may be processed diff --git a/include/deferredget.h b/include/deferredget.h index 35ec50b..4270d34 100644 --- a/include/deferredget.h +++ b/include/deferredget.h @@ -2,7 +2,7 @@ * DeferredGet.h * * @author Emiel Bruijntjes - * @copyright 2014 - 2016 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -62,7 +62,7 @@ private: * @param deliveryTag The delivery tag (for ack()ing) * @param redelivered Is this a redelivered message */ - virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const override; + virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override; /** * The channel implementation may call our diff --git a/include/envelope.h b/include/envelope.h index 54c4140..2f5de2b 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -4,7 +4,7 @@ * When you send or receive a message to the rabbitMQ server, it is encapsulated * in an envelope that contains additional meta information as well. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -29,23 +29,22 @@ class Envelope : public MetaData { protected: /** - * The body (only used when string object was passed to constructor - * @var std::string - */ - std::string _str; - - /** - * Pointer to the body data (the memory buffer is not managed by the AMQP - * library!) + * Pointer to the body data (the memory is not managed by the AMQP library!) * @var const char * */ - const char *_body; + char *_body; /** * Size of the data * @var uint64_t */ uint64_t _bodySize; + + /** + * Was the data allocated by this object? + * @var bool + */ + bool _allocated = false; public: /** @@ -57,120 +56,22 @@ public: * @param body * @param size */ - Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodySize(size) {} + Envelope(const char *body, uint64_t size) : MetaData(), _body((char *)body), _bodySize(size) {} /** - * Constructor based on a string - * @param body - */ - Envelope(const std::string &body) : MetaData(), _str(body), _body(_str.data()), _bodySize(_str.size()) {} - - /** - * Constructor based on a string - * @param body - */ - Envelope(std::string &&body) : MetaData(), _str(std::move(body)), _body(_str.data()), _bodySize(_str.size()) {} - - /** - * Copy constructor + * Disabled copy constructor * * @param envelope the envelope to copy */ - Envelope(const Envelope &envelope) : - MetaData(envelope), - _str(envelope._body, envelope._bodySize), - _body(_str.data()), - _bodySize(_str.size()) - {} - - /** - * Move constructor - * - * @param envelope the envelope to move - */ - Envelope(Envelope &&envelope) : - MetaData(std::move(envelope)), - _str(std::move(envelope._str)), - _body(_str.data()), - _bodySize(_str.size()) - { - // if the envelope we moved did not have allocation by string - // we are out of luck, and have to copy it ourselves :( - if (_str.empty()) - { - // assign the data from the other envelope - _str.assign(envelope._body, envelope._bodySize); - - // and set the correct pointer and size - _body = _str.data(); - _bodySize = _str.size(); - } - else - { - // we moved the other envelopes string - // which means their body pointer is now - // garbage (it no longer points to a valid - // address), so we need to clear it - envelope._body = nullptr; - envelope._bodySize = 0; - } - } + Envelope(const Envelope &envelope) = delete; /** * Destructor */ - virtual ~Envelope() {} - - /** - * Assignment operator - * - * @param envelope the envelope to copy - * @return same object for chaining - */ - Envelope &operator=(const Envelope &envelope) + virtual ~Envelope() { - // copy the data from the envelope - _str.assign(envelope._body, envelope._bodySize); - - // set the data pointer and body size - _body = _str.data(); - _bodySize = _str.size(); - - // allow chaining - return *this; - } - - /** - * Move assignment operator - * - * @param envelope the envelope to move - * @return same object for chaining - */ - Envelope &operator=(Envelope &&envelope) - { - // was the string in the other envelop empty? - if (envelope._str.empty()) - { - // that's a shame, we have to make a full copy - _str.assign(envelope._body, envelope._bodySize); - } - else - { - // not empty, just move it - _str = std::move(envelope._str); - - // their string is now garbage so the - // pointer is also invalid - envelope._body = nullptr; - envelope._bodySize = 0; - } - - // we now have a valid string, set the body and - _body = _str.data(); - _bodySize = _str.size(); - - // allow chaining - return *this; + // deallocate the data + if (_allocated) free(_body); } /** diff --git a/include/message.h b/include/message.h index 2dd1ef7..5ce4e68 100644 --- a/include/message.h +++ b/include/message.h @@ -7,7 +7,7 @@ * Message objects can not be constructed by end users, they are only constructed * by the AMQP library, and passed to user callbacks. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -48,8 +48,15 @@ protected: * The routing key that was originally used * @var string */ - std::string _routingKey; + std::string _routingkey; + + /** + * Number of bytes already filled + * @var size_t + */ + size_t _filled = 0; + /** * We are an open book to the consumer handler */ @@ -79,43 +86,41 @@ protected: */ bool append(const char *buffer, uint64_t size) { - // is this the only data, and also direct complete? - if (_str.empty() && size >= _bodySize) + // is the body already allocated? + if (_allocated) { - // we have everything - _body = buffer; - - // done - return true; + // prevent overflow + size = std::min(size, _bodySize - _filled); + + // append more data + memcpy(_body + _filled, buffer, size); + + // update filled data + _filled += size; + } + else if (size >= _bodySize) + { + // we do not have to combine multiple frames, so we can store + // the buffer pointer in the message + _body = (char *)buffer; } else { - // it does not fit yet, do we have to allocate - if (!_body) - { - // allocate memory in the string - _str.reserve(static_cast(_bodySize)); - - // we now use the data buffer inside the string - _body = _str.data(); - } - - // safety-check: if the given size exceeds the given message body size - // we truncate it, this should never happen because it indicates a bug - // in the AMQP server implementation, should we report this? - size = std::min(size, _bodySize - _str.size()); - - // we can not safely append the data to the string, it - // will not exceed the reserved size so it is guaranteed - // not to change the data pointer, we can just leave that - // @todo this is not always necessary; instead, we can refrain from - // allocating this buffer entirely and just insert it into the message - // directly. - _str.append(buffer, static_cast(size)); - - // if the string is filled with the given number of characters we are done now - return _str.size() >= _bodySize; + // allocate the buffer + _body = (char *)malloc(_bodySize); + + // remember that the buffer was allocated, so that the destructor can get rid of it + _allocated = true; + + // append more data + memcpy(_body, buffer, std::min(size, _bodySize)); + + // update filled data + _filled = std::min(size, _bodySize); } + + // check if we're done + return _filled >= _bodySize; } public: @@ -125,82 +130,28 @@ public: * @param exchange * @param routingKey */ - Message(const std::string &exchange, const std::string &routingKey) : - Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey) + Message(std::string exchange, std::string routingkey) : + Envelope(nullptr, 0), _exchange(std::move(exchange)), _routingkey(std::move(routingkey)) {} - /** - * Copy constructor - * + * Disabled copy constructor * @param message the message to copy */ - Message(const Message &message) : - Envelope(message), - _exchange(message._exchange), - _routingKey(message._routingKey) - {} - - /** - * Move constructor - * - * @param message the message to move - */ - Message(Message &&message) : - Envelope(std::move(message)), - _exchange(std::move(message._exchange)), - _routingKey(std::move(message._routingKey)) - {} + Message(const Message &message) = delete; /** * Destructor */ virtual ~Message() = default; - /** - * Assignment operator - * - * @param message the message to copy - * @return same object for chaining - */ - Message &operator=(const Message &message) - { - // call the base assignment - Envelope::operator=(message); - - // move the exchange and routing key - _exchange = message._exchange; - _routingKey = message._routingKey; - - // allow chaining - return *this; - } - - /** - * Move assignment operator - * - * @param message the message to move - * @return same object for chaining - */ - Message &operator=(Message &&message) - { - // call the base assignment - Envelope::operator=(std::move(message)); - - // move the exchange and routing key - _exchange = std::move(message._exchange); - _routingKey = std::move(message._routingKey); - - // allow chaining - return *this; - } - /** * The exchange to which it was originally published * @var string */ const std::string &exchange() const { + // expose member return _exchange; } @@ -208,9 +159,10 @@ public: * The routing key that was originally used * @var string */ - const std::string &routingKey() const + const std::string &routingkey() const { - return _routingKey; + // expose member + return _routingkey; } }; diff --git a/include/metadata.h b/include/metadata.h index fa3ed82..4b1a04e 100644 --- a/include/metadata.h +++ b/include/metadata.h @@ -4,7 +4,7 @@ * With every published message a set of meta data is passed to. This class * holds all that meta data. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** diff --git a/include/receivedframe.h b/include/receivedframe.h index acc9ae0..7c1e1d5 100644 --- a/include/receivedframe.h +++ b/include/receivedframe.h @@ -8,7 +8,7 @@ * This is a class that is used internally by the AMQP library. As a user * of this library, you normally do not have to instantiate it. * - * @documentation public + * @copyright 2014 - 2017 Copernica BV */ /** diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index 62d563e..5260621 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -1,7 +1,7 @@ /** * Class describing an AMQP basic header frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 305e238..2b980bb 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -3,13 +3,12 @@ * * Implementation for a channel * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ #include "includes.h" #include "basicdeliverframe.h" #include "basicgetokframe.h" #include "basicreturnframe.h" -#include "messageimpl.h" #include "consumedmessage.h" #include "returnedmessage.h" #include "channelopenframe.h" diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 85c451c..01f3bcc 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -1,7 +1,7 @@ /** * Base class for a message implementation * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -12,7 +12,7 @@ namespace AMQP { /** * Class definition */ -class ConsumedMessage : public MessageImpl +class ConsumedMessage : public Message { private: /** @@ -40,7 +40,7 @@ public: * @param frame */ ConsumedMessage(const BasicDeliverFrame &frame) : - MessageImpl(frame.exchange(), frame.routingKey()), + Message(frame.exchange(), frame.routingKey()), _consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) {} @@ -49,7 +49,7 @@ public: * @param frame */ ConsumedMessage(const BasicGetOKFrame &frame) : - MessageImpl(frame.exchange(), frame.routingKey()), + Message(frame.exchange(), frame.routingKey()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) {} @@ -75,7 +75,7 @@ public: void report(const MessageCallback &callback) { // send ourselves to the consumer - if (callback) callback(std::move(*this), _deliveryTag, _redelivered); + if (callback) callback(*this, _deliveryTag, _redelivered); } }; diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp index 3997da9..221d089 100644 --- a/src/deferredconsumer.cpp +++ b/src/deferredconsumer.cpp @@ -3,7 +3,7 @@ * * Implementation file for the DeferredConsumer class * - * @copyright 2014 - 2016 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ #include "includes.h" @@ -38,10 +38,10 @@ const std::shared_ptr &DeferredConsumer::reportSuccess(const std::stri * @param deliveryTag The delivery tag (for ack()ing) * @param redelivered Is this a redelivered message */ -void DeferredConsumer::announce(Message &&message, uint64_t deliveryTag, bool redelivered) const +void DeferredConsumer::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const { // simply execute the message callback - _messageCallback(std::move(message), deliveryTag, redelivered); + _messageCallback(message, deliveryTag, redelivered); } /** diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp index 6c01f49..240842b 100644 --- a/src/deferredconsumerbase.cpp +++ b/src/deferredconsumerbase.cpp @@ -4,7 +4,7 @@ * Base class for the deferred consumer and the * deferred get. * - * @copyright 2016 Copernica B.V. + * @copyright 2016 - 2017 Copernica B.V. */ /** @@ -120,7 +120,7 @@ void DeferredConsumerBase::complete() if (_message) { // announce the message - announce(std::move(*_message), _deliveryTag, _redelivered); + announce(*_message, _deliveryTag, _redelivered); // and destroy it _message.reset(); diff --git a/src/deferredget.cpp b/src/deferredget.cpp index b75e775..dbd838b 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -4,7 +4,7 @@ * Implementation of the DeferredGet call * * @author Emiel Bruijntjes - * @copyright 2014 - 2016 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -63,7 +63,7 @@ const std::shared_ptr &DeferredGet::reportSuccess() const * @param deliveryTag The delivery tag (for ack()ing) * @param redelivered Is this a redelivered message */ -void DeferredGet::announce(Message &&message, uint64_t deliveryTag, bool redelivered) const +void DeferredGet::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const { // monitor the channel Monitor monitor{ _channel }; diff --git a/src/messageimpl.h b/src/messageimpl.h deleted file mode 100644 index 1a8fb03..0000000 --- a/src/messageimpl.h +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Base class for a message implementation - * - * This is the base class for either the returned message or the consumed - * message. - * - * @copyright 2014 Copernica BV - */ - -/** - * Namespace - */ -namespace AMQP { - -/** - * Class definition - */ -class MessageImpl : public Message -{ -protected: - /** - * Constructor - * @param exchange - * @param routingKey - */ - MessageImpl(const std::string &exchange, const std::string &routingKey) : - Message(exchange, routingKey) - {} - -public: - /** - * Destructor - */ - virtual ~MessageImpl() {} - - /** - * Set the body size - * This field is set when the header is received - * @param uint64_t - */ - void setBodySize(uint64_t size) - { - // safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword - // in which case casting the uint64_t to a size_t could result in truncation - // here we check whether the given size fits inside a size_t - if (std::numeric_limits::max() < size) throw std::runtime_error("message is too big for this system"); - - // store the new size - _bodySize = size; - } - - /** - * Append data - * @param buffer incoming data - * @param size size of the data - * @return bool true if the message is now complete - */ - bool append(const char *buffer, uint64_t size) - { - // is this the only data, and also direct complete? - if (_str.empty() && size >= _bodySize) - { - // we have everything - _body = buffer; - - // done - return true; - } - else - { - // it does not fit yet, do we have to allocate - if (!_body) - { - // allocate memory in the string - _str.reserve(static_cast(_bodySize)); - - // we now use the data buffer inside the string - _body = _str.data(); - } - - // safety-check: if the given size exceeds the given message body size - // we truncate it, this should never happen because it indicates a bug - // in the AMQP server implementation, should we report this? - size = std::min(size, _bodySize - _str.size()); - - // we can not safely append the data to the string, it - // will not exceed the reserved size so it is guaranteed - // not to change the data pointer, we can just leave that - // @todo this is not always necessary; instead, we can refrain from - // allocating this buffer entirely and just insert it into the message - // directly. - _str.append(buffer, static_cast(size)); - - // if the string is filled with the given number of characters we are done now - return _str.size() >= _bodySize; - } - } -}; - -/** - * End of namespace - */ -} - diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index 07f0c95..31406f4 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -3,7 +3,7 @@ * * Implementation of the ReceivedFrame class * - * @documentation private + * @copyright 2014 - 2017 Copernica BV */ #include "includes.h" #include "heartbeatframe.h" @@ -65,7 +65,6 @@ #include "transactioncommitokframe.h" #include "transactionrollbackframe.h" #include "transactionrollbackokframe.h" -#include "messageimpl.h" #include "consumedmessage.h" #include "bodyframe.h" #include "basicheaderframe.h" diff --git a/src/returnedmessage.h b/src/returnedmessage.h index 776d72d..6a471a4 100644 --- a/src/returnedmessage.h +++ b/src/returnedmessage.h @@ -5,7 +5,7 @@ * was published with the immediate or mandatory flag, and could not be * delivered according to those rules. * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2017 Copernica BV */ /** @@ -16,7 +16,7 @@ namespace AMQP { /** * Class definition */ -class ReturnedMessage : public MessageImpl +class ReturnedMessage : public Message { private: /** @@ -38,13 +38,13 @@ public: * @param frame */ ReturnedMessage(const BasicReturnFrame &frame) : - MessageImpl(frame.exchange(), frame.routingKey()), + Message(frame.exchange(), frame.routingKey()), _replyCode(frame.replyCode()), _replyText(frame.replyText()) {} /** * Destructor */ - virtual ~ReturnedMessage() {} + virtual ~ReturnedMessage() = default; }; /**