Received messages are now moved into the callback instead of provided as a const reference

This commit is contained in:
Martijn Otto 2015-04-28 10:58:49 +02:00
parent ad3b95741e
commit b9caf0199d
7 changed files with 205 additions and 61 deletions

View File

@ -21,7 +21,7 @@ using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using EmptyCallback = std::function<void()>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using MessageCallback = std::function<void(Message &&message, uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using SizeCallback = std::function<void(uint32_t messagecount)>;

View File

@ -24,7 +24,7 @@ private:
ChannelImpl *_channel;
/**
* Callback to execute when a message arrives
* Callback to execute when consumption has started
* @var ConsumeCallback
*/
ConsumeCallback _consumeCallback;
@ -49,12 +49,12 @@ private:
*/
friend class ChannelImpl;
friend class ConsumedMessage;
public:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
@ -62,7 +62,7 @@ public:
* @param channel the channel implementation
* @param failed are we already failed?
*/
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
Deferred(failed), _channel(channel) {}
public:
@ -74,7 +74,7 @@ public:
{
// store the callback
_consumeCallback = callback;
// allow chaining
return *this;
}
@ -87,7 +87,7 @@ public:
{
// call base
Deferred::onSuccess(callback);
// allow chaining
return *this;
}
@ -101,7 +101,7 @@ public:
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
@ -115,7 +115,7 @@ public:
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}

View File

