the AMQP::Message and AMQP::Envelope objects can now longer be copied or moved, and the signatures for creating an envelope and publishing a message have been made much more strict. This could be a API breaking change, to the version number should be upped

This commit is contained in:
Emiel Bruijntjes 2017-03-08 13:32:51 +01:00
parent c6608cee0a
commit 00b81949d3
20 changed files with 94 additions and 350 deletions

View File

@ -3,7 +3,7 @@
*
* Class storing deferred callbacks of different type.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -41,7 +41,7 @@ using EmptyCallback = std::function<void()>;
using BeginCallback = std::function<void()>;
using HeaderCallback = std::function<void(const MetaData &metaData)>;
using DataCallback = std::function<void(const char *data, size_t size)>;
using MessageCallback = std::function<void(Message &&message, uint64_t deliveryTag, bool redelivered)>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using CompleteCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;

View File

@ -1,7 +1,7 @@
/**
* Class describing a (mid-level) AMQP channel implementation
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -344,10 +344,7 @@ public:
* @param size size of the message
*/
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, std::string &&message) { return _implementation->publish(exchange, routingKey, Envelope(std::move(message))); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); }
/**
* Set the Quality of Service (QOS) for this channel

View File

@ -5,7 +5,7 @@
* that has a private constructor so that it can not be used from outside
* the AMQP library
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**

View File

@ -3,6 +3,7 @@
*
* List of all declared classes
*
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -31,7 +32,6 @@ class ConnectionImpl;
class Exchange;
class Frame;
class Login;
class MessageImpl;
class Monitor;
class OutBuffer;
class ReceivedFrame;

View File

@ -3,7 +3,7 @@
*
* Deferred callback for consumers
*
* @copyright 2014 - 2016 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -46,7 +46,7 @@ private:
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const override;
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
/**
* The channel implementation may call our

View File

@ -4,7 +4,7 @@
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 Copernica B.V.
* @copyright 2016 - 2017 Copernica B.V.
*/
/**
@ -85,7 +85,7 @@ private:
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const = 0;
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0;
/**
* Frames may be processed

View File

@ -2,7 +2,7 @@
* DeferredGet.h
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 - 2016 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -62,7 +62,7 @@ private:
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(Message &&message, uint64_t deliveryTag, bool redelivered) const override;
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
/**
* The channel implementation may call our

View File

@ -4,7 +4,7 @@
* When you send or receive a message to the rabbitMQ server, it is encapsulated
* in an envelope that contains additional meta information as well.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -29,23 +29,22 @@ class Envelope : public MetaData
{
protected:
/**
* The body (only used when string object was passed to constructor
* @var std::string
*/
std::string _str;
/**
* Pointer to the body data (the memory buffer is not managed by the AMQP
* library!)
* Pointer to the body data (the memory is not managed by the AMQP library!)
* @var const char *
*/
const char *_body;
char *_body;
/**
* Size of the data
* @var uint64_t
*/
uint64_t _bodySize;
/**
* Was the data allocated by this object?
* @var bool
*/
bool _allocated = false;
public:
/**
@ -57,120 +56,22 @@ public:
* @param body
* @param size
*/
Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodySize(size) {}
Envelope(const char *body, uint64_t size) : MetaData(), _body((char *)body), _bodySize(size) {}
/**
* Constructor based on a string
* @param body
*/
Envelope(const std::string &body) : MetaData(), _str(body), _body(_str.data()), _bodySize(_str.size()) {}
/**
* Constructor based on a string
* @param body
*/
Envelope(std::string &&body) : MetaData(), _str(std::move(body)), _body(_str.data()), _bodySize(_str.size()) {}
/**
* Copy constructor
* Disabled copy constructor
*
* @param envelope the envelope to copy
*/
Envelope(const Envelope &envelope) :
MetaData(envelope),
_str(envelope._body, envelope._bodySize),
_body(_str.data()),
_bodySize(_str.size())
{}
/**
* Move constructor
*
* @param envelope the envelope to move
*/
Envelope(Envelope &&envelope) :
MetaData(std::move(envelope)),
_str(std::move(envelope._str)),
_body(_str.data()),
_bodySize(_str.size())
{
// if the envelope we moved did not have allocation by string
// we are out of luck, and have to copy it ourselves :(
if (_str.empty())
{
// assign the data from the other envelope
_str.assign(envelope._body, envelope._bodySize);
// and set the correct pointer and size
_body = _str.data();
_bodySize = _str.size();
}
else
{
// we moved the other envelopes string
// which means their body pointer is now
// garbage (it no longer points to a valid
// address), so we need to clear it
envelope._body = nullptr;
envelope._bodySize = 0;
}
}
Envelope(const Envelope &envelope) = delete;
/**
* Destructor
*/
virtual ~Envelope() {}
/**
* Assignment operator
*
* @param envelope the envelope to copy
* @return same object for chaining
*/
Envelope &operator=(const Envelope &envelope)
virtual ~Envelope()
{
// copy the data from the envelope
_str.assign(envelope._body, envelope._bodySize);
// set the data pointer and body size
_body = _str.data();
_bodySize = _str.size();
// allow chaining
return *this;
}
/**
* Move assignment operator
*
* @param envelope the envelope to move
* @return same object for chaining
*/
Envelope &operator=(Envelope &&envelope)
{
// was the string in the other envelop empty?
if (envelope._str.empty())
{
// that's a shame, we have to make a full copy
_str.assign(envelope._body, envelope._bodySize);
}
else
{
// not empty, just move it
_str = std::move(envelope._str);
// their string is now garbage so the
// pointer is also invalid
envelope._body = nullptr;
envelope._bodySize = 0;
}
// we now have a valid string, set the body and
_body = _str.data();
_bodySize = _str.size();
// allow chaining
return *this;
// deallocate the data
if (_allocated) free(_body);
}
/**

View File

@ -7,7 +7,7 @@
* Message objects can not be constructed by end users, they are only constructed
* by the AMQP library, and passed to user callbacks.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -48,8 +48,15 @@ protected:
* The routing key that was originally used
* @var string
*/
std::string _routingKey;
std::string _routingkey;
/**
* Number of bytes already filled
* @var size_t
*/
size_t _filled = 0;
/**
* We are an open book to the consumer handler
*/
@ -79,43 +86,41 @@ protected:
*/
bool append(const char *buffer, uint64_t size)
{
// is this the only data, and also direct complete?
if (_str.empty() && size >= _bodySize)
// is the body already allocated?
if (_allocated)
{
// we have everything
_body = buffer;
// done
return true;
// prevent overflow
size = std::min(size, _bodySize - _filled);
// append more data
memcpy(_body + _filled, buffer, size);
// update filled data
_filled += size;
}
else if (size >= _bodySize)
{
// we do not have to combine multiple frames, so we can store
// the buffer pointer in the message
_body = (char *)buffer;
}
else
{
// it does not fit yet, do we have to allocate
if (!_body)
{
// allocate memory in the string
_str.reserve(static_cast<size_t>(_bodySize));
// we now use the data buffer inside the string
_body = _str.data();
}
// safety-check: if the given size exceeds the given message body size
// we truncate it, this should never happen because it indicates a bug
// in the AMQP server implementation, should we report this?
size = std::min(size, _bodySize - _str.size());
// we can not safely append the data to the string, it
// will not exceed the reserved size so it is guaranteed
// not to change the data pointer, we can just leave that
// @todo this is not always necessary; instead, we can refrain from
// allocating this buffer entirely and just insert it into the message
// directly.
_str.append(buffer, static_cast<size_t>(size));
// if the string is filled with the given number of characters we are done now
return _str.size() >= _bodySize;
// allocate the buffer
_body = (char *)malloc(_bodySize);
// remember that the buffer was allocated, so that the destructor can get rid of it
_allocated = true;
// append more data
memcpy(_body, buffer, std::min(size, _bodySize));
// update filled data
_filled = std::min(size, _bodySize);
}
// check if we're done
return _filled >= _bodySize;
}
public:
@ -125,82 +130,28 @@ public:
* @param exchange
* @param routingKey
*/
Message(const std::string &exchange, const std::string &routingKey) :
Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey)
Message(std::string exchange, std::string routingkey) :
Envelope(nullptr, 0), _exchange(std::move(exchange)), _routingkey(std::move(routingkey))
{}
/**
* Copy constructor
*
* Disabled copy constructor
* @param message the message to copy
*/
Message(const Message &message) :
Envelope(message),
_exchange(message._exchange),
_routingKey(message._routingKey)
{}
/**
* Move constructor
*
* @param message the message to move
*/
Message(Message &&message) :
Envelope(std::move(message)),
_exchange(std::move(message._exchange)),
_routingKey(std::move(message._routingKey))
{}
Message(const Message &message) = delete;
/**
* Destructor
*/
virtual ~Message() = default;
/**
* Assignment operator
*
* @param message the message to copy
* @return same object for chaining
*/
Message &operator=(const Message &message)
{
// call the base assignment
Envelope::operator=(message);
// move the exchange and routing key
_exchange = message._exchange;
_routingKey = message._routingKey;
// allow chaining
return *this;
}
/**
* Move assignment operator
*
* @param message the message to move
* @return same object for chaining
*/
Message &operator=(Message &&message)
{
// call the base assignment
Envelope::operator=(std::move(message));
// move the exchange and routing key
_exchange = std::move(message._exchange);
_routingKey = std::move(message._routingKey);
// allow chaining
return *this;
}
/**
* The exchange to which it was originally published
* @var string
*/
const std::string &exchange() const
{
// expose member
return _exchange;
}
@ -208,9 +159,10 @@ public:
* The routing key that was originally used
* @var string
*/
const std::string &routingKey() const
const std::string &routingkey() const
{
return _routingKey;
// expose member
return _routingkey;
}
};

