Only allocate a message and its data when a consumer wants to and allow consumers to receive the individual parts

This commit is contained in:
Martijn Otto 2016-06-23 14:42:50 +02:00
parent 2a0b0de409
commit 2c60151d31
43 changed files with 1244 additions and 344 deletions

2
.clang_complete Normal file
View File

@ -0,0 +1,2 @@
-std=c++11
-Wno-pragma-once-outside-header

View File

@ -9,6 +9,14 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "field.h"
#include "fieldproxy.h"
#include <vector>
#include <ostream>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -222,6 +230,18 @@ public:
} }
}; };
/**
* Custom output stream operator
* @param stream
* @param field
* @return ostream
*/
inline std::ostream &operator<<(std::ostream &stream, const ArrayFieldProxy &field)
{
// get underlying field, and output that
return stream << field.get();
}
/** /**
* end namespace * end namespace
*/ */

View File

@ -13,11 +13,19 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <ostream>
#include "field.h"
#include "outbuffer.h"
#include "receivedframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/** /**
* Class definition * Class definition
*/ */
@ -63,7 +71,7 @@ public:
{ {
_byte = frame.nextUint8(); _byte = frame.nextUint8();
} }
/** /**
* Copy constructor * Copy constructor
* @param that * @param that
@ -95,10 +103,10 @@ public:
{ {
// prefix // prefix
stream << "booleanset("; stream << "booleanset(";
// the members // the members
for (int i=0; i<8; i++) stream << (i == 0 ? "" : ",") << (get(i) ? 1 : 0); for (int i=0; i<8; i++) stream << (i == 0 ? "" : ",") << (get(i) ? 1 : 0);
// postfix // postfix
stream << ")"; stream << ")";
} }

View File

@ -11,14 +11,26 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <string>
#include <functional>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Message;
class MetaData;
/** /**
* All the callbacks that are supported * All the callbacks that are supported
* *
* When someone registers a callback function for certain events, it should * When someone registers a callback function for certain events, it should
* match one of the following signatures. * match one of the following signatures.
*/ */
@ -26,7 +38,11 @@ using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>; using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>; using FinalizeCallback = std::function<void()>;
using EmptyCallback = std::function<void()>; using EmptyCallback = std::function<void()>;
using BeginCallback = std::function<void()>;
using HeaderCallback = std::function<void(const MetaData &metaData)>;
using DataCallback = std::function<void(const char *data, size_t size)>;
using MessageCallback = std::function<void(Message &&message, uint64_t deliveryTag, bool redelivered)>; using MessageCallback = std::function<void(Message &&message, uint64_t deliveryTag, bool redelivered)>;
using CompleteCallback = std::function<void(uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using SizeCallback = std::function<void(uint32_t messagecount)>; using SizeCallback = std::function<void(uint32_t messagecount)>;

View File

@ -13,6 +13,19 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "exchangetype.h"
#include "watchable.h"
#include "callbacks.h"
#include "outbuffer.h"
#include "deferred.h"
#include "monitor.h"
#include <memory>
#include <queue>
#include <map>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -21,7 +34,20 @@ namespace AMQP {
/** /**
* Forward declarations * Forward declarations
*/ */
class DeferredConsumerBase;
class BasicDeliverFrame;
class DeferredConsumer;
class BasicGetOKFrame;
class ConsumedMessage; class ConsumedMessage;
class ConnectionImpl;
class DeferredDelete;
class DeferredCancel;
class DeferredQueue;
class DeferredGet;
class Connection;
class Envelope;
class Table;
class Frame;
/** /**
* Class definition * Class definition
@ -48,10 +74,10 @@ private:
ErrorCallback _errorCallback; ErrorCallback _errorCallback;
/** /**
* Callbacks for all consumers that are active * Handlers for all consumers that are active
* @var std::map<std::string,MessageCallback> * @var std::map<std::string,std::shared_ptr<DeferredConsumerBase>
*/ */
std::map<std::string,MessageCallback> _consumers; std::map<std::string,std::shared_ptr<DeferredConsumerBase>> _consumers;
/** /**
* Pointer to the oldest deferred result (the first one that is going * Pointer to the oldest deferred result (the first one that is going
@ -102,10 +128,10 @@ private:
bool _synchronous = false; bool _synchronous = false;
/** /**
* The message that is now being received * The current consumer receiving a message
* @var ConsumedMessage * @var std::shared_ptr<DeferredConsumerBase>
*/ */
ConsumedMessage *_message = nullptr; std::shared_ptr<DeferredConsumerBase> _consumer;
/** /**
* Attach the connection * Attach the connection
@ -135,7 +161,7 @@ protected:
* a friend. By doing this we ensure that nobody can instantiate this * a friend. By doing this we ensure that nobody can instantiate this
* object, and that it can thus only be used inside the library. * object, and that it can thus only be used inside the library.
*/ */
ChannelImpl() {} ChannelImpl();
public: public:
/** /**
@ -633,17 +659,15 @@ public:
void reportError(const char *message, bool notifyhandler = true); void reportError(const char *message, bool notifyhandler = true);
/** /**
* Install a consumer callback * Install a consumer
*
* @param consumertag The consumer tag * @param consumertag The consumer tag
* @param callback The callback to be called * @param consumer The consumer handler
*/ */
void install(const std::string &consumertag, const MessageCallback &callback) void install(std::string consumertag, const std::shared_ptr<DeferredConsumerBase> &consumer)
{ {
// install the callback if it is assigned // install the consumer handler
if (callback) _consumers[consumertag] = callback; _consumers[consumertag] = consumer;
// otherwise we erase the previously set callback
else _consumers.erase(consumertag);
} }
/** /**
@ -657,26 +681,23 @@ public:
} }
/** /**
* Report that a message was received * Process incoming delivery
*
* @param frame The frame to process
*/ */
void reportMessage(); void process(BasicDeliverFrame &frame);
/** /**
* Create an incoming message * Retrieve the current consumer handler
* @param frame *
* @return ConsumedMessage * @return The handler responsible for the current message
*/ */
ConsumedMessage *message(const BasicDeliverFrame &frame); DeferredConsumerBase *consumer();
ConsumedMessage *message(const BasicGetOKFrame &frame);
/** /**
* Retrieve the current incoming message * Mark the current consumer as done
* @return ConsumedMessage
*/ */
ConsumedMessage *message() void complete();
{
return _message;
}
/** /**
* The channel class is its friend, thus can it instantiate this object * The channel class is its friend, thus can it instantiate this object

View File

@ -14,11 +14,22 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <cstdint>
#include <stddef.h>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Connection;
/** /**
* Class definition * Class definition
*/ */

View File

@ -13,11 +13,31 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "watchable.h"
#include "connectionhandler.h"
#include "channelimpl.h"
#include "outbuffer.h"
#include "monitor.h"
#include "login.h"
#include <unordered_map>
#include <memory>
#include <queue>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Connection;
class Buffer;
class Frame;
/** /**
* Class definition * Class definition
*/ */
@ -78,7 +98,7 @@ protected:
* @var uint32_t * @var uint32_t
*/ */
uint32_t _maxFrame = 4096; uint32_t _maxFrame = 4096;
/** /**
* Number of expected bytes that will hold the next incoming frame * Number of expected bytes that will hold the next incoming frame
* We start with seven because that is the header of a frame * We start with seven because that is the header of a frame
@ -245,7 +265,7 @@ public:
// 8 bytes for header and end-of-frame byte // 8 bytes for header and end-of-frame byte
return _maxFrame - 8; return _maxFrame - 8;
} }
/** /**
* The number of bytes that can best be passed to the next call to the parse() method * The number of bytes that can best be passed to the next call to the parse() method
* @return uint32_t * @return uint32_t
@ -334,7 +354,7 @@ public:
{ {
// set connection state to closed // set connection state to closed
_state = state_closed; _state = state_closed;
// monitor because every callback could invalidate the connection // monitor because every callback could invalidate the connection
Monitor monitor(this); Monitor monitor(this);

View File

@ -1,6 +1,6 @@
/** /**
* Decimal field type for AMQP * Decimal field type for AMQP
* *
* @copyright 2014, 2015 Copernica BV * @copyright 2014, 2015 Copernica BV
*/ */
@ -9,6 +9,15 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <cmath>
#include <ostream>
#include "field.h"
#include "outbuffer.h"
#include "receivedframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -44,7 +53,7 @@ private:
* The number without the decimals * The number without the decimals
*/ */
uint32_t _number; uint32_t _number;
protected: protected:
/** /**
* Write encoded payload to the given buffer. * Write encoded payload to the given buffer.
@ -67,7 +76,7 @@ public:
_places(places), _places(places),
_number(number) _number(number)
{} {}
/** /**
* Construct based on incoming data * Construct based on incoming data
* @param frame * @param frame
@ -77,7 +86,7 @@ public:
_places = frame.nextUint8(); _places = frame.nextUint8();
_number = frame.nextUint32(); _number = frame.nextUint32();
} }
/** /**
* Destructor * Destructor
*/ */
@ -161,7 +170,7 @@ public:
* Check for inequality between this and another DecimalField * Check for inequality between this and another DecimalField
* *
* @param value value to be checked for inequality * @param value value to be checked for inequality
* @return boolean whether values are inequal * @return boolean whether values are inequal
*/ */
bool operator!=(const DecimalField& value) const bool operator!=(const DecimalField& value) const
{ {
@ -199,7 +208,7 @@ public:
/** /**
* Return the DecimalField * Return the DecimalField
* To preserve precision DecimalField is returned, containing the number and places. * To preserve precision DecimalField is returned, containing the number and places.
* @return return DecimalField * @return return DecimalField
*/ */
DecimalField value() const DecimalField value() const
{ {

View File

@ -13,6 +13,12 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <memory>
#include "callbacks.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -105,12 +111,25 @@ protected:
return reportSuccess(); return reportSuccess();
} }
/**
* Report success for a get operation
*
* @param messagecount Number of messages left in the queue
* @param deliveryTag Delivery tag of the message coming in
* @param redelivered Was the message redelivered?
*/
virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered)
{
// this is the same as a regular success message
return reportSuccess();
}
/** /**
* Report success for frames that report cancel operations * Report success for frames that report cancel operations
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name)
{ {
// this is the same as a regular success message // this is the same as a regular success message
return reportSuccess(); return reportSuccess();

View File

@ -41,7 +41,7 @@ private:
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const override; virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
/** /**
* The channel implementation may call our * The channel implementation may call our

View File

@ -11,6 +11,11 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -19,34 +24,30 @@ namespace AMQP {
/** /**
* We extend from the default deferred and add extra functionality * We extend from the default deferred and add extra functionality
*/ */
class DeferredConsumer : public Deferred class DeferredConsumer : public DeferredConsumerBase
{ {
private: private:
/**
* The channel to which the consumer is linked
* @var ChannelImpl
*/
ChannelImpl *_channel;
/** /**
* Callback to execute when consumption has started * Callback to execute when consumption has started
* @var ConsumeCallback * @var ConsumeCallback
*/ */
ConsumeCallback _consumeCallback; ConsumeCallback _consumeCallback;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/** /**
* Report success for frames that report start consumer operations * Report success for frames that report start consumer operations
* @param name Consumer tag that is started * @param name Consumer tag that is started
* @return Deferred * @return Deferred
*/ */
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) const override; virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
/**
* Emit a message
*
* @param message The message to emit
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const override;
/** /**
* The channel implementation may call our * The channel implementation may call our
@ -68,7 +69,7 @@ public:
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredConsumer(ChannelImpl *channel, bool failed = false) : DeferredConsumer(ChannelImpl *channel, bool failed = false) :
Deferred(failed), _channel(channel) {} DeferredConsumerBase(failed, channel) {}
public: public:
/** /**

View File

@ -0,0 +1,252 @@
/**
* deferredconsumerbase.h
*
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 Copernica B.V.
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "deferred.h"
#include "stack_ptr.h"
#include "message.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Forward declarations
*/
class BasicDeliverFrame;
class BasicHeaderFrame;
class BodyFrame;
/**
* Base class for deferred consumers
*/
class DeferredConsumerBase :
public Deferred,
public std::enable_shared_from_this<DeferredConsumerBase>
{
private:
/**
* Size of the body of the current message
* @var uint64_t
*/
uint64_t _bodySize = 0;
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void process(BasicDeliverFrame &frame);
/**
* Process the message headers
*
* @param frame The frame to process
*/
void process(BasicHeaderFrame &frame);
/**
* Process the message data
*
* @param frame The frame to process
*/
void process(BodyFrame &frame);
/**
* Indicate that a message was done
*/
void complete();
/**
* Emit a message
*
* @param message The message to emit
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const = 0;
/**
* Frames may be processed
*/
friend class ChannelImpl;
friend class BasicDeliverFrame;
friend class BasicHeaderFrame;
friend class BodyFrame;
protected:
/**
* The delivery tag for the current message
* @var uint64_t
*/
uint64_t _deliveryTag = 0;
/**
* Is this a redelivered message
* @var bool
*/
bool _redelivered = false;
/**
* The channel to which the consumer is linked
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback for new message
* @var BeginCallback
*/
BeginCallback _beginCallback;
/**
* Callback for incoming headers
* @var HeaderCallback
*/
HeaderCallback _headerCallback;
/**
* Callback for when a chunk of data comes in
* @var DataCallback
*/
DataCallback _dataCallback;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/**
* Callback for when a message was complete finished
* @var CompleteCallback
*/
CompleteCallback _completeCallback;
/**
* The message that we are currently receiving
* @var stack_ptr<Message>
*/
stack_ptr<Message> _message;
/**
* Constructor
*
* @param failed Have we already failed?
* @param channel The channel we are consuming on
*/
DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {}
public:
/**
* Register the function to be called when a new message is expected
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumerBase &onBegin(const BeginCallback &callback)
{
// store callback
_beginCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when message headers come in
*
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredConsumerBase &onHeaders(const HeaderCallback &callback)
{
// store callback
_headerCallback = callback;
// allow chaining
return *this;
}
/**
* Register the function to be called when a chunk of data comes in
*
* Note that this function may be called zero, one or multiple times
* for each incoming message depending on the size of the message data.
*
* If you install this callback you very likely also want to install
* the onComplete callback so you know when the last data part was
* received.
*
* @param callback The callback to invoke for chunks of message data
* @return Same object for chaining
*/
DeferredConsumerBase &onData(const DataCallback &callback)
{
// store callback
_dataCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredConsumerBase &onReceived(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredConsumerBase &onMessage(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a funtion to be called when a message was completely received
*
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredConsumerBase& onComplete(const CompleteCallback &callback)
{
// store callback
_completeCallback = callback;
// allow chaining
return *this;
}
};
/**
* End namespace
*/
}

View File

@ -10,6 +10,11 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -17,32 +22,20 @@ namespace AMQP {
/** /**
* Class definition * Class definition
* *
* This class implements the 'shared_from_this' functionality, because * This class implements the 'shared_from_this' functionality, because
* it grabs a self-pointer when the callback is running, otherwise the onFinalize() * it grabs a self-pointer when the callback is running, otherwise the onFinalize()
* is called before the actual message is consumed. * is called before the actual message is consumed.
*/ */
class DeferredGet : public Deferred, public std::enable_shared_from_this<DeferredGet> class DeferredGet : public DeferredConsumerBase
{ {
private: private:
/**
* Pointer to the channel
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/** /**
* Callback in case the queue is empty * Callback in case the queue is empty
* @var EmptyCallback * @var EmptyCallback
*/ */
EmptyCallback _emptyCallback; EmptyCallback _emptyCallback;
/** /**
* Callback with the number of messages still in the queue * Callback with the number of messages still in the queue
* @var SizeCallback * @var SizeCallback
@ -50,18 +43,29 @@ private:
SizeCallback _sizeCallback; SizeCallback _sizeCallback;
/** /**
* Report success when a message is indeed expected * Report success for a get operation
* @param count number of messages in the queue *
* @return Deferred * @param messagecount Number of messages left in the queue
* @param deliveryTag Delivery tag of the message coming in
* @param redelivered Was the message redelivered?
*/ */
virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount) const override; const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) override;
/** /**
* Report success when queue was empty * Report success when queue was empty
* @return Deferred * @return Deferred
*/ */
virtual const std::shared_ptr<Deferred> &reportSuccess() const override; virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
/**
* Emit a message
*
* @param message The message to emit
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void emit(Message &&message, uint64_t deliveryTag, bool redelivered) const override;
/** /**
* The channel implementation may call our * The channel implementation may call our
* private members and construct us * private members and construct us
@ -77,12 +81,12 @@ public:
* Note: this constructor _should_ be protected, but because make_shared * Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all, * will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code. * because the work-around would result in not-so-easy-to-read code.
* *
* @param channel the channel implementation * @param channel the channel implementation
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredGet(ChannelImpl *channel, bool failed = false) : DeferredGet(ChannelImpl *channel, bool failed = false) :
Deferred(failed), _channel(channel) {} DeferredConsumerBase(failed, channel) {}
public: public:
/** /**
@ -94,39 +98,11 @@ public:
{ {
// store the callback // store the callback
_messageCallback = callback; _messageCallback = callback;
// allow chaining // allow chaining
return *this; return *this;
} }
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredGet &onReceived(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredGet &onMessage(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/** /**
* Register a function to be called if no message could be fetched * Register a function to be called if no message could be fetched
* @param callback the callback to execute * @param callback the callback to execute
@ -135,11 +111,11 @@ public:
{ {
// store callback // store callback
_emptyCallback = callback; _emptyCallback = callback;
// allow chaining // allow chaining
return *this; return *this;
} }
/** /**
* Register a function to be called when size information is known * Register a function to be called when size information is known
* @param callback the callback to execute * @param callback the callback to execute
@ -148,12 +124,12 @@ public:
{ {
// store callback // store callback
_sizeCallback = callback; _sizeCallback = callback;
// allow chaining // allow chaining
return *this; return *this;
} }
}; };
/** /**
* End of namespace * End of namespace
*/ */

View File

@ -12,6 +12,11 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "metadata.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -9,11 +9,24 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <memory>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class ReceivedFrame;
class OutBuffer;
class Array;
class Table;
/** /**
* Base field class * Base field class
* *

View File

@ -1,7 +1,7 @@
/** /**
* Field proxy. Returned by the table. Can be casted to the * Field proxy. Returned by the table. Can be casted to the
* relevant native type (std::string or numeric) * relevant native type (std::string or numeric)
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -10,11 +10,28 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <cstdint>
#include <string>
#include "stringfield.h"
#include "booleanset.h"
#include "decimalfield.h"
#include "numericfield.h"
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Table;
class Array;
class Field;
/** /**
* Class implementation * Class implementation
*/ */
@ -200,7 +217,7 @@ public:
// cast to a string // cast to a string
return operator=(std::string(value)); return operator=(std::string(value));
} }
/** /**
* Assign an array value * Assign an array value
* @param value * @param value
@ -224,7 +241,7 @@ public:
_source->set(_index, value); _source->set(_index, value);
return *this; return *this;
} }
/** /**
* Get the underlying field * Get the underlying field
* @return Field * @return Field
@ -247,32 +264,8 @@ public:
}; };
// define types for array- and table-based field proxy // define types for array- and table-based field proxy
typedef FieldProxy<Table, std::string> AssociativeFieldProxy; using AssociativeFieldProxy = FieldProxy<Table, std::string>;
typedef FieldProxy<Array, uint8_t> ArrayFieldProxy; using ArrayFieldProxy = FieldProxy<Array, uint8_t>;
/**
* Custom output stream operator
* @param stream
* @param field
* @return ostream
*/
inline std::ostream &operator<<(std::ostream &stream, const AssociativeFieldProxy &field)
{
// get underlying field, and output that
return stream << field.get();
}
/**
* Custom output stream operator
* @param stream
* @param field
* @return ostream
*/
inline std::ostream &operator<<(std::ostream &stream, const ArrayFieldProxy &field)
{
// get underlying field, and output that
return stream << field.get();
}
/** /**
* end namespace * end namespace

View File

@ -11,6 +11,11 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <string>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -33,7 +38,7 @@ private:
* @var string * @var string
*/ */
std::string _password; std::string _password;
public: public:
/** /**
@ -53,7 +58,7 @@ public:
* Destructor * Destructor
*/ */
virtual ~Login() {} virtual ~Login() {}
/** /**
* Retrieve the user name * Retrieve the user name
* @return string * @return string
@ -62,7 +67,7 @@ public:
{ {
return _user; return _user;
} }
/** /**
* Retrieve the password * Retrieve the password
* @return string * @return string
@ -71,7 +76,7 @@ public:
{ {
return _password; return _password;
} }
/** /**
* String representation in SASL PLAIN mode * String representation in SASL PLAIN mode
* @return string * @return string
@ -80,7 +85,7 @@ public:
{ {
// we need an initial string // we need an initial string
std::string result("\0", 1); std::string result("\0", 1);
// append other elements // append other elements
return result.append(_user).append("\0",1).append(_password); return result.append(_user).append("\0",1).append(_password);
} }

View File

@ -15,11 +15,22 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "envelope.h"
#include <limits>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class DeferredConsumerBase;
/** /**
* Class definition * Class definition
*/ */
@ -38,10 +49,78 @@ protected:
*/ */
std::string _routingKey; std::string _routingKey;
protected:
/** /**
* The constructor is protected to ensure that endusers can not * We are an open book to the consumer handler
* instantiate a message */
friend class DeferredConsumerBase;
/**
* Set the body size
* This field is set when the header is received
* @param uint64_t
*/
void setBodySize(uint64_t size)
{
// safety-check: on 32-bit platforms size_t is obviously also a 32-bit dword
// in which case casting the uint64_t to a size_t could result in truncation
// here we check whether the given size fits inside a size_t
if (std::numeric_limits<size_t>::max() < size) throw std::runtime_error("message is too big for this system");
// store the new size
_bodySize = size;
}
/**
* Append data
* @param buffer incoming data
* @param size size of the data
* @return bool true if the message is now complete
*/
bool append(const char *buffer, uint64_t size)
{
// is this the only data, and also direct complete?
if (_str.empty() && size >= _bodySize)
{
// we have everything
_body = buffer;
// done
return true;
}
else
{
// it does not fit yet, do we have to allocate
if (!_body)
{
// allocate memory in the string
_str.reserve(static_cast<size_t>(_bodySize));
// we now use the data buffer inside the string
_body = _str.data();
}
// safety-check: if the given size exceeds the given message body size
// we truncate it, this should never happen because it indicates a bug
// in the AMQP server implementation, should we report this?
size = std::min(size, _bodySize - _str.size());
// we can not safely append the data to the string, it
// will not exceed the reserved size so it is guaranteed
// not to change the data pointer, we can just leave that
// @todo this is not always necessary; instead, we can refrain from
// allocating this buffer entirely and just insert it into the message
// directly.
_str.append(buffer, static_cast<size_t>(size));
// if the string is filled with the given number of characters we are done now
return _str.size() >= _bodySize;
}
}
public:
/**
* Constructor
*
* @param exchange * @param exchange
* @param routingKey * @param routingKey
*/ */
@ -49,7 +128,6 @@ protected:
Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey) Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey)
{} {}
public:
/** /**
* Copy constructor * Copy constructor
@ -76,7 +154,7 @@ public:
/** /**
* Destructor * Destructor
*/ */
virtual ~Message() {} virtual ~Message() = default;
/** /**
* Assignment operator * Assignment operator

View File

@ -12,6 +12,13 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "booleanset.h"
#include "stringfield.h"
#include "table.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -16,6 +16,11 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "watchable.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -9,6 +9,16 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <memory>
#include <type_traits>
#include "receivedframe.h"
#include "outbuffer.h"
#include "field.h"
#include <ostream>
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -12,6 +12,12 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <memory>
#include <cstring>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -93,7 +99,7 @@ public:
/** /**
* Destructor * Destructor
*/ */
virtual ~OutBuffer() {} virtual ~OutBuffer() = default;
/** /**
* Get access to the internal buffer * Get access to the internal buffer
@ -256,4 +262,3 @@ public:
* End of namespace * End of namespace
*/ */
} }

