diff --git a/.clang_complete b/.clang_complete new file mode 100644 index 0000000..82645da --- /dev/null +++ b/.clang_complete @@ -0,0 +1,2 @@ +-std=c++11 +-Wno-pragma-once-outside-header diff --git a/include/array.h b/include/array.h index 33bc973..a86f1be 100644 --- a/include/array.h +++ b/include/array.h @@ -9,6 +9,14 @@ */ #pragma once +/** + * Dependencies + */ +#include "field.h" +#include "fieldproxy.h" +#include +#include + /** * Set up namespace */ @@ -222,6 +230,18 @@ public: } }; +/** + * Custom output stream operator + * @param stream + * @param field + * @return ostream + */ +inline std::ostream &operator<<(std::ostream &stream, const ArrayFieldProxy &field) +{ + // get underlying field, and output that + return stream << field.get(); +} + /** * end namespace */ diff --git a/include/booleanset.h b/include/booleanset.h index ab2b070..5aad1cc 100644 --- a/include/booleanset.h +++ b/include/booleanset.h @@ -13,11 +13,19 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include "field.h" +#include "outbuffer.h" +#include "receivedframe.h" + /** * Set up namespace */ namespace AMQP { - + /** * Class definition */ @@ -63,7 +71,7 @@ public: { _byte = frame.nextUint8(); } - + /** * Copy constructor * @param that @@ -95,10 +103,10 @@ public: { // prefix stream << "booleanset("; - + // the members for (int i=0; i<8; i++) stream << (i == 0 ? "" : ",") << (get(i) ? 1 : 0); - + // postfix stream << ")"; } diff --git a/include/callbacks.h b/include/callbacks.h index 7b32c20..46179b4 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -11,14 +11,26 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Message; +class MetaData; + /** * All the callbacks that are supported - * + * * When someone registers a callback function for certain events, it should * match one of the following signatures. */ @@ -26,7 +38,11 @@ using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; using EmptyCallback = std::function; +using BeginCallback = std::function; +using HeaderCallback = std::function; +using DataCallback = std::function; using MessageCallback = std::function; +using CompleteCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; using SizeCallback = std::function; diff --git a/include/channelimpl.h b/include/channelimpl.h index 675d312..ec1f365 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -13,6 +13,19 @@ */ #pragma once +/** + * Dependencies + */ +#include "exchangetype.h" +#include "watchable.h" +#include "callbacks.h" +#include "outbuffer.h" +#include "deferred.h" +#include "monitor.h" +#include +#include +#include + /** * Set up namespace */ @@ -21,7 +34,20 @@ namespace AMQP { /** * Forward declarations */ +class DeferredConsumerBase; +class BasicDeliverFrame; +class DeferredConsumer; +class BasicGetOKFrame; class ConsumedMessage; +class ConnectionImpl; +class DeferredDelete; +class DeferredCancel; +class DeferredQueue; +class DeferredGet; +class Connection; +class Envelope; +class Table; +class Frame; /** * Class definition @@ -48,10 +74,10 @@ private: ErrorCallback _errorCallback; /** - * Callbacks for all consumers that are active - * @var std::map + * Handlers for all consumers that are active + * @var std::map */ - std::map _consumers; + std::map> _consumers; /** * Pointer to the oldest deferred result (the first one that is going @@ -102,10 +128,10 @@ private: bool _synchronous = false; /** - * The message that is now being received - * @var ConsumedMessage + * The current consumer receiving a message + * @var std::shared_ptr */ - ConsumedMessage *_message = nullptr; + std::shared_ptr _consumer; /** * Attach the connection @@ -135,7 +161,7 @@ protected: * a friend. By doing this we ensure that nobody can instantiate this * object, and that it can thus only be used inside the library. */ - ChannelImpl() {} + ChannelImpl(); public: /** @@ -633,17 +659,15 @@ public: void reportError(const char *message, bool notifyhandler = true); /** - * Install a consumer callback + * Install a consumer + * * @param consumertag The consumer tag - * @param callback The callback to be called + * @param consumer The consumer handler */ - void install(const std::string &consumertag, const MessageCallback &callback) + void install(std::string consumertag, const std::shared_ptr &consumer) { - // install the callback if it is assigned - if (callback) _consumers[consumertag] = callback; - - // otherwise we erase the previously set callback - else _consumers.erase(consumertag); + // install the consumer handler + _consumers[consumertag] = consumer; } /** @@ -657,26 +681,23 @@ public: } /** - * Report that a message was received + * Process incoming delivery + * + * @param frame The frame to process */ - void reportMessage(); + void process(BasicDeliverFrame &frame); /** - * Create an incoming message - * @param frame - * @return ConsumedMessage + * Retrieve the current consumer handler + * + * @return The handler responsible for the current message */ - ConsumedMessage *message(const BasicDeliverFrame &frame); - ConsumedMessage *message(const BasicGetOKFrame &frame); + DeferredConsumerBase *consumer(); /** - * Retrieve the current incoming message - * @return ConsumedMessage + * Mark the current consumer as done */ - ConsumedMessage *message() - { - return _message; - } + void complete(); /** * The channel class is its friend, thus can it instantiate this object diff --git a/include/connectionhandler.h b/include/connectionhandler.h index 4f81740..d4ccfa4 100644 --- a/include/connectionhandler.h +++ b/include/connectionhandler.h @@ -14,11 +14,22 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Connection; + /** * Class definition */ diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 7366264..89bc538 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -13,11 +13,31 @@ */ #pragma once +/** + * Dependencies + */ +#include "watchable.h" +#include "connectionhandler.h" +#include "channelimpl.h" +#include "outbuffer.h" +#include "monitor.h" +#include "login.h" +#include +#include +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Connection; +class Buffer; +class Frame; + /** * Class definition */ @@ -78,7 +98,7 @@ protected: * @var uint32_t */ uint32_t _maxFrame = 4096; - + /** * Number of expected bytes that will hold the next incoming frame * We start with seven because that is the header of a frame @@ -245,7 +265,7 @@ public: // 8 bytes for header and end-of-frame byte return _maxFrame - 8; } - + /** * The number of bytes that can best be passed to the next call to the parse() method * @return uint32_t @@ -334,7 +354,7 @@ public: { // set connection state to closed _state = state_closed; - + // monitor because every callback could invalidate the connection Monitor monitor(this); diff --git a/include/decimalfield.h b/include/decimalfield.h index 7dddd6c..aa1f842 100644 --- a/include/decimalfield.h +++ b/include/decimalfield.h @@ -1,6 +1,6 @@ /** * Decimal field type for AMQP - * + * * @copyright 2014, 2015 Copernica BV */ @@ -9,6 +9,15 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include +#include "field.h" +#include "outbuffer.h" +#include "receivedframe.h" + /** * Set up namespace */ @@ -44,7 +53,7 @@ private: * The number without the decimals */ uint32_t _number; - + protected: /** * Write encoded payload to the given buffer. @@ -67,7 +76,7 @@ public: _places(places), _number(number) {} - + /** * Construct based on incoming data * @param frame @@ -77,7 +86,7 @@ public: _places = frame.nextUint8(); _number = frame.nextUint32(); } - + /** * Destructor */ @@ -161,7 +170,7 @@ public: * Check for inequality between this and another DecimalField * * @param value value to be checked for inequality - * @return boolean whether values are inequal + * @return boolean whether values are inequal */ bool operator!=(const DecimalField& value) const { @@ -199,7 +208,7 @@ public: /** * Return the DecimalField * To preserve precision DecimalField is returned, containing the number and places. - * @return return DecimalField + * @return return DecimalField */ DecimalField value() const { diff --git a/include/deferred.h b/include/deferred.h index 3112732..81fe598 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -13,6 +13,12 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include "callbacks.h" + /** * Set up namespace */ @@ -105,12 +111,25 @@ protected: return reportSuccess(); } + /** + * Report success for a get operation + * + * @param messagecount Number of messages left in the queue + * @param deliveryTag Delivery tag of the message coming in + * @param redelivered Was the message redelivered? + */ + virtual const std::shared_ptr &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) + { + // this is the same as a regular success message + return reportSuccess(); + } + /** * Report success for frames that report cancel operations * @param name Consumer tag that is cancelled * @return Deferred */ - virtual const std::shared_ptr &reportSuccess(const std::string &name) const + virtual const std::shared_ptr &reportSuccess(const std::string &name) { // this is the same as a regular success message return reportSuccess(); diff --git a/include/deferredcancel.h b/include/deferredcancel.h index 0358c32..7bcaed6 100644 --- a/include/deferredcancel.h +++ b/include/deferredcancel.h @@ -41,7 +41,7 @@ private: * @param name Consumer tag that is cancelled * @return Deferred */ - virtual const std::shared_ptr &reportSuccess(const std::string &name) const override; + virtual const std::shared_ptr &reportSuccess(const std::string &name) override; /** * The channel implementation may call our diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index 597c2ab..63f818d 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -11,6 +11,11 @@ */ #pragma once +/** + * Dependencies + */ +#include "deferredconsumerbase.h" + /** * Set up namespace */ @@ -19,34 +24,30 @@ namespace AMQP { /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public Deferred +class DeferredConsumer : public DeferredConsumerBase { private: - /** - * The channel to which the consumer is linked - * @var ChannelImpl - */ - ChannelImpl *_channel; - /** * Callback to execute when consumption has started * @var ConsumeCallback */ ConsumeCallback _consumeCallback; - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - - /** * Report success for frames that report start consumer operations * @param name Consumer tag that is started * @return Deferred */ - virtual const std::shared_ptr &reportSuccess(const std::string &name) const override; + virtual const std::shared_ptr &reportSuccess(const std::string &name) override; + + /** + * Emit a message + * + * @param message The message to emit + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ + void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const override; /** * The channel implementation may call our @@ -68,7 +69,7 @@ public: * @param failed are we already failed? */ DeferredConsumer(ChannelImpl *channel, bool failed = false) : - Deferred(failed), _channel(channel) {} + DeferredConsumerBase(failed, channel) {} public: /** diff --git a/include/deferredconsumerbase.h b/include/deferredconsumerbase.h new file mode 100644 index 0000000..31195a5 --- /dev/null +++ b/include/deferredconsumerbase.h @@ -0,0 +1,252 @@ +/** + * deferredconsumerbase.h + * + * Base class for the deferred consumer and the + * deferred get. + * + * @copyright 2016 Copernica B.V. + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "deferred.h" +#include "stack_ptr.h" +#include "message.h" + +/** + * Start namespace + */ +namespace AMQP { + +/** + * Forward declarations + */ +class BasicDeliverFrame; +class BasicHeaderFrame; +class BodyFrame; + +/** + * Base class for deferred consumers + */ +class DeferredConsumerBase : + public Deferred, + public std::enable_shared_from_this +{ +private: + /** + * Size of the body of the current message + * @var uint64_t + */ + uint64_t _bodySize = 0; + + /** + * Process a delivery frame + * + * @param frame The frame to process + */ + void process(BasicDeliverFrame &frame); + + /** + * Process the message headers + * + * @param frame The frame to process + */ + void process(BasicHeaderFrame &frame); + + /** + * Process the message data + * + * @param frame The frame to process + */ + void process(BodyFrame &frame); + + /** + * Indicate that a message was done + */ + void complete(); + + /** + * Emit a message + * + * @param message The message to emit + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ + virtual void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const = 0; + + /** + * Frames may be processed + */ + friend class ChannelImpl; + friend class BasicDeliverFrame; + friend class BasicHeaderFrame; + friend class BodyFrame; +protected: + /** + * The delivery tag for the current message + * @var uint64_t + */ + uint64_t _deliveryTag = 0; + + /** + * Is this a redelivered message + * @var bool + */ + bool _redelivered = false; + + /** + * The channel to which the consumer is linked + * @var ChannelImpl + */ + ChannelImpl *_channel; + + /** + * Callback for new message + * @var BeginCallback + */ + BeginCallback _beginCallback; + + /** + * Callback for incoming headers + * @var HeaderCallback + */ + HeaderCallback _headerCallback; + + /** + * Callback for when a chunk of data comes in + * @var DataCallback + */ + DataCallback _dataCallback; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + /** + * Callback for when a message was complete finished + * @var CompleteCallback + */ + CompleteCallback _completeCallback; + + /** + * The message that we are currently receiving + * @var stack_ptr + */ + stack_ptr _message; + + /** + * Constructor + * + * @param failed Have we already failed? + * @param channel The channel we are consuming on + */ + DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} +public: + /** + * Register the function to be called when a new message is expected + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumerBase &onBegin(const BeginCallback &callback) + { + // store callback + _beginCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when message headers come in + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredConsumerBase &onHeaders(const HeaderCallback &callback) + { + // store callback + _headerCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a chunk of data comes in + * + * Note that this function may be called zero, one or multiple times + * for each incoming message depending on the size of the message data. + * + * If you install this callback you very likely also want to install + * the onComplete callback so you know when the last data part was + * received. + * + * @param callback The callback to invoke for chunks of message data + * @return Same object for chaining + */ + DeferredConsumerBase &onData(const DataCallback &callback) + { + // store callback + _dataCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredConsumerBase &onReceived(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredConsumerBase &onMessage(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumerBase& onComplete(const CompleteCallback &callback) + { + // store callback + _completeCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/include/deferredget.h b/include/deferredget.h index 607b029..d66f6fb 100644 --- a/include/deferredget.h +++ b/include/deferredget.h @@ -10,6 +10,11 @@ */ #pragma once +/** + * Dependencies + */ +#include "deferredconsumerbase.h" + /** * Set up namespace */ @@ -17,32 +22,20 @@ namespace AMQP { /** * Class definition - * + * * This class implements the 'shared_from_this' functionality, because * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public Deferred, public std::enable_shared_from_this +class DeferredGet : public DeferredConsumerBase { private: - /** - * Pointer to the channel - * @var ChannelImpl - */ - ChannelImpl *_channel; - - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - /** * Callback in case the queue is empty * @var EmptyCallback */ EmptyCallback _emptyCallback; - + /** * Callback with the number of messages still in the queue * @var SizeCallback @@ -50,18 +43,29 @@ private: SizeCallback _sizeCallback; /** - * Report success when a message is indeed expected - * @param count number of messages in the queue - * @return Deferred + * Report success for a get operation + * + * @param messagecount Number of messages left in the queue + * @param deliveryTag Delivery tag of the message coming in + * @param redelivered Was the message redelivered? */ - virtual const std::shared_ptr &reportSuccess(uint32_t messagecount) const override; + const std::shared_ptr &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) override; /** * Report success when queue was empty * @return Deferred */ virtual const std::shared_ptr &reportSuccess() const override; - + + /** + * Emit a message + * + * @param message The message to emit + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ + void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const override; + /** * The channel implementation may call our * private members and construct us @@ -77,12 +81,12 @@ public: * Note: this constructor _should_ be protected, but because make_shared * will then not work, we have decided to make it public after all, * because the work-around would result in not-so-easy-to-read code. - * + * * @param channel the channel implementation * @param failed are we already failed? */ - DeferredGet(ChannelImpl *channel, bool failed = false) : - Deferred(failed), _channel(channel) {} + DeferredGet(ChannelImpl *channel, bool failed = false) : + DeferredConsumerBase(failed, channel) {} public: /** @@ -94,39 +98,11 @@ public: { // store the callback _messageCallback = callback; - + // allow chaining return *this; } - /** - * Register a function to be called when a message arrives - * This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it - * @param callback the callback to execute - */ - DeferredGet &onReceived(const MessageCallback &callback) - { - // store callback - _messageCallback = callback; - - // allow chaining - return *this; - } - - /** - * Register a function to be called when a message arrives - * This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it - * @param callback the callback to execute - */ - DeferredGet &onMessage(const MessageCallback &callback) - { - // store callback - _messageCallback = callback; - - // allow chaining - return *this; - } - /** * Register a function to be called if no message could be fetched * @param callback the callback to execute @@ -135,11 +111,11 @@ public: { // store callback _emptyCallback = callback; - + // allow chaining return *this; } - + /** * Register a function to be called when size information is known * @param callback the callback to execute @@ -148,12 +124,12 @@ public: { // store callback _sizeCallback = callback; - + // allow chaining return *this; } }; - + /** * End of namespace */ diff --git a/include/envelope.h b/include/envelope.h index 763d84d..54c4140 100644 --- a/include/envelope.h +++ b/include/envelope.h @@ -12,6 +12,11 @@ */ #pragma once +/** + * Dependencies + */ +#include "metadata.h" + /** * Set up namespace */ diff --git a/include/field.h b/include/field.h index dec463e..b4d8ed9 100644 --- a/include/field.h +++ b/include/field.h @@ -9,11 +9,24 @@ */ #pragma once +/** + * Dependencies + */ +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class ReceivedFrame; +class OutBuffer; +class Array; +class Table; + /** * Base field class * diff --git a/include/fieldproxy.h b/include/fieldproxy.h index 44064cd..a122f76 100644 --- a/include/fieldproxy.h +++ b/include/fieldproxy.h @@ -1,7 +1,7 @@ /** * Field proxy. Returned by the table. Can be casted to the * relevant native type (std::string or numeric) - * + * * @copyright 2014 Copernica BV */ @@ -10,11 +10,28 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include +#include "stringfield.h" +#include "booleanset.h" +#include "decimalfield.h" +#include "numericfield.h" + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Table; +class Array; +class Field; + /** * Class implementation */ @@ -200,7 +217,7 @@ public: // cast to a string return operator=(std::string(value)); } - + /** * Assign an array value * @param value @@ -224,7 +241,7 @@ public: _source->set(_index, value); return *this; } - + /** * Get the underlying field * @return Field @@ -247,32 +264,8 @@ public: }; // define types for array- and table-based field proxy -typedef FieldProxy AssociativeFieldProxy; -typedef FieldProxy ArrayFieldProxy; - -/** - * Custom output stream operator - * @param stream - * @param field - * @return ostream - */ -inline std::ostream &operator<<(std::ostream &stream, const AssociativeFieldProxy &field) -{ - // get underlying field, and output that - return stream << field.get(); -} - -/** - * Custom output stream operator - * @param stream - * @param field - * @return ostream - */ -inline std::ostream &operator<<(std::ostream &stream, const ArrayFieldProxy &field) -{ - // get underlying field, and output that - return stream << field.get(); -} +using AssociativeFieldProxy = FieldProxy; +using ArrayFieldProxy = FieldProxy; /** * end namespace diff --git a/include/login.h b/include/login.h index 3d6e095..b82c7a9 100644 --- a/include/login.h +++ b/include/login.h @@ -11,6 +11,11 @@ */ #pragma once +/** + * Dependencies + */ +#include + /** * Set up namespace */ @@ -33,7 +38,7 @@ private: * @var string */ std::string _password; - + public: /** @@ -53,7 +58,7 @@ public: * Destructor */ virtual ~Login() {} - + /** * Retrieve the user name * @return string @@ -62,7 +67,7 @@ public: { return _user; } - + /** * Retrieve the password * @return string @@ -71,7 +76,7 @@ public: { return _password; } - + /** * String representation in SASL PLAIN mode * @return string @@ -80,7 +85,7 @@ public: { // we need an initial string std::string result("\0", 1); - + // append other elements return result.append(_user).append("\0",1).append(_password); } diff --git a/include/message.h b/include/message.h index 706aa20..3d3a49d 100644 --- a/include/message.h +++ b/include/message.h @@ -15,11 +15,22 @@ */ #pragma once +/** + * Dependencies + */ +#include "envelope.h" +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class DeferredConsumerBase; + /** * Class definition */ @@ -38,10 +49,78 @@ protected: */ std::string _routingKey; -protected: /** - * The constructor is protected to ensure that endusers can not - * instantiate a message + * We are an open book to the consumer handler + */ + friend class DeferredConsumerBase; + + /** + * Set the body size + * This field is set when the header is received + * @param uint64_t + */ + void setBodySize(uint64_t size) + { + // safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword + // in which case casting the uint64_t to a size_t could result in truncation + // here we check whether the given size fits inside a size_t + if (std::numeric_limits::max() < size) throw std::runtime_error("message is too big for this system"); + + // store the new size + _bodySize = size; + } + + /** + * Append data + * @param buffer incoming data + * @param size size of the data + * @return bool true if the message is now complete + */ + bool append(const char *buffer, uint64_t size) + { + // is this the only data, and also direct complete? + if (_str.empty() && size >= _bodySize) + { + // we have everything + _body = buffer; + + // done + return true; + } + else + { + // it does not fit yet, do we have to allocate + if (!_body) + { + // allocate memory in the string + _str.reserve(static_cast(_bodySize)); + + // we now use the data buffer inside the string + _body = _str.data(); + } + + // safety-check: if the given size exceeds the given message body size + // we truncate it, this should never happen because it indicates a bug + // in the AMQP server implementation, should we report this? + size = std::min(size, _bodySize - _str.size()); + + // we can not safely append the data to the string, it + // will not exceed the reserved size so it is guaranteed + // not to change the data pointer, we can just leave that + // @todo this is not always necessary; instead, we can refrain from + // allocating this buffer entirely and just insert it into the message + // directly. + _str.append(buffer, static_cast(size)); + + // if the string is filled with the given number of characters we are done now + return _str.size() >= _bodySize; + } + } + +public: + /** + * Constructor + * * @param exchange * @param routingKey */ @@ -49,7 +128,6 @@ protected: Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey) {} -public: /** * Copy constructor @@ -76,7 +154,7 @@ public: /** * Destructor */ - virtual ~Message() {} + virtual ~Message() = default; /** * Assignment operator diff --git a/include/metadata.h b/include/metadata.h index 5b0288f..fa3ed82 100644 --- a/include/metadata.h +++ b/include/metadata.h @@ -12,6 +12,13 @@ */ #pragma once +/** + * Dependencies + */ +#include "booleanset.h" +#include "stringfield.h" +#include "table.h" + /** * Set up namespace */ diff --git a/include/monitor.h b/include/monitor.h index 6cea798..9219283 100644 --- a/include/monitor.h +++ b/include/monitor.h @@ -16,6 +16,11 @@ */ #pragma once +/** + * Dependencies + */ +#include "watchable.h" + /** * Set up namespace */ diff --git a/include/numericfield.h b/include/numericfield.h index b3d803b..29f2f42 100644 --- a/include/numericfield.h +++ b/include/numericfield.h @@ -9,6 +9,16 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include +#include "receivedframe.h" +#include "outbuffer.h" +#include "field.h" +#include + /** * Set up namespace */ diff --git a/include/outbuffer.h b/include/outbuffer.h index 28b533c..1eeab8a 100644 --- a/include/outbuffer.h +++ b/include/outbuffer.h @@ -12,6 +12,12 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include + /** * Set up namespace */ @@ -93,7 +99,7 @@ public: /** * Destructor */ - virtual ~OutBuffer() {} + virtual ~OutBuffer() = default; /** * Get access to the internal buffer @@ -256,4 +262,3 @@ public: * End of namespace */ } - diff --git a/include/receivedframe.h b/include/receivedframe.h index b2565b7..acc9ae0 100644 --- a/include/receivedframe.h +++ b/include/receivedframe.h @@ -1,7 +1,7 @@ /** * ReceivedFrame.h * - * The received frame class is a wrapper around a data buffer, it tries to + * The received frame class is a wrapper around a data buffer, it tries to * find out if the buffer is big enough to contain an entire frame, and * it will try to recognize the frame type in the buffer * @@ -16,11 +16,22 @@ */ #pragma once +/** + * Dependencies + */ +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Buffer; +class ConnectionImpl; + /** * Class definition */ @@ -113,7 +124,7 @@ private: * @return bool */ bool processHeaderFrame(ConnectionImpl *connection); - + public: /** @@ -171,56 +182,56 @@ public: /** * Read the next uint8_t from the buffer - * + * * @return uint8_t value read */ uint8_t nextUint8(); /** * Read the next int8_t from the buffer - * + * * @return int8_t value read */ int8_t nextInt8(); /** * Read the next uint16_t from the buffer - * + * * @return uint16_t value read */ uint16_t nextUint16(); /** * Read the next int16_t from the buffer - * + * * @return int16_t value read */ int16_t nextInt16(); /** * Read the next uint32_t from the buffer - * + * * @return uint32_t value read */ uint32_t nextUint32(); /** * Read the next int32_t from the buffer - * + * * @return int32_t value read */ int32_t nextInt32(); /** * Read the next uint64_t from the buffer - * + * * @return uint64_t value read */ uint64_t nextUint64(); /** * Read the next int64_t from the buffer - * + * * @return int64_t value read */ int64_t nextInt64(); @@ -228,7 +239,7 @@ public: /** * Read a float from the buffer * - * @return float float read from buffer. + * @return float float read from buffer. */ float nextFloat(); @@ -248,11 +259,11 @@ public: /** * Process the received frame - * + * * If this method returns false, it means that the frame was not processed, - * because it was an unrecognized frame. This does not mean that the + * because it was an unrecognized frame. This does not mean that the * connection is now in an invalid state however. - * + * * @param connection the connection over which the data was received * @return bool was the frame fully processed * @internal @@ -271,4 +282,3 @@ public: * End of namespace */ } - diff --git a/include/stack_ptr.h b/include/stack_ptr.h new file mode 100644 index 0000000..1383127 --- /dev/null +++ b/include/stack_ptr.h @@ -0,0 +1,143 @@ +/** + * stack_ptr.h + * + * Implementation of an object that behaves like a + * smart pointer but is actually managed on the stack + * + * @copyright 2016 Copernica B.V. + */ + +/** + * Dependencies + */ +#include +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Stack-based smart pointer + */ +template +class stack_ptr +{ +private: + /** + * Storage for the object + * @var std::aligned_storage + */ + std::aligned_storage _data; + + /** + * Is the pointer initialized? + * @var boolean + */ + bool _initialized = false; +public: + /** + * Constructor + */ + stack_ptr() = default; + + /** + * Copy and moving is disabled + * + * @param that The stack_ptr we refuse to copy/move + */ + stack_ptr(const stack_ptr &that) = delete; + stack_ptr(stack_ptr &&that) = delete; + + /** + * Destructor + */ + ~stack_ptr() + { + // reset the pointer + reset(); + } + + /** + * Reset the pointer + */ + void reset() + { + // are we initialized? + if (!_initialized) return; + + // destroy the object + reinterpret_cast(&_data)->~T(); + + // the object is not currently initialized + _initialized = false; + } + + /** + * Construct the object + * + * @param ... Zero or more constructor arguments for T + */ + template + void construct(Arguments&&... parameters) + { + // first reset the current object + reset(); + + // initialize new object + new (&_data) T(std::forward(parameters)...); + } + + /** + * Is the object initialized? + * + * @return Are we currently managing an object? + */ + operator bool() const + { + // are we initialized with an object? + return _initialized; + } + + /** + * Retrieve a pointer to the object + * + * @return Pointer to the object or nullptr if no object is managed + */ + T *get() const + { + // do we have a managed object + if (!_initialized) return nullptr; + + // return pointer to the managed object + return const_cast(reinterpret_cast(&_data)); + } + + /** + * Retrieve a reference to the object + * + * @return Reference to the object, undefined if no object is managed + */ + T &operator*() const + { + // dereference the pointer + return *operator->(); + } + + /** + * Retrieve a pointer to the object + * + * @return Pointer to the object, undefined if no object is managed + */ + T *operator->() const + { + // return pointer to the managed object + return const_cast(reinterpret_cast(&_data)); + } +}; + +/** + * End namespace + */ +} diff --git a/include/stringfield.h b/include/stringfield.h index 789f57e..bab6310 100644 --- a/include/stringfield.h +++ b/include/stringfield.h @@ -9,6 +9,14 @@ */ #pragma once +/** + * Dependencies + */ +#include "field.h" +#include "outbuffer.h" +#include "numericfield.h" +#include "receivedframe.h" + /** * Set up namespace */ @@ -63,7 +71,7 @@ public: /** * Clean up memory used */ - virtual ~StringField() {} + virtual ~StringField() = default; /** * Create a new instance of this object diff --git a/include/table.h b/include/table.h index 75779c6..fd90bed 100644 --- a/include/table.h +++ b/include/table.h @@ -9,6 +9,14 @@ */ #pragma once +/** + * Dependencies + */ +#include "field.h" +#include "fieldproxy.h" +#include +#include + /** * Set up namespace */ @@ -112,7 +120,7 @@ public: // allow chaining return *this; } - + /** * Aliases for setting values * @param name @@ -246,6 +254,18 @@ public: } }; +/** + * Custom output stream operator + * @param stream + * @param field + * @return ostream + */ +inline std::ostream &operator<<(std::ostream &stream, const AssociativeFieldProxy &field) +{ + // get underlying field, and output that + return stream << field.get(); +} + /** * end namespace */ diff --git a/include/watchable.h b/include/watchable.h index 889a2d7..74f3780 100644 --- a/include/watchable.h +++ b/include/watchable.h @@ -12,11 +12,22 @@ */ #pragma once +/** + * Dependencies + */ +#include +#include + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class Monitor; + /** * Class definition */ @@ -38,7 +49,7 @@ private: // add to the vector _monitors.push_back(monitor); } - + /** * Remove a monitor * @param monitor @@ -47,7 +58,7 @@ private: { // put the monitor at the end of the vector auto iter = std::remove(_monitors.begin(), _monitors.end(), monitor); - + // make the vector smaller _monitors.erase(iter, _monitors.end()); } @@ -57,12 +68,12 @@ public: * Destructor */ virtual ~Watchable(); - + /** * Only a monitor has full access */ friend class Monitor; -}; +}; /** * End of namespace diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index ec6cd78..1252969 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -1,9 +1,22 @@ /** * Class describing a basic deliver frame - * + * * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "basicframe.h" +#include "../include/stringfield.h" +#include "../include/booleanset.h" +#include "../include/connectionimpl.h" + /** * Set up namespace */ @@ -12,7 +25,7 @@ namespace AMQP{ /** * Class implementation */ -class BasicDeliverFrame : public BasicFrame +class BasicDeliverFrame : public BasicFrame { private: /** @@ -70,7 +83,7 @@ public: * @param consumerTag identifier for the consumer, valid within current channel * @param deliveryTag server-assigned and channel specific delivery tag * @param redelivered indicates whether the message has been previously delivered to this (or another) client - * @param exchange name of exchange to publish to + * @param exchange name of exchange to publish to * @param routingKey message routing key */ BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") : @@ -86,7 +99,7 @@ public: /** * Construct a basic deliver frame from a received frame * - * @param frame received frame + * @param frame received frame */ BasicDeliverFrame(ReceivedFrame &frame) : BasicFrame(frame), @@ -176,13 +189,13 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // construct the message - channel->message(*this); - + channel->process(*this); + // done return true; } diff --git a/src/basicframe.h b/src/basicframe.h index 1604ec3..3d9669c 100644 --- a/src/basicframe.h +++ b/src/basicframe.h @@ -1,9 +1,19 @@ /** * Class describing an AMQP basic frame - * + * * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "methodframe.h" + /** * Set up namespace */ @@ -35,7 +45,7 @@ public: */ virtual ~BasicFrame() {} - /** + /** * Class id * @return uint16_t */ diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 42d33c1..be4189f 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic get ok frame - * + * * @copyright 2014 Copernica BV */ @@ -12,7 +12,7 @@ namespace AMQP{ /** * Class implementation */ -class BasicGetOKFrame : public BasicFrame +class BasicGetOKFrame : public BasicFrame { private: /** @@ -71,7 +71,7 @@ public: * @param channel channel we're working on * @param deliveryTag server-assigned and channel specific delivery tag * @param redelivered indicates whether the message has been previously delivered to this (or another) client - * @param exchange name of exchange to publish to + * @param exchange name of exchange to publish to * @param routingKey message routing key * @param messageCount number of messages in the queue */ @@ -166,20 +166,16 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - - // report (if this function returns false, it means that the channel - // object no longer is valid) - if (!channel->reportSuccess(_messageCount)) return true; - - // construct the message - channel->message(*this); - + if (!channel) return false; + + // report success for the get operation + channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); + // notice that the channel is not yet synchronized here, because // we first have to receive the entire body - + // done return true; } diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index 6ed9518..62d563e 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -1,9 +1,23 @@ /** * Class describing an AMQP basic header frame - * + * * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "headerframe.h" +#include "../include/metadata.h" +#include "../include/envelope.h" +#include "../include/connectionimpl.h" +#include "../include/deferredconsumerbase.h" + /** * Set up namespace */ @@ -57,7 +71,7 @@ public: * Construct an empty basic header frame * * All options are set using setter functions. - * + * * @param channel channel we're working on * @param envelope the envelope */ @@ -71,17 +85,17 @@ public: * Constructor to parse incoming frame * @param frame */ - BasicHeaderFrame(ReceivedFrame &frame) : + BasicHeaderFrame(ReceivedFrame &frame) : HeaderFrame(frame), _weight(frame.nextUint16()), _bodySize(frame.nextUint64()), _metadata(frame) {} - + /** * Destructor */ - virtual ~BasicHeaderFrame() {} + virtual ~BasicHeaderFrame() = default; /** * Size of the body @@ -92,6 +106,16 @@ public: return _bodySize; } + /** + * The metadata sent in this frame + * + * @return All the metadata for this message + */ + const MetaData &metaData() const + { + return _metadata; + } + /** * The class ID * @return uint16_t @@ -100,7 +124,7 @@ public: { return 60; } - + /** * Process the frame * @param connection The connection over which it was received @@ -110,23 +134,13 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); - - // channel does not exist - if (!channel) return false; - - // is there a current message? - MessageImpl *message = channel->message(); - if (!message) return false; - - // store size - message->setBodySize(_bodySize); - - // and copy the meta data - message->set(_metadata); - - // for empty bodies we're ready now - if (_bodySize == 0) channel->reportMessage(); - + + // check if we have a valid channel and consumer + if (!channel || !channel->consumer()) return false; + + // the channel can process the frame + channel->consumer()->process(*this); + // done return true; } diff --git a/src/bodyframe.h b/src/bodyframe.h index a5cf234..ba34cd2 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -4,6 +4,18 @@ * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "extframe.h" +#include "../include/connectionimpl.h" +#include "../include/deferredconsumerbase.h" + /** * Set up namespace */ @@ -94,18 +106,11 @@ public: // we need the appropriate channel auto channel = connection->channel(this->channel()); - // channel does not exist - if (!channel) return false; + // check if we have a valid channel and consumer + if (!channel || !channel->consumer()) return false; - // is there a current message? - MessageImpl *message = channel->message(); - if (!message) return false; - - // store size - if (!message->append(_payload, _size)) return true; - - // the message is complete - channel->reportMessage(); + // the consumer may process the frame + channel->consumer()->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 77f7a63..6a6904b 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -46,15 +46,16 @@ */ namespace AMQP { +/** + * Constructor + */ +ChannelImpl::ChannelImpl() = default; + /** * Destructor */ ChannelImpl::~ChannelImpl() { - // remove incoming message - if (_message) delete _message; - _message = nullptr; - // remove this channel from the connection (but not if the connection is already destructed) if (_connection) _connection->remove(this); } @@ -705,40 +706,6 @@ void ChannelImpl::onSynchronized() } } -/** - * Report the received message - */ -void ChannelImpl::reportMessage() -{ - // skip if there is no message - if (!_message) return; - - // after the report the channel may be destructed, monitor that - Monitor monitor(this); - - // synchronize the channel if this comes from a basic.get frame - if (_message->consumer().empty()) onSynchronized(); - - // syncing the channel may destruct the channel - if (!monitor.valid()) return; - - // look for the consumer - auto iter = _consumers.find(_message->consumer()); - if (iter == _consumers.end()) return; - - // is this a valid callback method - if (!iter->second) return; - - // call the callback - _message->report(iter->second); - - // skip if channel was destructed - if (!monitor.valid()) return; - - // no longer need the message - delete _message; _message = nullptr; -} - /** * Report an error message on a channel * @param message the error message @@ -800,38 +767,47 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler) // leap out if object no longer exists if (!monitor.valid()) return; - // the connection now longer has to know that this channel exists, + // the connection no longer has to know that this channel exists, // because the channel ID is no longer in use if (_connection) _connection->remove(this); } - /** - * Create an incoming message from a consume call - * @param frame - * @return ConsumedMessage + * Process incoming delivery + * + * @param frame The frame to process */ -ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) +void ChannelImpl::process(BasicDeliverFrame &frame) { - // destruct if message is already set - if (_message) delete _message; + // find the consumer for this frame + auto iter = _consumers.find(frame.consumerTag()); + if (iter == _consumers.end()) return; - // construct a message - return _message = new ConsumedMessage(frame); + // we are going to be receiving a message, store + // the handler for the incoming message + _consumer = iter->second; + + // let the consumer process the frame + _consumer->process(frame); } /** - * Create an incoming message from a get call - * @param frame - * @return ConsumedMessage + * Retrieve the current consumer handler + * + * @return The handler responsible for the current message */ -ConsumedMessage *ChannelImpl::message(const BasicGetOKFrame &frame) +DeferredConsumerBase *ChannelImpl::consumer() { - // destruct if message is already set - if (_message) delete _message; - - // construct message - return _message = new ConsumedMessage(frame); + return _consumer.get(); +} + +/** + * Mark the current consumer as done + */ +void ChannelImpl::complete() +{ + // no more consumer + _consumer.reset(); } /** diff --git a/src/deferredcancel.cpp b/src/deferredcancel.cpp index e747e69..d757e8f 100644 --- a/src/deferredcancel.cpp +++ b/src/deferredcancel.cpp @@ -17,17 +17,17 @@ namespace AMQP { * @param name Consumer tag that is cancelled * @return Deferred */ -const std::shared_ptr &DeferredCancel::reportSuccess(const std::string &name) const +const std::shared_ptr &DeferredCancel::reportSuccess(const std::string &name) { // in the channel, we should uninstall the consumer _channel->uninstall(name); - + // skip if no special callback was installed if (!_cancelCallback) return Deferred::reportSuccess(); - + // call the callback _cancelCallback(name); - + // return next object return _next; } diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp index 1661168..eed3fbf 100644 --- a/src/deferredconsumer.cpp +++ b/src/deferredconsumer.cpp @@ -17,21 +17,34 @@ namespace AMQP { * @param name Consumer tag that is started * @return Deferred */ -const std::shared_ptr &DeferredConsumer::reportSuccess(const std::string &name) const +const std::shared_ptr &DeferredConsumer::reportSuccess(const std::string &name) { - // we now know the name, so we can install the message callback on the channel - _channel->install(name, _messageCallback); - + // we now know the name, so install ourselves in the channel + _channel->install(name, shared_from_this()); + // skip if no special callback was installed if (!_consumeCallback) return Deferred::reportSuccess(); - + // call the callback _consumeCallback(name); - + // return next object return _next; } +/** + * Emit a message + * + * @param message The message to emit + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ +void DeferredConsumer::emit(Message &&message, uint64_t deliveryTag, bool redelivered) const +{ + // simply execute the message callback + _messageCallback(std::move(message), deliveryTag, redelivered); +} + /** * End namespace */ diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp new file mode 100644 index 0000000..fd85f14 --- /dev/null +++ b/src/deferredconsumerbase.cpp @@ -0,0 +1,123 @@ +/** + * deferredconsumerbase.cpp + * + * Base class for the deferred consumer and the + * deferred get. + * + * @copyright 2016 Copernica B.V. + */ + +/** + * Dependencies + */ +#include "../include/deferredconsumerbase.h" +#include "basicdeliverframe.h" +#include "basicheaderframe.h" +#include "bodyframe.h" + +/** + * Start namespace + */ +namespace AMQP { + +/** + * Process a delivery frame + * + * @param frame The frame to process + */ +void DeferredConsumerBase::process(BasicDeliverFrame &frame) +{ + // retrieve the delivery tag and whether we were redelivered + _deliveryTag = frame.deliveryTag(); + _redelivered = frame.redelivered(); + + // anybody interested in the new message? + if (_beginCallback) _beginCallback(); + + // do we have anybody interested in messages? + if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); +} + +/** + * Process the message headers + * + * @param frame The frame to process + */ +void DeferredConsumerBase::process(BasicHeaderFrame &frame) +{ + // store the body size + _bodySize = frame.bodySize(); + + // do we have a message? + if (_message) + { + // store the body size and metadata + _message->setBodySize(_bodySize); + _message->set(frame.metaData()); + } + + // anybody interested in the headers? + if (_headerCallback) _headerCallback(frame.metaData()); + + // no body data expected? then we are now complete + if (!_bodySize) complete(); +} + +/** + * Process the message data + * + * @param frame The frame to process + */ +void DeferredConsumerBase::process(BodyFrame &frame) +{ + // make sure we stay in scope + auto self = shared_from_this(); + + // update the bytes still to receive + _bodySize -= frame.payloadSize(); + + // anybody interested in the data? + if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize()); + + // do we have a message? then append the data + if (_message) _message->append(frame.payload(), frame.payloadSize()); + + // if all bytes were received we are now complete + if (!_bodySize) complete(); +} + +/** + * Indicate that a message was done + */ +void DeferredConsumerBase::complete() +{ + // make sure we stay in scope + auto self = shared_from_this(); + + // also monitor the channel + Monitor monitor{ _channel }; + + // do we have a message? + if (_message) + { + // emit the message + emit(std::move(*_message), _deliveryTag, _redelivered); + + // and destroy it + _message.reset(); + } + + // do we have to inform anyone about completion? + if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + + // do we still have a valid channel + if (!monitor.valid()) return; + + // we are now done executing + _channel->complete(); +} + +/** + * End namespace + */ +} diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 442961a..a8b4a20 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -18,41 +18,31 @@ namespace AMQP { /** - * Report success, a get message succeeded and the message is expected soon - * @param messageCount Message count - * @return Deferred + * Report success for a get operation + * + * @param messagecount Number of messages left in the queue + * @param deliveryTag Delivery tag of the message coming in + * @param redelivered Was the message redelivered? */ -const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messageCount) const +const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) { - // we grab a self pointer to ensure that the deferred object stays alive - auto self = shared_from_this(); + // store delivery tag and redelivery status + _deliveryTag = deliveryTag; + _redelivered = redelivered; - // we now know the name, so we can install the message callback on the channel, the self - // pointer is also captured, which ensures that 'this' is not destructed, all members stay - // accessible, and that the onFinalize() function will only be called after the message - // is reported (onFinalize() is called from the destructor of this DeferredGet object) - _channel->install("", [self, this](Message &&message, uint64_t deliveryTag, bool redelivered) { - - // install a monitor to deal with the case that the channel is removed - Monitor monitor(_channel); - - // call the callbacks - if (_messageCallback) _messageCallback(std::move(message), deliveryTag, redelivered); - - // we can remove the callback now from the channel - if (monitor.valid()) _channel->uninstall(""); - }); + // install ourselves in the channel + _channel->install("", shared_from_this()); // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero) - if (_sizeCallback) _sizeCallback(messageCount); + if (_sizeCallback) _sizeCallback(messagecount); - // return next object + // return next handler return _next; } /** - * Report success, although no message could be get + * Report success, although no message could be gotten * @return Deferred */ const std::shared_ptr &DeferredGet::reportSuccess() const @@ -67,6 +57,31 @@ const std::shared_ptr &DeferredGet::reportSuccess() const return _next; } +/** + * Emit a message + * + * @param message The message to emit + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ +void DeferredGet::emit(Message &&message, uint64_t deliveryTag, bool redelivered) const +{ + // monitor the channel + Monitor monitor{ _channel }; + + // the channel is now synchronized + _channel->onSynchronized(); + + // simply execute the message callback + _messageCallback(std::move(message), deliveryTag, redelivered); + + // check if the channel is still valid + if (!monitor.valid()) return; + + // stop consuming now + _channel->uninstall({}); +} + /** * End of namespace */ diff --git a/src/exception.h b/src/exception.h index e4bb35a..53f1765 100644 --- a/src/exception.h +++ b/src/exception.h @@ -6,6 +6,16 @@ * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include + /** * Set up namespace */ @@ -23,7 +33,7 @@ protected: */ explicit Exception(const std::string &what) : runtime_error(what) {} }; - + /** * End of namespace */ diff --git a/src/extframe.h b/src/extframe.h index eb35c68..89ddda1 100644 --- a/src/extframe.h +++ b/src/extframe.h @@ -1,16 +1,27 @@ /** * ExtFrame.h - * - * Class describing an AMQP frame. A frame can be encoded into the AMQP - * wireframe format, so that it can be sent over an open socket, or it can be + * + * Class describing an AMQP frame. A frame can be encoded into the AMQP + * wireframe format, so that it can be sent over an open socket, or it can be * constructed from a buffer containing AMQP wireframe format. - * + * * The ExtFrame is the base class for all other frames, apart from the * protocol-header-frame - * + * * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "frame.h" +#include "../include/receivedframe.h" + /** * Set up namespace */ @@ -36,13 +47,13 @@ protected: /** * Constructor for an AMQP Frame - * - * The constructor is protected as you're not supposed - * + * + * The constructor is protected as you're not supposed + * * @param channel channel we're working on * @param size size of the payload */ - ExtFrame(uint16_t channel, uint32_t size) : _channel(channel), _size(size) {} + ExtFrame(uint16_t channel, uint32_t size) : _channel(channel), _size(size) {} /** * Constructor based on a received not-yet-recognized frame @@ -141,7 +152,7 @@ public: { // this is an exception throw ProtocolException("unimplemented frame type " + std::to_string(type())); - + // unreachable return false; } diff --git a/src/frame.h b/src/frame.h index 372ed77..636d189 100644 --- a/src/frame.h +++ b/src/frame.h @@ -7,11 +7,27 @@ * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "../include/outbuffer.h" +#include "protocolexception.h" + /** * Set up namespace */ namespace AMQP { +/** + * Forward declarations + */ +class ConnectionImpl; + /** * Class definition */ diff --git a/src/headerframe.h b/src/headerframe.h index 69bde42..1e813fd 100644 --- a/src/headerframe.h +++ b/src/headerframe.h @@ -4,6 +4,16 @@ * @copyright 2014, 2015 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "extframe.h" + /** * Set up namespace */ diff --git a/src/methodframe.h b/src/methodframe.h index eac62b5..67055e3 100644 --- a/src/methodframe.h +++ b/src/methodframe.h @@ -1,9 +1,19 @@ /** * Class describing an AMQP method frame - * + * * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "extframe.h" + /** * Set up namespace */ @@ -17,18 +27,18 @@ class MethodFrame : public ExtFrame protected: /** * Constructor for a methodFrame - * + * * @param channel channel we're working on * @param size size of the frame. */ MethodFrame(uint16_t channel, uint32_t size) : ExtFrame(channel, size + 4) {} // size of classID and methodID - + /** * Load a method from from a received frame * @param frame The received frame */ MethodFrame(ReceivedFrame &frame) : ExtFrame(frame) {} - + /** * Fill an output buffer * @param buffer @@ -37,7 +47,7 @@ protected: { // call base ExtFrame::fill(buffer); - + // add type buffer.add(classID()); buffer.add(methodID()); diff --git a/src/protocolexception.h b/src/protocolexception.h index 7f829bc..f90316f 100644 --- a/src/protocolexception.h +++ b/src/protocolexception.h @@ -7,6 +7,16 @@ * @copyright 2014 Copernica BV */ +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "exception.h" + /** * Set up namespace */