@ -62,11 +62,106 @@ public:
*/
Envelope(std::string &&body) : MetaData(), _str(std::move(body)), _body(_str.data()), _bodySize(_str.size()) {}
/**
* Copy constructor
*
* @param envelope the envelope to copy
*/
Envelope(const Envelope &envelope) :
_str(envelope._body, envelope._bodySize),
_body(_str.data()),
_bodySize(_str.size())
{}
/**
* Move constructor
*
* @param envelope the envelope to move
*/
Envelope(Envelope &&envelope) :
_str(std::move(envelope._str)),
_body(_str.data()),
_bodySize(_str.size())
{
// if the envelope we moved did not have allocation by string
// we are out of luck, and have to copy it ourselves :(
if (_str.empty())
{
// assign the data from the other envelope
_str.assign(envelope._body, envelope._bodySize);
// and set the correct pointer and size
_body = _str.data();
_bodySize = _str.size();
}
else
{
// we moved the other envelopes string
// which means their body pointer is now
// garbage (it no longer points to a valid
// address), so we need to clear it
envelope._body = nullptr;
envelope._bodySize = 0;
}
}
/**
* Destructor
*/
virtual ~Envelope() {}
/**
* Assignment operator
*
* @param envelope the envelope to copy
* @return same object for chaining
*/
Envelope &operator=(const Envelope &envelope)
{
// copy the data from the envelope
_str.assign(envelope._body, envelope._bodySize);
// set the data pointer and body size
_body = _str.data();
_bodySize = _str.size();
// allow chaining
return *this;
}
/**
* Move assignment operator
*
* @param envelope the envelope to move
* @return same object for chaining
*/
Envelope &operator=(Envelope &&envelope)
{
// was the string in the other envelop empty?
if (envelope._str.empty())
{
// that's a shame, we have to make a full copy
_str.assign(envelope._body, envelope._bodySize);
}
else
{
// not empty, just move it
_str = std::move(envelope._str);
// their string is now garbage so the
// pointer is also invalid
envelope._body = nullptr;
envelope._bodySize = 0;
}
// we now have a valid string, set the body and
_body = _str.data();
_bodySize = _str.size();
// allow chaining
return *this;
}
/**
* Access to the full message data
* @return buffer

View File

@ -14,7 +14,7 @@
/**
* Set up namespace
*/
namespace AMQP {
namespace AMQP {
/**
* Class definition
@ -27,13 +27,13 @@ protected:
* @var string
*/
std::string _exchange;
/**
* The routing key that was originally used
* @var string
*/
std::string _routingKey;
protected:
/**
* The constructor is protected to ensure that endusers can not
@ -44,13 +44,74 @@ protected:
Message(const std::string &exchange, const std::string &routingKey) :
Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey)
{}
public:
/**
* Copy constructor
*
* @param message the message to copy
*/
Message(const Message &message) :
Envelope(message),
_exchange(message._exchange),
_routingKey(message._routingKey)
{}
/**
* Move constructor
*
* @param message the message to move
*/
Message(Message &&message) :
Envelope(std::move(message)),
_exchange(std::move(message._exchange)),
_routingKey(std::move(message._routingKey))
{}
/**
* Destructor
*/
virtual ~Message() {}
/**
* Assignment operator
*
* @param message the message to copy
* @return same object for chaining
*/
Message &operator=(const Message &message)
{
// call the base assignment
Envelope::operator=(message);
// move the exchange and routing key
_exchange = message._exchange;
_routingKey = message._routingKey;
// allow chaining
return *this;
}
/**
* Move assignment operator
*
* @param message the message to move
* @return same object for chaining
*/
Message &operator=(Message &&message)
{
// call the base assignment
Envelope::operator=(std::move(message));
// move the exchange and routing key
_exchange = std::move(message._exchange);
_routingKey = std::move(message._routingKey);
// allow chaining
return *this;
}
/**
* The exchange to which it was originally published
* @var string
@ -59,7 +120,7 @@ public:
{
return _exchange;
}
/**
* The routing key that was originally used
* @var string

View File

@ -67,15 +67,15 @@ public:
{
return _consumerTag;
}
/**
* Report to the handler
* @param callback
*/
void report(const MessageCallback &callback) const
void report(const MessageCallback &callback)
{
// send ourselves to the consumer
if (callback) callback(*this, _deliveryTag, _redelivered);
if (callback) callback(std::move(*this), _deliveryTag, _redelivered);
}
};

View File

@ -31,14 +31,14 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messageCoun
// pointer is also captured, which ensures that 'this' is not destructed, all members stay
// accessible, and that the onFinalize() function will only be called after the message
// is reported (onFinalize() is called from the destructor of this DeferredGet object)
_channel->install("", [self, this](const Message &message, uint64_t deliveryTag, bool redelivered) {
_channel->install("", [self, this](Message &&message, uint64_t deliveryTag, bool redelivered) {
// install a monitor to deal with the case that the channel is removed
Monitor monitor(_channel);
// call the callbacks
if (_messageCallback) _messageCallback(message, deliveryTag, redelivered);
if (_messageCallback) _messageCallback(std::move(message), deliveryTag, redelivered);
// we can remove the callback now from the channel
if (monitor.valid()) _channel->uninstall("");
});
@ -46,7 +46,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messageCoun
// report the size (note that this is the size _minus_ the message that is retrieved
// (and for which the callback will be called later), so it could be zero)
if (_sizeCallback) _sizeCallback(messageCount);
// return next object
return _next;
}
@ -62,7 +62,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
// check if a callback was set
if (_emptyCallback) _emptyCallback();
// return next object
return _next;
}

View File

@ -17,19 +17,6 @@ namespace AMQP {
*/
class MessageImpl : public Message
{
private:
/**
* How many bytes have been received?
* @var uint64_t
*/
uint64_t _received;
/**
* Was the buffer allocated by us?
* @var bool
*/
bool _selfAllocated;
protected:
/**
* Constructor
@ -37,19 +24,14 @@ protected:
* @param routingKey
*/
MessageImpl(const std::string &exchange, const std::string &routingKey) :
Message(exchange, routingKey),
_received(0), _selfAllocated(false)
Message(exchange, routingKey)
{}
public:
/**
* Destructor
*/
virtual ~MessageImpl()
{
// clear up memory if it was self allocated
if (_selfAllocated) delete[] _body;
}
virtual ~MessageImpl() {}
/**
* Set the body size
@ -58,6 +40,12 @@ public:
*/
void setBodySize(uint64_t size)
{
// safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword
// in which case casting the uint64_t to a size_t could result in truncation
// here we check whether the given size fits inside a size_t
if (std::numeric_limits<size_t>::max() < size) throw std::runtime_error("message is too big for this system");
// store the new size
_bodySize = size;
}
@ -70,38 +58,38 @@ public:
bool append(const char *buffer, uint64_t size)
{
// is this the only data, and also direct complete?
if (_received == 0 && size >= _bodySize)
if (_str.empty() && size >= _bodySize)
{
// we have everything
_body = buffer;
_received = _bodySize;
// done
return true;
}
else
{
// we're going to allocated memory, but that should be a size_t, not a uint64_t
size_t memory = static_cast<size_t>(_bodySize);
// prevent truncation
if (memory < _bodySize) throw std::runtime_error("message is too big for this system");
// it does not yet fit, do we have to allocate?
if (!_body) _body = new char[memory];
_selfAllocated = true;
// it does not fit yet, do we have to allocate
if (!_body)
{
// allocate memory in the string
_str.reserve(static_cast<size_t>(_bodySize));
// prevent that size is too big
if (size > _bodySize - _received) size = _bodySize - _received;
// we now use the data buffer inside the string
_body = _str.data();
}
// append data
memcpy(static_cast<void*>(const_cast<char*>(_body) + _received), buffer, static_cast<size_t>(size));
// safety-check: if the given size exceeds the given message body size
// we truncate it, this should never happen because it indicates a bug
// in the AMQP server implementation, should we report this?
size = std::min(size, _bodySize - _str.size());
// we have more data now
_received += size;
// we can not safely append the data to the string, it
// will not exceed the reserved size so it is guaranteed
// not to change the data pointer, we can just leave that
_str.append(buffer, static_cast<size_t>(size));
// done
return _received >= _bodySize;
// if the string is filled with the given number of characters we are done now
return _str.size() >= _bodySize;
}
}
};