View File

@ -1,7 +1,7 @@
/** /**
* ReceivedFrame.h * ReceivedFrame.h
* *
* The received frame class is a wrapper around a data buffer, it tries to * The received frame class is a wrapper around a data buffer, it tries to
* find out if the buffer is big enough to contain an entire frame, and * find out if the buffer is big enough to contain an entire frame, and
* it will try to recognize the frame type in the buffer * it will try to recognize the frame type in the buffer
* *
@ -16,11 +16,22 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <cstdint>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Buffer;
class ConnectionImpl;
/** /**
* Class definition * Class definition
*/ */
@ -113,7 +124,7 @@ private:
* @return bool * @return bool
*/ */
bool processHeaderFrame(ConnectionImpl *connection); bool processHeaderFrame(ConnectionImpl *connection);
public: public:
/** /**
@ -171,56 +182,56 @@ public:
/** /**
* Read the next uint8_t from the buffer * Read the next uint8_t from the buffer
* *
* @return uint8_t value read * @return uint8_t value read
*/ */
uint8_t nextUint8(); uint8_t nextUint8();
/** /**
* Read the next int8_t from the buffer * Read the next int8_t from the buffer
* *
* @return int8_t value read * @return int8_t value read
*/ */
int8_t nextInt8(); int8_t nextInt8();
/** /**
* Read the next uint16_t from the buffer * Read the next uint16_t from the buffer
* *
* @return uint16_t value read * @return uint16_t value read
*/ */
uint16_t nextUint16(); uint16_t nextUint16();
/** /**
* Read the next int16_t from the buffer * Read the next int16_t from the buffer
* *
* @return int16_t value read * @return int16_t value read
*/ */
int16_t nextInt16(); int16_t nextInt16();
/** /**
* Read the next uint32_t from the buffer * Read the next uint32_t from the buffer
* *
* @return uint32_t value read * @return uint32_t value read
*/ */
uint32_t nextUint32(); uint32_t nextUint32();
/** /**
* Read the next int32_t from the buffer * Read the next int32_t from the buffer
* *
* @return int32_t value read * @return int32_t value read
*/ */
int32_t nextInt32(); int32_t nextInt32();
/** /**
* Read the next uint64_t from the buffer * Read the next uint64_t from the buffer
* *
* @return uint64_t value read * @return uint64_t value read
*/ */
uint64_t nextUint64(); uint64_t nextUint64();
/** /**
* Read the next int64_t from the buffer * Read the next int64_t from the buffer
* *
* @return int64_t value read * @return int64_t value read
*/ */
int64_t nextInt64(); int64_t nextInt64();
@ -228,7 +239,7 @@ public:
/** /**
* Read a float from the buffer * Read a float from the buffer
* *
* @return float float read from buffer. * @return float float read from buffer.
*/ */
float nextFloat(); float nextFloat();
@ -248,11 +259,11 @@ public:
/** /**
* Process the received frame * Process the received frame
* *
* If this method returns false, it means that the frame was not processed, * If this method returns false, it means that the frame was not processed,
* because it was an unrecognized frame. This does not mean that the * because it was an unrecognized frame. This does not mean that the
* connection is now in an invalid state however. * connection is now in an invalid state however.
* *
* @param connection the connection over which the data was received * @param connection the connection over which the data was received
* @return bool was the frame fully processed * @return bool was the frame fully processed
* @internal * @internal
@ -271,4 +282,3 @@ public:
* End of namespace * End of namespace
*/ */
} }