View File

@ -4,7 +4,7 @@
* With every published message a set of meta data is passed to. This class
* holds all that meta data.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**

View File

@ -8,7 +8,7 @@
* This is a class that is used internally by the AMQP library. As a user
* of this library, you normally do not have to instantiate it.
*
* @documentation public
* @copyright 2014 - 2017 Copernica BV
*/
/**

View File

@ -1,7 +1,7 @@
/**
* Class describing an AMQP basic header frame
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**

View File

@ -3,13 +3,12 @@
*
* Implementation for a channel
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
#include "includes.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicreturnframe.h"
#include "messageimpl.h"
#include "consumedmessage.h"
#include "returnedmessage.h"
#include "channelopenframe.h"

View File

@ -1,7 +1,7 @@
/**
* Base class for a message implementation
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -12,7 +12,7 @@ namespace AMQP {
/**
* Class definition
*/
class ConsumedMessage : public MessageImpl
class ConsumedMessage : public Message
{
private:
/**
@ -40,7 +40,7 @@ public:
* @param frame
*/
ConsumedMessage(const BasicDeliverFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
Message(frame.exchange(), frame.routingKey()),
_consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered())
{}
@ -49,7 +49,7 @@ public:
* @param frame
*/
ConsumedMessage(const BasicGetOKFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
Message(frame.exchange(), frame.routingKey()),
_deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered())
{}
@ -75,7 +75,7 @@ public:
void report(const MessageCallback &callback)
{
// send ourselves to the consumer
if (callback) callback(std::move(*this), _deliveryTag, _redelivered);
if (callback) callback(*this, _deliveryTag, _redelivered);
}
};

