From b9caf0199d1ec7c79178e4d7d4973ae441424eac Mon Sep 17 00:00:00 2001 From: Martijn Otto Date: Tue, 28 Apr 2015 10:58:49 +0200 Subject: [PATCH] Received messages are now moved into the callback instead of provided as a const reference --- include/callbacks.h | 2 +- include/deferredconsumer.h | 16 +++---- include/envelope.h | 95 ++++++++++++++++++++++++++++++++++++++ include/message.h | 71 ++++++++++++++++++++++++++-- src/consumedmessage.h | 6 +-- src/deferredget.cpp | 10 ++-- src/messageimpl.h | 66 +++++++++++--------------- 7 files changed, 205 insertions(+), 61 deletions(-) diff --git a/include/callbacks.h b/include/callbacks.h index 608f5f0..e470515 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -21,7 +21,7 @@ using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; using EmptyCallback = std::function; -using MessageCallback = std::function; +using MessageCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; using SizeCallback = std::function; diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index 659120c..d867018 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -24,7 +24,7 @@ private: ChannelImpl *_channel; /** - * Callback to execute when a message arrives + * Callback to execute when consumption has started * @var ConsumeCallback */ ConsumeCallback _consumeCallback; @@ -49,12 +49,12 @@ private: */ friend class ChannelImpl; friend class ConsumedMessage; - + public: /** * Protected constructor that can only be called * from within the channel implementation - * + * * Note: this constructor _should_ be protected, but because make_shared * will then not work, we have decided to make it public after all, * because the work-around would result in not-so-easy-to-read code. @@ -62,7 +62,7 @@ public: * @param channel the channel implementation * @param failed are we already failed? */ - DeferredConsumer(ChannelImpl *channel, bool failed = false) : + DeferredConsumer(ChannelImpl *channel, bool failed = false) : Deferred(failed), _channel(channel) {} public: @@ -74,7 +74,7 @@ public: { // store the callback _consumeCallback = callback; - + // allow chaining return *this; } @@ -87,7 +87,7 @@ public: { // call base Deferred::onSuccess(callback); - + // allow chaining return *this; } @@ -101,7 +101,7 @@ public: { // store callback _messageCallback = callback; - + // allow chaining return *this; } @@ -115,7 +115,7 @@ public: { // store callback _messageCallback = callback; - + // allow chaining return *this; } diff --git a/include/envelope.h b/include/envelope.h index 48aef96..ae9780f 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -62,11 +62,106 @@ public: */ Envelope(std::string &&body) : MetaData(), _str(std::move(body)), _body(_str.data()), _bodySize(_str.size()) {} + /** + * Copy constructor + * + * @param envelope the envelope to copy + */ + Envelope(const Envelope &envelope) : + _str(envelope._body, envelope._bodySize), + _body(_str.data()), + _bodySize(_str.size()) + {} + + /** + * Move constructor + * + * @param envelope the envelope to move + */ + Envelope(Envelope &&envelope) : + _str(std::move(envelope._str)), + _body(_str.data()), + _bodySize(_str.size()) + { + // if the envelope we moved did not have allocation by string + // we are out of luck, and have to copy it ourselves :( + if (_str.empty()) + { + // assign the data from the other envelope + _str.assign(envelope._body, envelope._bodySize); + + // and set the correct pointer and size + _body = _str.data(); + _bodySize = _str.size(); + } + else + { + // we moved the other envelopes string + // which means their body pointer is now + // garbage (it no longer points to a valid + // address), so we need to clear it + envelope._body = nullptr; + envelope._bodySize = 0; + } + } + /** * Destructor */ virtual ~Envelope() {} + /** + * Assignment operator + * + * @param envelope the envelope to copy + * @return same object for chaining + */ + Envelope &operator=(const Envelope &envelope) + { + // copy the data from the envelope + _str.assign(envelope._body, envelope._bodySize); + + // set the data pointer and body size + _body = _str.data(); + _bodySize = _str.size(); + + // allow chaining + return *this; + } + + /** + * Move assignment operator + * + * @param envelope the envelope to move + * @return same object for chaining + */ + Envelope &operator=(Envelope &&envelope) + { + // was the string in the other envelop empty? + if (envelope._str.empty()) + { + // that's a shame, we have to make a full copy + _str.assign(envelope._body, envelope._bodySize); + } + else + { + // not empty, just move it + _str = std::move(envelope._str); + + // their string is now garbage so the + // pointer is also invalid + envelope._body = nullptr; + envelope._bodySize = 0; + } + + // we now have a valid string, set the body and + _body = _str.data(); + _bodySize = _str.size(); + + // allow chaining + return *this; + } + /** * Access to the full message data * @return buffer diff --git a/include/message.h b/include/message.h index 92157ff..0ac7f38 100644 --- a/include/message.h +++ b/include/message.h @@ -14,7 +14,7 @@ /** * Set up namespace */ -namespace AMQP { +namespace AMQP { /** * Class definition @@ -27,13 +27,13 @@ protected: * @var string */ std::string _exchange; - + /** * The routing key that was originally used * @var string */ std::string _routingKey; - + protected: /** * The constructor is protected to ensure that endusers can not @@ -44,13 +44,74 @@ protected: Message(const std::string &exchange, const std::string &routingKey) : Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey) {} - + public: + + /** + * Copy constructor + * + * @param message the message to copy + */ + Message(const Message &message) : + Envelope(message), + _exchange(message._exchange), + _routingKey(message._routingKey) + {} + + /** + * Move constructor + * + * @param message the message to move + */ + Message(Message &&message) : + Envelope(std::move(message)), + _exchange(std::move(message._exchange)), + _routingKey(std::move(message._routingKey)) + {} + /** * Destructor */ virtual ~Message() {} + /** + * Assignment operator + * + * @param message the message to copy + * @return same object for chaining + */ + Message &operator=(const Message &message) + { + // call the base assignment + Envelope::operator=(message); + + // move the exchange and routing key + _exchange = message._exchange; + _routingKey = message._routingKey; + + // allow chaining + return *this; + } + + /** + * Move assignment operator + * + * @param message the message to move + * @return same object for chaining + */ + Message &operator=(Message &&message) + { + // call the base assignment + Envelope::operator=(std::move(message)); + + // move the exchange and routing key + _exchange = std::move(message._exchange); + _routingKey = std::move(message._routingKey); + + // allow chaining + return *this; + } + /** * The exchange to which it was originally published * @var string @@ -59,7 +120,7 @@ public: { return _exchange; } - + /** * The routing key that was originally used * @var string diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 09c9c0e..85c451c 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -67,15 +67,15 @@ public: { return _consumerTag; } - + /** * Report to the handler * @param callback */ - void report(const MessageCallback &callback) const + void report(const MessageCallback &callback) { // send ourselves to the consumer - if (callback) callback(*this, _deliveryTag, _redelivered); + if (callback) callback(std::move(*this), _deliveryTag, _redelivered); } }; diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 9a93f6b..442961a 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -31,14 +31,14 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messageCoun // pointer is also captured, which ensures that 'this' is not destructed, all members stay // accessible, and that the onFinalize() function will only be called after the message // is reported (onFinalize() is called from the destructor of this DeferredGet object) - _channel->install("", [self, this](const Message &message, uint64_t deliveryTag, bool redelivered) { + _channel->install("", [self, this](Message &&message, uint64_t deliveryTag, bool redelivered) { // install a monitor to deal with the case that the channel is removed Monitor monitor(_channel); // call the callbacks - if (_messageCallback) _messageCallback(message, deliveryTag, redelivered); - + if (_messageCallback) _messageCallback(std::move(message), deliveryTag, redelivered); + // we can remove the callback now from the channel if (monitor.valid()) _channel->uninstall(""); }); @@ -46,7 +46,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messageCoun // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero) if (_sizeCallback) _sizeCallback(messageCount); - + // return next object return _next; } @@ -62,7 +62,7 @@ const std::shared_ptr &DeferredGet::reportSuccess() const // check if a callback was set if (_emptyCallback) _emptyCallback(); - + // return next object return _next; } diff --git a/src/messageimpl.h b/src/messageimpl.h index b4f1708..53bbced 100644 --- a/src/messageimpl.h +++ b/src/messageimpl.h @@ -17,19 +17,6 @@ namespace AMQP { */ class MessageImpl : public Message { -private: - /** - * How many bytes have been received? - * @var uint64_t - */ - uint64_t _received; - - /** - * Was the buffer allocated by us? - * @var bool - */ - bool _selfAllocated; - protected: /** * Constructor @@ -37,19 +24,14 @@ protected: * @param routingKey */ MessageImpl(const std::string &exchange, const std::string &routingKey) : - Message(exchange, routingKey), - _received(0), _selfAllocated(false) + Message(exchange, routingKey) {} public: /** * Destructor */ - virtual ~MessageImpl() - { - // clear up memory if it was self allocated - if (_selfAllocated) delete[] _body; - } + virtual ~MessageImpl() {} /** * Set the body size @@ -58,6 +40,12 @@ public: */ void setBodySize(uint64_t size) { + // safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword + // in which case casting the uint64_t to a size_t could result in truncation + // here we check whether the given size fits inside a size_t + if (std::numeric_limits::max() < size) throw std::runtime_error("message is too big for this system"); + + // store the new size _bodySize = size; } @@ -70,38 +58,38 @@ public: bool append(const char *buffer, uint64_t size) { // is this the only data, and also direct complete? - if (_received == 0 && size >= _bodySize) + if (_str.empty() && size >= _bodySize) { // we have everything _body = buffer; - _received = _bodySize; // done return true; } else { - // we're going to allocated memory, but that should be a size_t, not a uint64_t - size_t memory = static_cast(_bodySize); - - // prevent truncation - if (memory < _bodySize) throw std::runtime_error("message is too big for this system"); - - // it does not yet fit, do we have to allocate? - if (!_body) _body = new char[memory]; - _selfAllocated = true; + // it does not fit yet, do we have to allocate + if (!_body) + { + // allocate memory in the string + _str.reserve(static_cast(_bodySize)); - // prevent that size is too big - if (size > _bodySize - _received) size = _bodySize - _received; + // we now use the data buffer inside the string + _body = _str.data(); + } - // append data - memcpy(static_cast(const_cast(_body) + _received), buffer, static_cast(size)); + // safety-check: if the given size exceeds the given message body size + // we truncate it, this should never happen because it indicates a bug + // in the AMQP server implementation, should we report this? + size = std::min(size, _bodySize - _str.size()); - // we have more data now - _received += size; + // we can not safely append the data to the string, it + // will not exceed the reserved size so it is guaranteed + // not to change the data pointer, we can just leave that + _str.append(buffer, static_cast(size)); - // done - return _received >= _bodySize; + // if the string is filled with the given number of characters we are done now + return _str.size() >= _bodySize; } } };