143
include/stack_ptr.h Normal file
View File

@ -0,0 +1,143 @@
/**
* stack_ptr.h
*
* Implementation of an object that behaves like a
* smart pointer but is actually managed on the stack
*
* @copyright 2016 Copernica B.V.
*/
/**
* Dependencies
*/
#include <type_traits>
#include <utility>
/**
* Set up namespace
*/
namespace AMQP {
/**
* Stack-based smart pointer
*/
template <typename T>
class stack_ptr
{
private:
/**
* Storage for the object
* @var std::aligned_storage<sizeof(T), alignof(T)>
*/
std::aligned_storage<sizeof(T), alignof(T)> _data;
/**
* Is the pointer initialized?
* @var boolean
*/
bool _initialized = false;
public:
/**
* Constructor
*/
stack_ptr() = default;
/**
* Copy and moving is disabled
*
* @param that The stack_ptr we refuse to copy/move
*/
stack_ptr(const stack_ptr &that) = delete;
stack_ptr(stack_ptr &&that) = delete;
/**
* Destructor
*/
~stack_ptr()
{
// reset the pointer
reset();
}
/**
* Reset the pointer
*/
void reset()
{
// are we initialized?
if (!_initialized) return;
// destroy the object
reinterpret_cast<T*>(&_data)->~T();
// the object is not currently initialized
_initialized = false;
}
/**
* Construct the object
*
* @param ... Zero or more constructor arguments for T
*/
template <typename... Arguments>
void construct(Arguments&&... parameters)
{
// first reset the current object
reset();
// initialize new object
new (&_data) T(std::forward<Arguments>(parameters)...);
}
/**
* Is the object initialized?
*
* @return Are we currently managing an object?
*/
operator bool() const
{
// are we initialized with an object?
return _initialized;
}
/**
* Retrieve a pointer to the object
*
* @return Pointer to the object or nullptr if no object is managed
*/
T *get() const
{
// do we have a managed object
if (!_initialized) return nullptr;
// return pointer to the managed object
return const_cast<T*>(reinterpret_cast<const T*>(&_data));
}
/**
* Retrieve a reference to the object
*
* @return Reference to the object, undefined if no object is managed
*/
T &operator*() const
{
// dereference the pointer
return *operator->();
}
/**
* Retrieve a pointer to the object
*
* @return Pointer to the object, undefined if no object is managed
*/
T *operator->() const
{
// return pointer to the managed object
return const_cast<T*>(reinterpret_cast<const T*>(&_data));
}
};
/**
* End namespace
*/
}