View File

@ -3,7 +3,7 @@
*
* Implementation file for the DeferredConsumer class
*
* @copyright 2014 - 2016 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
#include "includes.h"
@ -38,10 +38,10 @@ const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::stri
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void DeferredConsumer::announce(Message &&message, uint64_t deliveryTag, bool redelivered) const
void DeferredConsumer::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
{
// simply execute the message callback
_messageCallback(std::move(message), deliveryTag, redelivered);
_messageCallback(message, deliveryTag, redelivered);
}
/**

View File

@ -4,7 +4,7 @@
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 Copernica B.V.
* @copyright 2016 - 2017 Copernica B.V.
*/
/**
@ -120,7 +120,7 @@ void DeferredConsumerBase::complete()
if (_message)
{
// announce the message
announce(std::move(*_message), _deliveryTag, _redelivered);
announce(*_message, _deliveryTag, _redelivered);
// and destroy it
_message.reset();

View File

@ -4,7 +4,7 @@
* Implementation of the DeferredGet call
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 - 2016 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -63,7 +63,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void DeferredGet::announce(Message &&message, uint64_t deliveryTag, bool redelivered) const
void DeferredGet::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
{
// monitor the channel
Monitor monitor{ _channel };

View File

@ -1,104 +0,0 @@
/**
* Base class for a message implementation
*
* This is the base class for either the returned message or the consumed
* message.
*
* @copyright 2014 Copernica BV
*/
/**
* Namespace
*/
namespace AMQP {
/**
* Class definition
*/
class MessageImpl : public Message
{
protected:
/**
* Constructor
* @param exchange
* @param routingKey
*/
MessageImpl(const std::string &exchange, const std::string &routingKey) :
Message(exchange, routingKey)
{}
public:
/**
* Destructor
*/
virtual ~MessageImpl() {}
/**
* Set the body size
* This field is set when the header is received
* @param uint64_t
*/
void setBodySize(uint64_t size)
{
// safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword
// in which case casting the uint64_t to a size_t could result in truncation
// here we check whether the given size fits inside a size_t
if (std::numeric_limits<size_t>::max() < size) throw std::runtime_error("message is too big for this system");
// store the new size
_bodySize = size;
}
/**
* Append data
* @param buffer incoming data
* @param size size of the data
* @return bool true if the message is now complete
*/
bool append(const char *buffer, uint64_t size)
{
// is this the only data, and also direct complete?
if (_str.empty() && size >= _bodySize)
{
// we have everything
_body = buffer;
// done
return true;
}
else
{
// it does not fit yet, do we have to allocate
if (!_body)
{
// allocate memory in the string
_str.reserve(static_cast<size_t>(_bodySize));
// we now use the data buffer inside the string
_body = _str.data();
}
// safety-check: if the given size exceeds the given message body size
// we truncate it, this should never happen because it indicates a bug
// in the AMQP server implementation, should we report this?
size = std::min(size, _bodySize - _str.size());
// we can not safely append the data to the string, it
// will not exceed the reserved size so it is guaranteed
// not to change the data pointer, we can just leave that
// @todo this is not always necessary; instead, we can refrain from
// allocating this buffer entirely and just insert it into the message
// directly.
_str.append(buffer, static_cast<size_t>(size));
// if the string is filled with the given number of characters we are done now
return _str.size() >= _bodySize;
}
}
};
/**
* End of namespace
*/
}

View File

@ -3,7 +3,7 @@
*
* Implementation of the ReceivedFrame class
*
* @documentation private
* @copyright 2014 - 2017 Copernica BV
*/
#include "includes.h"
#include "heartbeatframe.h"
@ -65,7 +65,6 @@
#include "transactioncommitokframe.h"
#include "transactionrollbackframe.h"
#include "transactionrollbackokframe.h"
#include "messageimpl.h"
#include "consumedmessage.h"
#include "bodyframe.h"
#include "basicheaderframe.h"

View File

@ -5,7 +5,7 @@
* was published with the immediate or mandatory flag, and could not be
* delivered according to those rules.
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2017 Copernica BV
*/
/**
@ -16,7 +16,7 @@ namespace AMQP {
/**
* Class definition
*/
class ReturnedMessage : public MessageImpl
class ReturnedMessage : public Message
{
private:
/**
@ -38,13 +38,13 @@ public:
* @param frame
*/
ReturnedMessage(const BasicReturnFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
Message(frame.exchange(), frame.routingKey()),
_replyCode(frame.replyCode()), _replyText(frame.replyText()) {}
/**
* Destructor
*/
virtual ~ReturnedMessage() {}
virtual ~ReturnedMessage() = default;
};
/**