View File

@ -9,6 +9,14 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "field.h"
#include "outbuffer.h"
#include "numericfield.h"
#include "receivedframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -63,7 +71,7 @@ public:
/** /**
* Clean up memory used * Clean up memory used
*/ */
virtual ~StringField() {} virtual ~StringField() = default;
/** /**
* Create a new instance of this object * Create a new instance of this object

View File

@ -9,6 +9,14 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include "field.h"
#include "fieldproxy.h"
#include <vector>
#include <map>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -112,7 +120,7 @@ public:
// allow chaining // allow chaining
return *this; return *this;
} }
/** /**
* Aliases for setting values * Aliases for setting values
* @param name * @param name
@ -246,6 +254,18 @@ public:
} }
}; };
/**
* Custom output stream operator
* @param stream
* @param field
* @return ostream
*/
inline std::ostream &operator<<(std::ostream &stream, const AssociativeFieldProxy &field)
{
// get underlying field, and output that
return stream << field.get();
}
/** /**
* end namespace * end namespace
*/ */

View File

@ -12,11 +12,22 @@
*/ */
#pragma once #pragma once
/**
* Dependencies
*/
#include <vector>
#include <algorithm>
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class Monitor;
/** /**
* Class definition * Class definition
*/ */
@ -38,7 +49,7 @@ private:
// add to the vector // add to the vector
_monitors.push_back(monitor); _monitors.push_back(monitor);
} }
/** /**
* Remove a monitor * Remove a monitor
* @param monitor * @param monitor
@ -47,7 +58,7 @@ private:
{ {
// put the monitor at the end of the vector // put the monitor at the end of the vector
auto iter = std::remove(_monitors.begin(), _monitors.end(), monitor); auto iter = std::remove(_monitors.begin(), _monitors.end(), monitor);
// make the vector smaller // make the vector smaller
_monitors.erase(iter, _monitors.end()); _monitors.erase(iter, _monitors.end());
} }
@ -57,12 +68,12 @@ public:
* Destructor * Destructor
*/ */
virtual ~Watchable(); virtual ~Watchable();
/** /**
* Only a monitor has full access * Only a monitor has full access
*/ */
friend class Monitor; friend class Monitor;
}; };
/** /**
* End of namespace * End of namespace

View File

@ -1,9 +1,22 @@
/** /**
* Class describing a basic deliver frame * Class describing a basic deliver frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "basicframe.h"
#include "../include/stringfield.h"
#include "../include/booleanset.h"
#include "../include/connectionimpl.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -12,7 +25,7 @@ namespace AMQP{
/** /**
* Class implementation * Class implementation
*/ */
class BasicDeliverFrame : public BasicFrame class BasicDeliverFrame : public BasicFrame
{ {
private: private:
/** /**
@ -70,7 +83,7 @@ public:
* @param consumerTag identifier for the consumer, valid within current channel * @param consumerTag identifier for the consumer, valid within current channel
* @param deliveryTag server-assigned and channel specific delivery tag * @param deliveryTag server-assigned and channel specific delivery tag
* @param redelivered indicates whether the message has been previously delivered to this (or another) client * @param redelivered indicates whether the message has been previously delivered to this (or another) client
* @param exchange name of exchange to publish to * @param exchange name of exchange to publish to
* @param routingKey message routing key * @param routingKey message routing key
*/ */
BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") : BasicDeliverFrame(uint16_t channel, const std::string& consumerTag, uint64_t deliveryTag, bool redelivered = false, const std::string& exchange = "", const std::string& routingKey = "") :
@ -86,7 +99,7 @@ public:
/** /**
* Construct a basic deliver frame from a received frame * Construct a basic deliver frame from a received frame
* *
* @param frame received frame * @param frame received frame
*/ */
BasicDeliverFrame(ReceivedFrame &frame) : BasicDeliverFrame(ReceivedFrame &frame) :
BasicFrame(frame), BasicFrame(frame),
@ -176,13 +189,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// construct the message // construct the message
channel->message(*this); channel->process(*this);
// done // done
return true; return true;
} }

View File

@ -1,9 +1,19 @@
/** /**
* Class describing an AMQP basic frame * Class describing an AMQP basic frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "methodframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -35,7 +45,7 @@ public:
*/ */
virtual ~BasicFrame() {} virtual ~BasicFrame() {}
/** /**
* Class id * Class id
* @return uint16_t * @return uint16_t
*/ */

View File

@ -1,6 +1,6 @@
/** /**
* Class describing a basic get ok frame * Class describing a basic get ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -12,7 +12,7 @@ namespace AMQP{
/** /**
* Class implementation * Class implementation
*/ */
class BasicGetOKFrame : public BasicFrame class BasicGetOKFrame : public BasicFrame
{ {
private: private:
/** /**
@ -71,7 +71,7 @@ public:
* @param channel channel we're working on * @param channel channel we're working on
* @param deliveryTag server-assigned and channel specific delivery tag * @param deliveryTag server-assigned and channel specific delivery tag
* @param redelivered indicates whether the message has been previously delivered to this (or another) client * @param redelivered indicates whether the message has been previously delivered to this (or another) client
* @param exchange name of exchange to publish to * @param exchange name of exchange to publish to
* @param routingKey message routing key * @param routingKey message routing key
* @param messageCount number of messages in the queue * @param messageCount number of messages in the queue
*/ */
@ -166,20 +166,16 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// report (if this function returns false, it means that the channel // report success for the get operation
// object no longer is valid) channel->reportSuccess(messageCount(), deliveryTag(), redelivered());
if (!channel->reportSuccess(_messageCount)) return true;
// construct the message
channel->message(*this);
// notice that the channel is not yet synchronized here, because // notice that the channel is not yet synchronized here, because
// we first have to receive the entire body // we first have to receive the entire body
// done // done
return true; return true;
} }

View File

@ -1,9 +1,23 @@
/** /**
* Class describing an AMQP basic header frame * Class describing an AMQP basic header frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "headerframe.h"
#include "../include/metadata.h"
#include "../include/envelope.h"
#include "../include/connectionimpl.h"
#include "../include/deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -57,7 +71,7 @@ public:
* Construct an empty basic header frame * Construct an empty basic header frame
* *
* All options are set using setter functions. * All options are set using setter functions.
* *
* @param channel channel we're working on * @param channel channel we're working on
* @param envelope the envelope * @param envelope the envelope
*/ */
@ -71,17 +85,17 @@ public:
* Constructor to parse incoming frame * Constructor to parse incoming frame
* @param frame * @param frame
*/ */
BasicHeaderFrame(ReceivedFrame &frame) : BasicHeaderFrame(ReceivedFrame &frame) :
HeaderFrame(frame), HeaderFrame(frame),
_weight(frame.nextUint16()), _weight(frame.nextUint16()),
_bodySize(frame.nextUint64()), _bodySize(frame.nextUint64()),
_metadata(frame) _metadata(frame)
{} {}
/** /**
* Destructor * Destructor
*/ */
virtual ~BasicHeaderFrame() {} virtual ~BasicHeaderFrame() = default;
/** /**
* Size of the body * Size of the body
@ -92,6 +106,16 @@ public:
return _bodySize; return _bodySize;
} }
/**
* The metadata sent in this frame
*
* @return All the metadata for this message
*/
const MetaData &metaData() const
{
return _metadata;
}
/** /**
* The class ID * The class ID
* @return uint16_t * @return uint16_t
@ -100,7 +124,7 @@ public:
{ {
return 60; return 60;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -110,23 +134,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// channel does not exist // check if we have a valid channel and consumer
if (!channel) return false; if (!channel || !channel->consumer()) return false;
// is there a current message? // the channel can process the frame
MessageImpl *message = channel->message(); channel->consumer()->process(*this);
if (!message) return false;
// store size
message->setBodySize(_bodySize);
// and copy the meta data
message->set(_metadata);
// for empty bodies we're ready now
if (_bodySize == 0) channel->reportMessage();
// done // done
return true; return true;
} }

View File

@ -4,6 +4,18 @@
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "extframe.h"
#include "../include/connectionimpl.h"
#include "../include/deferredconsumerbase.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -94,18 +106,11 @@ public:
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// channel does not exist // check if we have a valid channel and consumer
if (!channel) return false; if (!channel || !channel->consumer()) return false;
// is there a current message? // the consumer may process the frame
MessageImpl *message = channel->message(); channel->consumer()->process(*this);
if (!message) return false;
// store size
if (!message->append(_payload, _size)) return true;
// the message is complete
channel->reportMessage();
// done // done
return true; return true;

View File

@ -46,15 +46,16 @@
*/ */
namespace AMQP { namespace AMQP {
/**
* Constructor
*/
ChannelImpl::ChannelImpl() = default;
/** /**
* Destructor * Destructor
*/ */
ChannelImpl::~ChannelImpl() ChannelImpl::~ChannelImpl()
{ {
// remove incoming message
if (_message) delete _message;
_message = nullptr;
// remove this channel from the connection (but not if the connection is already destructed) // remove this channel from the connection (but not if the connection is already destructed)
if (_connection) _connection->remove(this); if (_connection) _connection->remove(this);
} }
@ -705,40 +706,6 @@ void ChannelImpl::onSynchronized()
} }
} }
/**
* Report the received message
*/
void ChannelImpl::reportMessage()
{
// skip if there is no message
if (!_message) return;
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// synchronize the channel if this comes from a basic.get frame
if (_message->consumer().empty()) onSynchronized();
// syncing the channel may destruct the channel
if (!monitor.valid()) return;
// look for the consumer
auto iter = _consumers.find(_message->consumer());
if (iter == _consumers.end()) return;
// is this a valid callback method
if (!iter->second) return;
// call the callback
_message->report(iter->second);
// skip if channel was destructed
if (!monitor.valid()) return;
// no longer need the message
delete _message; _message = nullptr;
}
/** /**
* Report an error message on a channel * Report an error message on a channel
* @param message the error message * @param message the error message
@ -800,38 +767,47 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
// leap out if object no longer exists // leap out if object no longer exists
if (!monitor.valid()) return; if (!monitor.valid()) return;
// the connection now longer has to know that this channel exists, // the connection no longer has to know that this channel exists,
// because the channel ID is no longer in use // because the channel ID is no longer in use
if (_connection) _connection->remove(this); if (_connection) _connection->remove(this);
} }
/** /**
* Create an incoming message from a consume call * Process incoming delivery
* @param frame *
* @return ConsumedMessage * @param frame The frame to process
*/ */
ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) void ChannelImpl::process(BasicDeliverFrame &frame)
{ {
// destruct if message is already set // find the consumer for this frame
if (_message) delete _message; auto iter = _consumers.find(frame.consumerTag());
if (iter == _consumers.end()) return;
// construct a message // we are going to be receiving a message, store
return _message = new ConsumedMessage(frame); // the handler for the incoming message
_consumer = iter->second;
// let the consumer process the frame
_consumer->process(frame);
} }
/** /**
* Create an incoming message from a get call * Retrieve the current consumer handler
* @param frame *
* @return ConsumedMessage * @return The handler responsible for the current message
*/ */
ConsumedMessage *ChannelImpl::message(const BasicGetOKFrame &frame) DeferredConsumerBase *ChannelImpl::consumer()
{ {
// destruct if message is already set return _consumer.get();
if (_message) delete _message; }
// construct message /**
return _message = new ConsumedMessage(frame); * Mark the current consumer as done
*/
void ChannelImpl::complete()
{
// no more consumer
_consumer.reset();
} }
/** /**

View File

@ -17,17 +17,17 @@ namespace AMQP {
* @param name Consumer tag that is cancelled * @param name Consumer tag that is cancelled
* @return Deferred * @return Deferred
*/ */
const std::shared_ptr<Deferred> &DeferredCancel::reportSuccess(const std::string &name) const const std::shared_ptr<Deferred> &DeferredCancel::reportSuccess(const std::string &name)
{ {
// in the channel, we should uninstall the consumer // in the channel, we should uninstall the consumer
_channel->uninstall(name); _channel->uninstall(name);
// skip if no special callback was installed // skip if no special callback was installed
if (!_cancelCallback) return Deferred::reportSuccess(); if (!_cancelCallback) return Deferred::reportSuccess();
// call the callback // call the callback
_cancelCallback(name); _cancelCallback(name);
// return next object // return next object
return _next; return _next;
} }

View File

@ -17,21 +17,34 @@ namespace AMQP {
* @param name Consumer tag that is started * @param name Consumer tag that is started
* @return Deferred * @return Deferred
*/ */
const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::string &name) const const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::string &name)
{ {
// we now know the name, so we can install the message callback on the channel // we now know the name, so install ourselves in the channel
_channel->install(name, _messageCallback); _channel->install(name, shared_from_this());
// skip if no special callback was installed // skip if no special callback was installed
if (!_consumeCallback) return Deferred::reportSuccess(); if (!_consumeCallback) return Deferred::reportSuccess();
// call the callback // call the callback
_consumeCallback(name); _consumeCallback(name);
// return next object // return next object
return _next; return _next;
} }
/**
* Emit a message
*
* @param message The message to emit
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void DeferredConsumer::emit(Message &&message, uint64_t deliveryTag, bool redelivered) const
{
// simply execute the message callback
_messageCallback(std::move(message), deliveryTag, redelivered);
}
/** /**
* End namespace * End namespace
*/ */

View File

@ -0,0 +1,123 @@
/**
* deferredconsumerbase.cpp
*
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 Copernica B.V.
*/
/**
* Dependencies
*/
#include "../include/deferredconsumerbase.h"
#include "basicdeliverframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicDeliverFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process the message headers
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicHeaderFrame &frame)
{
// store the body size
_bodySize = frame.bodySize();
// do we have a message?
if (_message)
{
// store the body size and metadata
_message->setBodySize(_bodySize);
_message->set(frame.metaData());
}
// anybody interested in the headers?
if (_headerCallback) _headerCallback(frame.metaData());
// no body data expected? then we are now complete
if (!_bodySize) complete();
}
/**
* Process the message data
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BodyFrame &frame)
{
// make sure we stay in scope
auto self = shared_from_this();
// update the bytes still to receive
_bodySize -= frame.payloadSize();
// anybody interested in the data?
if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize());
// do we have a message? then append the data
if (_message) _message->append(frame.payload(), frame.payloadSize());
// if all bytes were received we are now complete
if (!_bodySize) complete();
}
/**
* Indicate that a message was done
*/
void DeferredConsumerBase::complete()
{
// make sure we stay in scope
auto self = shared_from_this();
// also monitor the channel
Monitor monitor{ _channel };
// do we have a message?
if (_message)
{
// emit the message
emit(std::move(*_message), _deliveryTag, _redelivered);
// and destroy it
_message.reset();
}
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing
_channel->complete();
}
/**
* End namespace
*/
}

View File

@ -18,41 +18,31 @@
namespace AMQP { namespace AMQP {
/** /**
* Report success, a get message succeeded and the message is expected soon * Report success for a get operation
* @param messageCount Message count *
* @return Deferred * @param messagecount Number of messages left in the queue
* @param deliveryTag Delivery tag of the message coming in
* @param redelivered Was the message redelivered?
*/ */
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messageCount) const const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered)
{ {
// we grab a self pointer to ensure that the deferred object stays alive // store delivery tag and redelivery status
auto self = shared_from_this(); _deliveryTag = deliveryTag;
_redelivered = redelivered;
// we now know the name, so we can install the message callback on the channel, the self // install ourselves in the channel
// pointer is also captured, which ensures that 'this' is not destructed, all members stay _channel->install("", shared_from_this());
// accessible, and that the onFinalize() function will only be called after the message
// is reported (onFinalize() is called from the destructor of this DeferredGet object)
_channel->install("", [self, this](Message &&message, uint64_t deliveryTag, bool redelivered) {
// install a monitor to deal with the case that the channel is removed
Monitor monitor(_channel);
// call the callbacks
if (_messageCallback) _messageCallback(std::move(message), deliveryTag, redelivered);
// we can remove the callback now from the channel
if (monitor.valid()) _channel->uninstall("");
});
// report the size (note that this is the size _minus_ the message that is retrieved // report the size (note that this is the size _minus_ the message that is retrieved
// (and for which the callback will be called later), so it could be zero) // (and for which the callback will be called later), so it could be zero)
if (_sizeCallback) _sizeCallback(messageCount); if (_sizeCallback) _sizeCallback(messagecount);
// return next object // return next handler
return _next; return _next;
} }
/** /**
* Report success, although no message could be get * Report success, although no message could be gotten
* @return Deferred * @return Deferred
*/ */
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
@ -67,6 +57,31 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
return _next; return _next;
} }
/**
* Emit a message
*
* @param message The message to emit
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void DeferredGet::emit(Message &&message, uint64_t deliveryTag, bool redelivered) const
{
// monitor the channel
Monitor monitor{ _channel };
// the channel is now synchronized
_channel->onSynchronized();
// simply execute the message callback
_messageCallback(std::move(message), deliveryTag, redelivered);
// check if the channel is still valid
if (!monitor.valid()) return;
// stop consuming now
_channel->uninstall({});
}
/** /**
* End of namespace * End of namespace
*/ */

View File

@ -6,6 +6,16 @@
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <stdexcept>
/** /**
* Set up namespace * Set up namespace
*/ */
@ -23,7 +33,7 @@ protected:
*/ */
explicit Exception(const std::string &what) : runtime_error(what) {} explicit Exception(const std::string &what) : runtime_error(what) {}
}; };
/** /**
* End of namespace * End of namespace
*/ */

View File

@ -1,16 +1,27 @@
/** /**
* ExtFrame.h * ExtFrame.h
* *
* Class describing an AMQP frame. A frame can be encoded into the AMQP * Class describing an AMQP frame. A frame can be encoded into the AMQP
* wireframe format, so that it can be sent over an open socket, or it can be * wireframe format, so that it can be sent over an open socket, or it can be
* constructed from a buffer containing AMQP wireframe format. * constructed from a buffer containing AMQP wireframe format.
* *
* The ExtFrame is the base class for all other frames, apart from the * The ExtFrame is the base class for all other frames, apart from the
* protocol-header-frame * protocol-header-frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "frame.h"
#include "../include/receivedframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -36,13 +47,13 @@ protected:
/** /**
* Constructor for an AMQP Frame * Constructor for an AMQP Frame
* *
* The constructor is protected as you're not supposed * The constructor is protected as you're not supposed
* *
* @param channel channel we're working on * @param channel channel we're working on
* @param size size of the payload * @param size size of the payload
*/ */
ExtFrame(uint16_t channel, uint32_t size) : _channel(channel), _size(size) {} ExtFrame(uint16_t channel, uint32_t size) : _channel(channel), _size(size) {}
/** /**
* Constructor based on a received not-yet-recognized frame * Constructor based on a received not-yet-recognized frame
@ -141,7 +152,7 @@ public:
{ {
// this is an exception // this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type())); throw ProtocolException("unimplemented frame type " + std::to_string(type()));
// unreachable // unreachable
return false; return false;
} }

View File

@ -7,11 +7,27 @@
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "../include/outbuffer.h"
#include "protocolexception.h"
/** /**
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Forward declarations
*/
class ConnectionImpl;
/** /**
* Class definition * Class definition
*/ */

View File

@ -4,6 +4,16 @@
* @copyright 2014, 2015 Copernica BV * @copyright 2014, 2015 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "extframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -1,9 +1,19 @@
/** /**
* Class describing an AMQP method frame * Class describing an AMQP method frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "extframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */
@ -17,18 +27,18 @@ class MethodFrame : public ExtFrame
protected: protected:
/** /**
* Constructor for a methodFrame * Constructor for a methodFrame
* *
* @param channel channel we're working on * @param channel channel we're working on
* @param size size of the frame. * @param size size of the frame.
*/ */
MethodFrame(uint16_t channel, uint32_t size) : ExtFrame(channel, size + 4) {} // size of classID and methodID MethodFrame(uint16_t channel, uint32_t size) : ExtFrame(channel, size + 4) {} // size of classID and methodID
/** /**
* Load a method from from a received frame * Load a method from from a received frame
* @param frame The received frame * @param frame The received frame
*/ */
MethodFrame(ReceivedFrame &frame) : ExtFrame(frame) {} MethodFrame(ReceivedFrame &frame) : ExtFrame(frame) {}
/** /**
* Fill an output buffer * Fill an output buffer
* @param buffer * @param buffer
@ -37,7 +47,7 @@ protected:
{ {
// call base // call base
ExtFrame::fill(buffer); ExtFrame::fill(buffer);
// add type // add type
buffer.add(classID()); buffer.add(classID());
buffer.add(methodID()); buffer.add(methodID());

View File

@ -7,6 +7,16 @@
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "exception.h"
/** /**
* Set up namespace * Set up namespace
*/ */