From e0feb17ecc3de878e2e69fefa909cc436155936a Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 27 Feb 2018 05:08:21 +0100 Subject: [PATCH 01/11] renamed deferredconsumerbase into deferredreceiver, because it is not only a base class for the consumer, but also for other receiving operations: get requests and in the future also for returned messages --- include/amqpcpp/channelimpl.h | 20 +-- include/amqpcpp/deferredconsumer.h | 8 +- include/amqpcpp/deferredconsumerbase.h | 166 ------------------------- include/amqpcpp/deferredget.h | 8 +- include/amqpcpp/message.h | 6 +- src/basicgetokframe.h | 11 +- src/basicheaderframe.h | 14 ++- src/bodyframe.h | 16 ++- src/channelimpl.cpp | 14 +-- src/deferredconsumerbase.cpp | 142 --------------------- 10 files changed, 56 insertions(+), 349 deletions(-) delete mode 100644 include/amqpcpp/deferredconsumerbase.h delete mode 100644 src/deferredconsumerbase.cpp diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 3d90266..c5b788b 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -34,7 +34,7 @@ namespace AMQP { /** * Forward declarations */ -class DeferredConsumerBase; +class DeferredReceiver; class BasicDeliverFrame; class DeferredConsumer; class BasicGetOKFrame; @@ -75,9 +75,9 @@ private: /** * Handlers for all consumers that are active - * @var std::map + * @var std::map */ - std::map> _consumers; + std::map> _consumers; /** * Pointer to the oldest deferred result (the first one that is going @@ -129,10 +129,10 @@ private: bool _synchronous = false; /** - * The current consumer receiving a message - * @var std::shared_ptr + * The current object that is busy receiving a message + * @var std::shared_ptr */ - std::shared_ptr _consumer; + std::shared_ptr _receiver; /** * Attach the connection @@ -664,13 +664,13 @@ public: * @param consumer The consumer handler * @param active Is this the new active consumer */ - void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) + void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) { // install the consumer handler _consumers[consumertag] = consumer; // should we become the current consumer? - if (active) _consumer = consumer; + if (active) _receiver = consumer; } /** @@ -691,11 +691,11 @@ public: void process(BasicDeliverFrame &frame); /** - * Retrieve the current consumer handler + * Retrieve the current object that is receiving a message * * @return The handler responsible for the current message */ - DeferredConsumerBase *consumer(); + DeferredReceiver *receiver(); /** * Mark the current consumer as done diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index d7d7bf8..bc260d5 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -3,7 +3,7 @@ * * Deferred callback for consumers * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -14,7 +14,7 @@ /** * Dependencies */ -#include "deferredconsumerbase.h" +#include "deferredreceiver.h" /** * Set up namespace @@ -24,7 +24,7 @@ namespace AMQP { /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public DeferredConsumerBase +class DeferredConsumer : public DeferredReceiver { private: /** @@ -68,7 +68,7 @@ public: * @param failed are we already failed? */ DeferredConsumer(ChannelImpl *channel, bool failed = false) : - DeferredConsumerBase(failed, channel) {} + DeferredReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredconsumerbase.h b/include/amqpcpp/deferredconsumerbase.h deleted file mode 100644 index 5b3bbdb..0000000 --- a/include/amqpcpp/deferredconsumerbase.h +++ /dev/null @@ -1,166 +0,0 @@ -/** - * deferredconsumerbase.h - * - * Base class for the deferred consumer and the - * deferred get. - * - * @copyright 2016 - 2017 Copernica B.V. - */ - -/** - * Include guard - */ -#pragma once - -/** - * Dependencies - */ -#include "deferred.h" -#include "stack_ptr.h" -#include "message.h" - -/** - * Start namespace - */ -namespace AMQP { - -/** - * Forward declarations - */ -class BasicDeliverFrame; -class BasicGetOKFrame; -class BasicHeaderFrame; -class BodyFrame; - -/** - * Base class for deferred consumers - */ -class DeferredConsumerBase : - public Deferred, - public std::enable_shared_from_this -{ -private: - /** - * Size of the body of the current message - * @var uint64_t - */ - uint64_t _bodySize = 0; - - /** - * Process a delivery frame - * - * @param frame The frame to process - */ - void process(BasicDeliverFrame &frame); - - /** - * Process a delivery frame from a get request - * - * @param frame The frame to process - */ - void process(BasicGetOKFrame &frame); - - /** - * Process the message headers - * - * @param frame The frame to process - */ - void process(BasicHeaderFrame &frame); - - /** - * Process the message data - * - * @param frame The frame to process - */ - void process(BodyFrame &frame); - - /** - * Indicate that a message was done - */ - void complete(); - - /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message - */ - virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0; - - /** - * Frames may be processed - */ - friend class ChannelImpl; - friend class BasicDeliverFrame; - friend class BasicGetOKFrame; - friend class BasicHeaderFrame; - friend class BodyFrame; -protected: - /** - * The delivery tag for the current message - * @var uint64_t - */ - uint64_t _deliveryTag = 0; - - /** - * Is this a redelivered message - * @var bool - */ - bool _redelivered = false; - - /** - * The channel to which the consumer is linked - * @var ChannelImpl - */ - ChannelImpl *_channel; - - /** - * Callback for new message - * @var BeginCallback - */ - BeginCallback _beginCallback; - - /** - * Callback for incoming headers - * @var HeaderCallback - */ - HeaderCallback _headerCallback; - - /** - * Callback for when a chunk of data comes in - * @var DataCallback - */ - DataCallback _dataCallback; - - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - - /** - * Callback for when a message was complete finished - * @var CompleteCallback - */ - CompleteCallback _completeCallback; - - /** - * The message that we are currently receiving - * @var stack_ptr - */ - stack_ptr _message; - - /** - * Constructor - * - * @param failed Have we already failed? - * @param channel The channel we are consuming on - */ - DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} -public: -}; - -/** - * End namespace - */ -} diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4270d34..71fadbc 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -2,7 +2,7 @@ * DeferredGet.h * * @author Emiel Bruijntjes - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -13,7 +13,7 @@ /** * Dependencies */ -#include "deferredconsumerbase.h" +#include "deferredreceiver.h" /** * Set up namespace @@ -27,7 +27,7 @@ namespace AMQP { * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public DeferredConsumerBase +class DeferredGet : public DeferredReceiver { private: /** @@ -84,7 +84,7 @@ public: * @param failed are we already failed? */ DeferredGet(ChannelImpl *channel, bool failed = false) : - DeferredConsumerBase(failed, channel) {} + DeferredReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/message.h b/include/amqpcpp/message.h index 5b1bb02..7c8f59e 100644 --- a/include/amqpcpp/message.h +++ b/include/amqpcpp/message.h @@ -7,7 +7,7 @@ * Message objects can not be constructed by end users, they are only constructed * by the AMQP library, and passed to user callbacks. * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -31,7 +31,7 @@ namespace AMQP { /** * Forward declarations */ -class DeferredConsumerBase; +class DeferredReceiver; /** * Class definition @@ -61,7 +61,7 @@ protected: /** * We are an open book to the consumer handler */ - friend class DeferredConsumerBase; + friend class DeferredReceiver; /** * Set the body size diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 0de5ea9..e90ae5e 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic get ok frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -173,11 +173,14 @@ public: // report success for the get operation channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); - // check if we have a valid consumer - if (!channel->consumer()) return false; + // get the current receiver object + auto *receiver = channel->receiver(); + + // check if we have a valid receiver + if (receiver == nullptr) return false; // pass on to consumer - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index c6ec1c4..bc60a17 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -1,7 +1,7 @@ /** * Class describing an AMQP basic header frame * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -16,7 +16,7 @@ #include "amqpcpp/metadata.h" #include "amqpcpp/envelope.h" #include "amqpcpp/connectionimpl.h" -#include "amqpcpp/deferredconsumerbase.h" +#include "amqpcpp/deferredreceiver.h" /** * Set up namespace @@ -134,12 +134,18 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); + + // we need a channel + if (channel == nullptr) return false; + + // do we have an object that is receiving this data? + auto *receiver = channel->receiver(); // check if we have a valid channel and consumer - if (!channel || !channel->consumer()) return false; + if (receiver == nullptr) return false; // the channel can process the frame - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/bodyframe.h b/src/bodyframe.h index ae1621d..5dc4920 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -1,7 +1,7 @@ /** * Class describing an AMQP Body Frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -14,7 +14,7 @@ */ #include "extframe.h" #include "amqpcpp/connectionimpl.h" -#include "amqpcpp/deferredconsumerbase.h" +#include "amqpcpp/deferredreceiver.h" /** * Set up namespace @@ -105,12 +105,18 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); + + // we must have a channel object + if (channel == nullptr) return false; + + // get the object that is receiving the messages + auto *receiver = channel->receiver(); - // check if we have a valid channel and consumer - if (!channel || !channel->consumer()) return false; + // check if we have a valid receiver + if (receiver == nullptr) return false; // the consumer may process the frame - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a498864..a5c17c6 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -828,20 +828,20 @@ void ChannelImpl::process(BasicDeliverFrame &frame) // we are going to be receiving a message, store // the handler for the incoming message - _consumer = iter->second; + _receiver = iter->second; // let the consumer process the frame - _consumer->process(frame); + _receiver->process(frame); } /** - * Retrieve the current consumer handler + * Retrieve the current receiver handler * * @return The handler responsible for the current message */ -DeferredConsumerBase *ChannelImpl::consumer() +DeferredReceiver *ChannelImpl::receiver() { - return _consumer.get(); + return _receiver.get(); } /** @@ -849,8 +849,8 @@ DeferredConsumerBase *ChannelImpl::consumer() */ void ChannelImpl::complete() { - // no more consumer - _consumer.reset(); + // no more receiver + _receiver.reset(); } /** diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp deleted file mode 100644 index 09ed0e4..0000000 --- a/src/deferredconsumerbase.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/** - * deferredconsumerbase.cpp - * - * Base class for the deferred consumer and the - * deferred get. - * - * @copyright 2016 - 2017 Copernica B.V. - */ - -/** - * Dependencies - */ -#include "amqpcpp/deferredconsumerbase.h" -#include "basicdeliverframe.h" -#include "basicgetokframe.h" -#include "basicheaderframe.h" -#include "bodyframe.h" - -/** - * Start namespace - */ -namespace AMQP { - -/** - * Process a delivery frame - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicDeliverFrame &frame) -{ - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); - - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); -} - -/** - * Process a delivery frame from a get request - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicGetOKFrame &frame) -{ - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); - - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); -} - -/** - * Process the message headers - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicHeaderFrame &frame) -{ - // store the body size - _bodySize = frame.bodySize(); - - // do we have a message? - if (_message) - { - // store the body size and metadata - _message->setBodySize(_bodySize); - _message->set(frame.metaData()); - } - - // anybody interested in the headers? - if (_headerCallback) _headerCallback(frame.metaData()); - - // no body data expected? then we are now complete - if (!_bodySize) complete(); -} - -/** - * Process the message data - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BodyFrame &frame) -{ - // make sure we stay in scope - auto self = shared_from_this(); - - // update the bytes still to receive - _bodySize -= frame.payloadSize(); - - // anybody interested in the data? - if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize()); - - // do we have a message? then append the data - if (_message) _message->append(frame.payload(), frame.payloadSize()); - - // if all bytes were received we are now complete - if (!_bodySize) complete(); -} - -/** - * Indicate that a message was done - */ -void DeferredConsumerBase::complete() -{ - // make sure we stay in scope - auto self = shared_from_this(); - - // also monitor the channel - Monitor monitor{ _channel }; - - // do we have a message? - if (_message) - { - // announce the message - announce(*_message, _deliveryTag, _redelivered); - - // and destroy it - _message.reset(); - } - - // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); - - // do we still have a valid channel - if (!monitor.valid()) return; - - // we are now done executing - _channel->complete(); -} - -/** - * End namespace - */ -} From 3ccc6af475fc1188ad3610b5df20afcf55491350 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 27 Feb 2018 05:09:03 +0100 Subject: [PATCH 02/11] added deferredreceiver files (forgotter in prev commit) --- include/amqpcpp/deferredreceiver.h | 166 +++++++++++++++++++++++++++++ src/deferredreceiver.cpp | 141 ++++++++++++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 include/amqpcpp/deferredreceiver.h create mode 100644 src/deferredreceiver.cpp diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h new file mode 100644 index 0000000..a315247 --- /dev/null +++ b/include/amqpcpp/deferredreceiver.h @@ -0,0 +1,166 @@ +/** + * DeferredReceiver.h + * + * Base class for the deferred consumer, the deferred get and the + * deferred publisher (that may receive returned messages) + * + * @copyright 2016 - 2018 Copernica B.V. + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "deferred.h" +#include "stack_ptr.h" +#include "message.h" + +/** + * Start namespace + */ +namespace AMQP { + +/** + * Forward declarations + */ +class BasicDeliverFrame; +class BasicGetOKFrame; +class BasicHeaderFrame; +class BodyFrame; + +/** + * Base class for deferred consumers + */ +class DeferredReceiver : + public Deferred, + public std::enable_shared_from_this +{ +private: + /** + * Size of the body of the current message + * @var uint64_t + */ + uint64_t _bodySize = 0; + + /** + * Process a delivery frame + * + * @param frame The frame to process + */ + void process(BasicDeliverFrame &frame); + + /** + * Process a delivery frame from a get request + * + * @param frame The frame to process + */ + void process(BasicGetOKFrame &frame); + + /** + * Process the message headers + * + * @param frame The frame to process + */ + void process(BasicHeaderFrame &frame); + + /** + * Process the message data + * + * @param frame The frame to process + */ + void process(BodyFrame &frame); + + /** + * Indicate that a message was done + */ + void complete(); + + /** + * Announce that a message has been received + * @param message The message to announce + * @param deliveryTag The delivery tag (for ack()ing) + * @param redelivered Is this a redelivered message + */ + virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0; + + /** + * Frames may be processed + */ + friend class ChannelImpl; + friend class BasicDeliverFrame; + friend class BasicGetOKFrame; + friend class BasicHeaderFrame; + friend class BodyFrame; +protected: + /** + * The delivery tag for the current message + * @var uint64_t + */ + uint64_t _deliveryTag = 0; + + /** + * Is this a redelivered message + * @var bool + */ + bool _redelivered = false; + + /** + * The channel to which the consumer is linked + * @var ChannelImpl + */ + ChannelImpl *_channel; + + /** + * Callback for new message + * @var BeginCallback + */ + BeginCallback _beginCallback; + + /** + * Callback for incoming headers + * @var HeaderCallback + */ + HeaderCallback _headerCallback; + + /** + * Callback for when a chunk of data comes in + * @var DataCallback + */ + DataCallback _dataCallback; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + /** + * Callback for when a message was complete finished + * @var CompleteCallback + */ + CompleteCallback _completeCallback; + + /** + * The message that we are currently receiving + * @var stack_ptr + */ + stack_ptr _message; + + /** + * Constructor + * + * @param failed Have we already failed? + * @param channel The channel we are consuming on + */ + DeferredReceiver(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} +public: +}; + +/** + * End namespace + */ +} diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp new file mode 100644 index 0000000..22cedc5 --- /dev/null +++ b/src/deferredreceiver.cpp @@ -0,0 +1,141 @@ +/** + * DeferredReceiver.cpp + * + * Implementation file for the DeferredReceiver class + * + * @copyright 2016 - 2018 Copernica B.V. + */ + +/** + * Dependencies + */ +#include "amqpcpp/deferredreceiver.h" +#include "basicdeliverframe.h" +#include "basicgetokframe.h" +#include "basicheaderframe.h" +#include "bodyframe.h" + +/** + * Start namespace + */ +namespace AMQP { + +/** + * Process a delivery frame + * + * @param frame The frame to process + */ +void DeferredReceiver::process(BasicDeliverFrame &frame) +{ + // retrieve the delivery tag and whether we were redelivered + _deliveryTag = frame.deliveryTag(); + _redelivered = frame.redelivered(); + + // anybody interested in the new message? + if (_beginCallback) _beginCallback(); + + // do we have anybody interested in messages? + if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); +} + +/** + * Process a delivery frame from a get request + * + * @param frame The frame to process + */ +void DeferredReceiver::process(BasicGetOKFrame &frame) +{ + // retrieve the delivery tag and whether we were redelivered + _deliveryTag = frame.deliveryTag(); + _redelivered = frame.redelivered(); + + // anybody interested in the new message? + if (_beginCallback) _beginCallback(); + + // do we have anybody interested in messages? + if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); +} + +/** + * Process the message headers + * + * @param frame The frame to process + */ +void DeferredReceiver::process(BasicHeaderFrame &frame) +{ + // store the body size + _bodySize = frame.bodySize(); + + // do we have a message? + if (_message) + { + // store the body size and metadata + _message->setBodySize(_bodySize); + _message->set(frame.metaData()); + } + + // anybody interested in the headers? + if (_headerCallback) _headerCallback(frame.metaData()); + + // no body data expected? then we are now complete + if (!_bodySize) complete(); +} + +/** + * Process the message data + * + * @param frame The frame to process + */ +void DeferredReceiver::process(BodyFrame &frame) +{ + // make sure we stay in scope + auto self = shared_from_this(); + + // update the bytes still to receive + _bodySize -= frame.payloadSize(); + + // anybody interested in the data? + if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize()); + + // do we have a message? then append the data + if (_message) _message->append(frame.payload(), frame.payloadSize()); + + // if all bytes were received we are now complete + if (!_bodySize) complete(); +} + +/** + * Indicate that a message was done + */ +void DeferredReceiver::complete() +{ + // make sure we stay in scope + auto self = shared_from_this(); + + // also monitor the channel + Monitor monitor{ _channel }; + + // do we have a message? + if (_message) + { + // announce the message + announce(*_message, _deliveryTag, _redelivered); + + // and destroy it + _message.reset(); + } + + // do we have to inform anyone about completion? + if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + + // do we still have a valid channel + if (!monitor.valid()) return; + + // we are now done executing + _channel->complete(); +} + +/** + * End namespace + */ +} From 520fe40201d73aac8f3dded0550bd03a76c7ead4 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 17:34:27 +0100 Subject: [PATCH 03/11] refactored handling of incoming messages from consume and get operations, to prepare for future handling of returned messages and publisher confirms. this also implies a small change to the api: the begin-callback when a message is received now also gets the original exchange and routing key (which could be useful) --- include/amqpcpp/callbacks.h | 2 +- include/amqpcpp/channelimpl.h | 39 +++++++++--------- include/amqpcpp/deferredconsumer.h | 23 ++++++++--- include/amqpcpp/deferredget.h | 15 ++++--- include/amqpcpp/deferredreceiver.h | 42 ++++++++----------- src/basicdeliverframe.h | 13 ++++-- src/basicgetokframe.h | 8 ++-- src/channelimpl.cpp | 43 ++++--------------- src/consumedmessage.h | 2 +- src/deferredconsumer.cpp | 33 +++++++++------ src/deferredget.cpp | 33 +++++---------- src/deferredreceiver.cpp | 66 +++++++++--------------------- 12 files changed, 137 insertions(+), 182 deletions(-) diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index a7c85b9..67e3757 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -38,7 +38,7 @@ using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; using EmptyCallback = std::function; -using BeginCallback = std::function; +using BeginCallback = std::function; using HeaderCallback = std::function; using DataCallback = std::function; using MessageCallback = std::function; diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index c5b788b..3f1577e 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -75,9 +75,9 @@ private: /** * Handlers for all consumers that are active - * @var std::map + * @var std::map */ - std::map> _consumers; + std::map> _consumers; /** * Pointer to the oldest deferred result (the first one that is going @@ -130,7 +130,7 @@ private: /** * The current object that is busy receiving a message - * @var std::shared_ptr + * @var std::shared_ptr */ std::shared_ptr _receiver; @@ -659,18 +659,23 @@ public: /** * Install a consumer - * * @param consumertag The consumer tag - * @param consumer The consumer handler - * @param active Is this the new active consumer + * @param consumer The consumer object */ - void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) + void install(const std::string &consumertag, const std::shared_ptr &consumer) { // install the consumer handler _consumers[consumertag] = consumer; + } - // should we become the current consumer? - if (active) _receiver = consumer; + /** + * Install the current consumer + * @param receiver The receiver object + */ + void install(const std::shared_ptr &receiver) + { + // store object as current receiver + _receiver = receiver; } /** @@ -684,23 +689,17 @@ public: } /** - * Process incoming delivery - * - * @param frame The frame to process + * Fetch the receiver for a specific consumer tag + * @param consumertag the consumer tag + * @return the receiver object */ - void process(BasicDeliverFrame &frame); + DeferredConsumer *consumer(const std::string &consumertag) const; /** * Retrieve the current object that is receiving a message - * * @return The handler responsible for the current message */ - DeferredReceiver *receiver(); - - /** - * Mark the current consumer as done - */ - void complete(); + DeferredReceiver *receiver() const { return _receiver.get(); } /** * The channel class is its friend, thus can it instantiate this object diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index bc260d5..732980f 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -20,11 +20,16 @@ * Set up namespace */ namespace AMQP { + +/** + * Forward declararions + */ +class BasicDeliverFrame; /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public DeferredReceiver +class DeferredConsumer : public DeferredReceiver, public std::enable_shared_from_this { private: /** @@ -33,6 +38,13 @@ private: */ ConsumeCallback _consumeCallback; + /** + * Process a delivery frame + * + * @param frame The frame to process + */ + void process(BasicDeliverFrame &frame); + /** * Report success for frames that report start consumer operations * @param name Consumer tag that is started @@ -41,12 +53,10 @@ private: virtual const std::shared_ptr &reportSuccess(const std::string &name) override; /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message + * Get reference to self to prevent that object falls out of scope + * @return std::shared_ptr */ - virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override; + virtual std::shared_ptr lock() override { return shared_from_this(); } /** * The channel implementation may call our @@ -54,6 +64,7 @@ private: */ friend class ChannelImpl; friend class ConsumedMessage; + friend class BasicDeliverFrame; public: /** diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 71fadbc..4997e93 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -27,7 +27,7 @@ namespace AMQP { * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public DeferredReceiver +class DeferredGet : public DeferredReceiver, public std::enable_shared_from_this { private: /** @@ -57,12 +57,15 @@ private: virtual const std::shared_ptr &reportSuccess() const override; /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message + * Get reference to self to prevent that object falls out of scope + * @return std::shared_ptr */ - virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override; + virtual std::shared_ptr lock() override { return shared_from_this(); } + + /** + * Extended implementation of the complete method that is called when a message was fully received + */ + virtual void complete() override; /** * The channel implementation may call our diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h index a315247..3943dc2 100644 --- a/include/amqpcpp/deferredreceiver.h +++ b/include/amqpcpp/deferredreceiver.h @@ -35,9 +35,7 @@ class BodyFrame; /** * Base class for deferred consumers */ -class DeferredReceiver : - public Deferred, - public std::enable_shared_from_this +class DeferredReceiver : public Deferred { private: /** @@ -46,20 +44,27 @@ private: */ uint64_t _bodySize = 0; + +protected: /** - * Process a delivery frame - * - * @param frame The frame to process + * Initialize the object to send out a message + * @param exchange the exchange to which the message was published + * @param routingkey the routing key that was used to publish the message */ - void process(BasicDeliverFrame &frame); + void initialize(const std::string &exchange, const std::string &routingkey); + + /** + * Get reference to self to prevent that object falls out of scope + * @return std::shared_ptr + */ + virtual std::shared_ptr lock() = 0; /** - * Process a delivery frame from a get request - * - * @param frame The frame to process + * Indicate that a message was done */ - void process(BasicGetOKFrame &frame); + virtual void complete(); +private: /** * Process the message headers * @@ -74,27 +79,14 @@ private: */ void process(BodyFrame &frame); - /** - * Indicate that a message was done - */ - void complete(); - - /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message - */ - virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0; - /** * Frames may be processed */ friend class ChannelImpl; - friend class BasicDeliverFrame; friend class BasicGetOKFrame; friend class BasicHeaderFrame; friend class BodyFrame; + protected: /** * The delivery tag for the current message diff --git a/src/basicdeliverframe.h b/src/basicdeliverframe.h index 4d4c408..07dad11 100644 --- a/src/basicdeliverframe.h +++ b/src/basicdeliverframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic deliver frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -16,6 +16,7 @@ #include "amqpcpp/stringfield.h" #include "amqpcpp/booleanset.h" #include "amqpcpp/connectionimpl.h" +#include "amqpcpp/deferredconsumer.h" /** * Set up namespace @@ -193,8 +194,14 @@ public: // channel does not exist if (!channel) return false; - // construct the message - channel->process(*this); + // get the appropriate consumer object + auto consumer = channel->consumer(_consumerTag); + + // skip if there was no consumer for this tag + if (consumer == nullptr) return false; + + // initialize the object, because we're about to receive a message + consumer->process(*this); // done return true; diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index e90ae5e..97bc7af 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -170,8 +170,8 @@ public: // channel does not exist if (!channel) return false; - // report success for the get operation - channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); + // report success for the get operation (this will also update the current receiver!) + channel->reportSuccess(messageCount(), _deliveryTag, redelivered()); // get the current receiver object auto *receiver = channel->receiver(); @@ -179,8 +179,8 @@ public: // check if we have a valid receiver if (receiver == nullptr) return false; - // pass on to consumer - receiver->process(*this); + // initialize the receiver for the upcoming message + receiver->initialize(_exchange, _routingKey); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a5c17c6..95ae8f8 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -6,7 +6,6 @@ * @copyright 2014 - 2018 Copernica BV */ #include "includes.h" -#include "basicdeliverframe.h" #include "basicgetokframe.h" #include "basicreturnframe.h" #include "consumedmessage.h" @@ -816,41 +815,17 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler) } /** - * Process incoming delivery - * - * @param frame The frame to process + * Get the current receiver for a given consumer tag + * @param consumertag the consumer frame + * @return DeferredConsumer */ -void ChannelImpl::process(BasicDeliverFrame &frame) +DeferredConsumer *ChannelImpl::consumer(const std::string &consumertag) const { - // find the consumer for this frame - auto iter = _consumers.find(frame.consumerTag()); - if (iter == _consumers.end()) return; - - // we are going to be receiving a message, store - // the handler for the incoming message - _receiver = iter->second; - - // let the consumer process the frame - _receiver->process(frame); -} - -/** - * Retrieve the current receiver handler - * - * @return The handler responsible for the current message - */ -DeferredReceiver *ChannelImpl::receiver() -{ - return _receiver.get(); -} - -/** - * Mark the current consumer as done - */ -void ChannelImpl::complete() -{ - // no more receiver - _receiver.reset(); + // look in the map + auto iter = _consumers.find(consumertag); + + // return the result + return iter == _consumers.end() ? nullptr : iter->second.get(); } /** diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 01f3bcc..bba72c4 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -1,7 +1,7 @@ /** * Base class for a message implementation * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp index 221d089..1c8b83d 100644 --- a/src/deferredconsumer.cpp +++ b/src/deferredconsumer.cpp @@ -3,15 +3,34 @@ * * Implementation file for the DeferredConsumer class * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ #include "includes.h" +#include "basicdeliverframe.h" /** * Namespace */ namespace AMQP { +/** + * Process a delivery frame + * + * @param frame The frame to process + */ +void DeferredConsumer::process(BasicDeliverFrame &frame) +{ + // this object will handle all future frames with header and body data + _channel->install(shared_from_this()); + + // retrieve the delivery tag and whether we were redelivered + _deliveryTag = frame.deliveryTag(); + _redelivered = frame.redelivered(); + + // initialize the object for the next message + initialize(frame.exchange(), frame.routingKey()); +} + /** * Report success for frames that report start consumer operations * @param name Consumer tag that is started @@ -32,18 +51,6 @@ const std::shared_ptr &DeferredConsumer::reportSuccess(const std::stri return _next; } -/** - * Announce that a message was received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message - */ -void DeferredConsumer::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const -{ - // simply execute the message callback - _messageCallback(message, deliveryTag, redelivered); -} - /** * End namespace */ diff --git a/src/deferredget.cpp b/src/deferredget.cpp index dbd838b..c1bb6ab 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -4,7 +4,7 @@ * Implementation of the DeferredGet call * * @author Emiel Bruijntjes - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -19,20 +19,19 @@ namespace AMQP { /** * Report success for a get operation - * * @param messagecount Number of messages left in the queue * @param deliveryTag Delivery tag of the message coming in * @param redelivered Was the message redelivered? */ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) { + // install this object as the handler for the upcoming header and body frames + _channel->install(shared_from_this()); + // store delivery tag and redelivery status _deliveryTag = deliveryTag; _redelivered = redelivered; - // install ourselves in the channel - _channel->install("", shared_from_this(), true); - // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero) if (_sizeCallback) _sizeCallback(messagecount); @@ -58,27 +57,15 @@ const std::shared_ptr &DeferredGet::reportSuccess() const } /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message + * Extended implementation of the complete method that is called when a message was fully received */ -void DeferredGet::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const +void DeferredGet::complete() { - // monitor the channel - Monitor monitor{ _channel }; - - // the channel is now synchronized + // the channel is now synchronized, delayed frames may now be sent _channel->onSynchronized(); - - // simply execute the message callback - _messageCallback(std::move(message), deliveryTag, redelivered); - - // check if the channel is still valid - if (!monitor.valid()) return; - - // stop consuming now - _channel->uninstall({}); + + // pass on to normal implementation + DeferredReceiver::complete(); } /** diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp index 22cedc5..52e3766 100644 --- a/src/deferredreceiver.cpp +++ b/src/deferredreceiver.cpp @@ -21,39 +21,17 @@ namespace AMQP { /** - * Process a delivery frame - * - * @param frame The frame to process + * Initialize the object: we are going to receive a message, next frames will be header and data + * @param exchange + * @param routingkey */ -void DeferredReceiver::process(BasicDeliverFrame &frame) +void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey) { - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); + if (_beginCallback) _beginCallback(exchange, routingkey); - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); -} - -/** - * Process a delivery frame from a get request - * - * @param frame The frame to process - */ -void DeferredReceiver::process(BasicGetOKFrame &frame) -{ - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); - - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); + // do we have anybody interested in messages? in that case we construct the message + if (_messageCallback) _message.construct(exchange, routingkey); } /** @@ -63,6 +41,9 @@ void DeferredReceiver::process(BasicGetOKFrame &frame) */ void DeferredReceiver::process(BasicHeaderFrame &frame) { + // make sure we stay in scope + auto self = lock(); + // store the body size _bodySize = frame.bodySize(); @@ -78,7 +59,7 @@ void DeferredReceiver::process(BasicHeaderFrame &frame) if (_headerCallback) _headerCallback(frame.metaData()); // no body data expected? then we are now complete - if (!_bodySize) complete(); + if (_bodySize == 0) complete(); } /** @@ -89,7 +70,7 @@ void DeferredReceiver::process(BasicHeaderFrame &frame) void DeferredReceiver::process(BodyFrame &frame) { // make sure we stay in scope - auto self = shared_from_this(); + auto self = lock(); // update the bytes still to receive _bodySize -= frame.payloadSize(); @@ -101,7 +82,7 @@ void DeferredReceiver::process(BodyFrame &frame) if (_message) _message->append(frame.payload(), frame.payloadSize()); // if all bytes were received we are now complete - if (!_bodySize) complete(); + if (_bodySize == 0) complete(); } /** @@ -109,30 +90,23 @@ void DeferredReceiver::process(BodyFrame &frame) */ void DeferredReceiver::complete() { - // make sure we stay in scope - auto self = shared_from_this(); - // also monitor the channel - Monitor monitor{ _channel }; + Monitor monitor(_channel); // do we have a message? - if (_message) - { - // announce the message - announce(*_message, _deliveryTag, _redelivered); - - // and destroy it - _message.reset(); - } + if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); // do we have to inform anyone about completion? if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + + // for the next iteration we want a new message + _message.reset(); // do we still have a valid channel if (!monitor.valid()) return; - // we are now done executing - _channel->complete(); + // we are now done executing, so the channel can forget the current receiving object + _channel->install(nullptr); } /** From ef76876d67c3aa92647a7c7a909cb0fc8cacabd1 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 18:07:18 +0100 Subject: [PATCH 04/11] refactored code to make room for a deferredpublisher class (which will also use the deferredreceiver base class) --- include/amqpcpp/deferredconsumer.h | 6 +- include/amqpcpp/deferredextreceiver.h | 85 +++++++++++++++++++++++++++ include/amqpcpp/deferredget.h | 6 +- include/amqpcpp/deferredreceiver.h | 33 ++++------- src/consumedmessage.h | 5 ++ src/deferredextreceiver.cpp | 64 ++++++++++++++++++++ src/deferredget.cpp | 2 +- src/deferredreceiver.cpp | 27 --------- 8 files changed, 171 insertions(+), 57 deletions(-) create mode 100644 include/amqpcpp/deferredextreceiver.h create mode 100644 src/deferredextreceiver.cpp diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index 732980f..bd5e576 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -14,7 +14,7 @@ /** * Dependencies */ -#include "deferredreceiver.h" +#include "deferredextreceiver.h" /** * Set up namespace @@ -29,7 +29,7 @@ class BasicDeliverFrame; /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public DeferredReceiver, public std::enable_shared_from_this +class DeferredConsumer : public DeferredExtReceiver, public std::enable_shared_from_this { private: /** @@ -79,7 +79,7 @@ public: * @param failed are we already failed? */ DeferredConsumer(ChannelImpl *channel, bool failed = false) : - DeferredReceiver(failed, channel) {} + DeferredExtReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredextreceiver.h b/include/amqpcpp/deferredextreceiver.h new file mode 100644 index 0000000..af15bfe --- /dev/null +++ b/include/amqpcpp/deferredextreceiver.h @@ -0,0 +1,85 @@ +/** + * DeferredExtReceiver.h + * + * Extended receiver that _wants_ to receive message (because it is + * consuming or get'ting messages. This is the base class for both + * the DeferredConsumer as well as the DeferredGet classes, but not + * the base of the DeferredPublisher (which can also receive returned + * messages, but not as a result of an explicit request) + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "deferredreceiver.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class DeferredExtReceiver : public DeferredReceiver +{ +protected: + /** + * The delivery tag for the current message + * @var uint64_t + */ + uint64_t _deliveryTag = 0; + + /** + * Is this a redelivered message + * @var bool + */ + bool _redelivered = false; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + + /** + * Initialize the object to send out a message + * @param exchange the exchange to which the message was published + * @param routingkey the routing key that was used to publish the message + */ + virtual void initialize(const std::string &exchange, const std::string &routingkey) override; + + /** + * Indicate that a message was done + */ + virtual void complete() override; + + /** + * Constructor + * @param failed Have we already failed? + * @param channel The channel we are consuming on + */ + DeferredExtReceiver(bool failed, ChannelImpl *channel) : + DeferredReceiver(failed, channel) {} + +public: + /** + * Destructor + */ + virtual ~DeferredExtReceiver() = default; +}; + +/** + * End of namespace + */ +} + diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4997e93..4ea8f1f 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -13,7 +13,7 @@ /** * Dependencies */ -#include "deferredreceiver.h" +#include "deferredextreceiver.h" /** * Set up namespace @@ -27,7 +27,7 @@ namespace AMQP { * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public DeferredReceiver, public std::enable_shared_from_this +class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this { private: /** @@ -87,7 +87,7 @@ public: * @param failed are we already failed? */ DeferredGet(ChannelImpl *channel, bool failed = false) : - DeferredReceiver(failed, channel) {} + DeferredExtReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h index 3943dc2..e4d4562 100644 --- a/include/amqpcpp/deferredreceiver.h +++ b/include/amqpcpp/deferredreceiver.h @@ -51,18 +51,18 @@ protected: * @param exchange the exchange to which the message was published * @param routingkey the routing key that was used to publish the message */ - void initialize(const std::string &exchange, const std::string &routingkey); + virtual void initialize(const std::string &exchange, const std::string &routingkey); /** * Get reference to self to prevent that object falls out of scope * @return std::shared_ptr */ virtual std::shared_ptr lock() = 0; - + /** * Indicate that a message was done */ - virtual void complete(); + virtual void complete() = 0; private: /** @@ -88,18 +88,6 @@ private: friend class BodyFrame; protected: - /** - * The delivery tag for the current message - * @var uint64_t - */ - uint64_t _deliveryTag = 0; - - /** - * Is this a redelivered message - * @var bool - */ - bool _redelivered = false; - /** * The channel to which the consumer is linked * @var ChannelImpl @@ -124,12 +112,6 @@ protected: */ DataCallback _dataCallback; - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - /** * Callback for when a message was complete finished * @var CompleteCallback @@ -144,12 +126,17 @@ protected: /** * Constructor - * * @param failed Have we already failed? * @param channel The channel we are consuming on */ - DeferredReceiver(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} + DeferredReceiver(bool failed, ChannelImpl *channel) : + Deferred(failed), _channel(channel) {} + public: + /** + * Destructor + */ + virtual ~DeferredReceiver() = default; }; /** diff --git a/src/consumedmessage.h b/src/consumedmessage.h index bba72c4..21f4a6c 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -4,6 +4,11 @@ * @copyright 2014 - 2018 Copernica BV */ +/** + * Dependencies + */ +#include "basicdeliverframe.h" + /** * Set up namespace */ diff --git a/src/deferredextreceiver.cpp b/src/deferredextreceiver.cpp new file mode 100644 index 0000000..ca05185 --- /dev/null +++ b/src/deferredextreceiver.cpp @@ -0,0 +1,64 @@ +/** + * DeferredExtReceiver.cpp + * + * Implementation file for the DeferredExtReceiver class + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Dependencies + */ +#include "amqpcpp/deferredextreceiver.h" +#include "amqpcpp/channelimpl.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Initialize the object to send out a message + * @param exchange the exchange to which the message was published + * @param routingkey the routing key that was used to publish the message + */ +void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey) +{ + // call base + DeferredExtReceiver::initialize(exchange, routingkey); + + // do we have anybody interested in messages? in that case we construct the message + if (_messageCallback) _message.construct(exchange, routingkey); +} + +/** + * Indicate that a message was done + */ +void DeferredExtReceiver::complete() +{ + // also monitor the channel + Monitor monitor(_channel); + + // do we have a message? + if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); + + // do we have to inform anyone about completion? + if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + + // for the next iteration we want a new message + _message.reset(); + + // do we still have a valid channel + if (!monitor.valid()) return; + + // we are now done executing, so the channel can forget the current receiving object + _channel->install(nullptr); +} + +/** + * End of namespace + */ +} + + diff --git a/src/deferredget.cpp b/src/deferredget.cpp index c1bb6ab..84a2337 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -65,7 +65,7 @@ void DeferredGet::complete() _channel->onSynchronized(); // pass on to normal implementation - DeferredReceiver::complete(); + DeferredExtReceiver::complete(); } /** diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp index 52e3766..251258a 100644 --- a/src/deferredreceiver.cpp +++ b/src/deferredreceiver.cpp @@ -29,9 +29,6 @@ void DeferredReceiver::initialize(const std::string &exchange, const std::string { // anybody interested in the new message? if (_beginCallback) _beginCallback(exchange, routingkey); - - // do we have anybody interested in messages? in that case we construct the message - if (_messageCallback) _message.construct(exchange, routingkey); } /** @@ -85,30 +82,6 @@ void DeferredReceiver::process(BodyFrame &frame) if (_bodySize == 0) complete(); } -/** - * Indicate that a message was done - */ -void DeferredReceiver::complete() -{ - // also monitor the channel - Monitor monitor(_channel); - - // do we have a message? - if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); - - // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); - - // for the next iteration we want a new message - _message.reset(); - - // do we still have a valid channel - if (!monitor.valid()) return; - - // we are now done executing, so the channel can forget the current receiving object - _channel->install(nullptr); -} - /** * End namespace */ From f39df772d32b2a826317e26f19f7ee0b6e4ab127 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 21:12:50 +0100 Subject: [PATCH 05/11] breaking changes: channel.publish() now returns a DeferredConsumer object on which callbacks can be installed for handling returned messages, channel.get().onSize() has a different behavior: it now reports the message size (and no longer the queue size), channel.get().onCount() has been added: it reports the queue size (this used to be the onSize() method), channel.consume().onSize() method has been added to find out the size of the upcoming message --- include/amqpcpp/deferredpublisher.h | 179 ++++++++++++++++++++++++++++ src/deferredpublisher.cpp | 45 +++++++ 2 files changed, 224 insertions(+) create mode 100644 include/amqpcpp/deferredpublisher.h create mode 100644 src/deferredpublisher.cpp diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredpublisher.h new file mode 100644 index 0000000..04f151c --- /dev/null +++ b/include/amqpcpp/deferredpublisher.h @@ -0,0 +1,179 @@ +/** + * DeferredPublisher.h + * + * Class that is returned when channel::publish() is called, and that + * can be used to install callback methods that define how returned + * messages should be handled. + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class DeferredPublisher : public DeferredReceiver +{ +private: + + +public: + /** + * Constructor that should only be called from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * + * @param channel the channel implementation + * @param failed are we already failed? + */ + DeferredConsumer(ChannelImpl *channel, bool failed = false) : + DeferredReceiver(failed, channel) {} + +public: + /** + * Register a function to be called when a full message is returned + * @param callback the callback to execute + */ + DeferredConsumer &onReceived(const ReturnCallback &callback) + { + // store callback + _returnCallback = callback; + + // allow chaining + return *this; + } + + /** + * Alias for onReceived() (see above) + * @param callback the callback to execute + */ + DeferredConsumer &onMessage(const ReturnCallback &callback) + { + // store callback + _returnCallback = callback; + + // allow chaining + return *this; + } + + /** + * Alias for onReceived() (see above) + * @param callback the callback to execute + */ + DeferredConsumer &onReturned(const ReturnCallback &callback) + { + // store callback + _returnCallback = callback; + + // allow chaining + return *this; + } + + /** + * RabbitMQ sends a message in multiple frames to its consumers. + * The AMQP-CPP library collects these frames and merges them into a + * single AMQP::Message object that is passed to the callback that + * you can set with the onReceived() or onMessage() methods (see above). + * + * However, you can also write your own algorithm to merge the frames. + * In that case you can install callbacks to handle the frames. Every + * message is sent in a number of frames: + * + * - a begin frame that marks the start of the message + * - an optional header if the message was sent with an envelope + * - zero or more data frames (usually 1, but more for large messages) + * - an end frame to mark the end of the message. + * + * To install handlers for these frames, you can use the onBegin(), + * onHeaders(), onData() and onComplete() methods. + * + * If you just rely on the onReceived() or onMessage() callbacks, you + * do not need any of the methods below this line. + */ + + /** + * Register the function that is called when the start frame of a new + * consumed message is received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onBegin(const BeginCallback &callback) + { + // store callback + _beginCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function that is called when message headers come in + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredConsumer &onHeaders(const HeaderCallback &callback) + { + // store callback + _headerCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a chunk of data comes in + * + * Note that this function may be called zero, one or multiple times + * for each incoming message depending on the size of the message data. + * + * If you install this callback you very likely also want to install + * the onComplete callback so you know when the last data part was + * received. + * + * @param callback The callback to invoke for chunks of message data + * @return Same object for chaining + */ + DeferredConsumer &onData(const DataCallback &callback) + { + // store callback + _dataCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onComplete(const CompleteCallback &callback) + { + // store callback + _completeCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp new file mode 100644 index 0000000..61af52a --- /dev/null +++ b/src/deferredpublisher.cpp @@ -0,0 +1,45 @@ +/** + * DeferredPublisher.cpp + * + * Implementation file for the DeferredPublisher class + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ +#include "includes.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Indicate that a message was done + */ +void DeferredPublisher::complete() +{ + // also monitor the channel + Monitor monitor(_channel); + + // do we have a message? + if (_message) _bounceCallback(*_message, 0, ""); + + // do we have to inform anyone about completion? + if (_completeCallback) _completeCallback(); + + // for the next iteration we want a new message + _message.reset(); + + // do we still have a valid channel + if (!monitor.valid()) return; + + // we are now done executing, so the channel can forget the current receiving object + _channel->install(nullptr); +} + +/** + * End of namespace + */ +} + + From 1f3500cee89fc98e64894c30ff3c7b2dbcc5292d Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 21:12:53 +0100 Subject: [PATCH 06/11] breaking changes: channel.publish() now returns a DeferredConsumer object on which callbacks can be installed for handling returned messages, channel.get().onSize() has a different behavior: it now reports the message size (and no longer the queue size), channel.get().onCount() has been added: it reports the queue size (this used to be the onSize() method), channel.consume().onSize() method has been added to find out the size of the upcoming message --- include/amqpcpp/callbacks.h | 66 +++++++---- include/amqpcpp/channel.h | 24 +++- include/amqpcpp/channelimpl.h | 13 ++- include/amqpcpp/deferredconsumer.h | 57 +++++++++- include/amqpcpp/deferredextreceiver.h | 6 + include/amqpcpp/deferredget.h | 157 +++++++++++++++++--------- include/amqpcpp/deferredpublisher.h | 112 +++++++++++++----- include/amqpcpp/deferredreceiver.h | 16 +-- src/channelimpl.cpp | 20 ++-- src/deferredextreceiver.cpp | 4 +- src/deferredget.cpp | 4 +- src/deferredpublisher.cpp | 2 +- src/deferredreceiver.cpp | 5 +- src/includes.h | 1 + 14 files changed, 345 insertions(+), 142 deletions(-) diff --git a/include/amqpcpp/callbacks.h b/include/amqpcpp/callbacks.h index 67e3757..a2d5ec4 100644 --- a/include/amqpcpp/callbacks.h +++ b/include/amqpcpp/callbacks.h @@ -3,7 +3,7 @@ * * Class storing deferred callbacks of different type. * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -29,25 +29,53 @@ class Message; class MetaData; /** - * All the callbacks that are supported - * - * When someone registers a callback function for certain events, it should - * match one of the following signatures. + * Generic callbacks that are used by many deferred objects */ -using SuccessCallback = std::function; -using ErrorCallback = std::function; -using FinalizeCallback = std::function; -using EmptyCallback = std::function; -using BeginCallback = std::function; -using HeaderCallback = std::function; -using DataCallback = std::function; -using MessageCallback = std::function; -using CompleteCallback = std::function; -using QueueCallback = std::function; -using DeleteCallback = std::function; -using SizeCallback = std::function; -using ConsumeCallback = std::function; -using CancelCallback = std::function; +using SuccessCallback = std::function; +using ErrorCallback = std::function; +using FinalizeCallback = std::function; + +/** + * Declaring and deleting a queue + */ +using QueueCallback = std::function; +using DeleteCallback = std::function; + +/** + * When retrieving the size of a queue in some way + */ +using EmptyCallback = std::function; +using SizeCallback = std::function; + +/** + * Starting and stopping a consumer + */ +using ConsumeCallback = std::function; +using CancelCallback = std::function; + +/** + * Receiving messages, either via consume(), get() or as returned messages + * The following methods receive the returned message in multiple parts + */ +using StartCallback = std::function; +using HeaderCallback = std::function; +using DataCallback = std::function; +using DeliveredCallback = std::function; + +/** + * For returned messages amqp-cpp first calls a return-callback before the start, + * header and data callbacks are called. Instead of the deliver-callback, a + * returned-callback is called. + */ +using ReturnCallback = std::function; +using ReturnedCallback = std::function; + +/** + * If you do not want to merge all data into a single string, you can als + * implement callbacks that return the collected message. + */ +using MessageCallback = std::function; +using BounceCallback = std::function; /** * End namespace diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index c41e0aa..2be43cd 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -341,17 +341,31 @@ public: /** * Publish a message to an exchange - * + * + * This method returns a reference to a DeferredPublisher object. You can use this returned + * object to install callbacks that are called when an undeliverable message is returned, or + * to set the callback that is called when the server confirms that the message was received. + * + * To enable handling returned messages, or to enable publisher-confirms, you must not only + * set the callback, but also pass in appropriate flags to enable this feature. If you do not + * pass in these flags, your callbacks will not be called. If you are not at all interested + * in returned messages or publish-confirms, you can ignore the flag and the returned object. + * + * Watch out: the channel returns the same DeferredPublisher object for all calls to the + * publish() method. This means that the callbacks that you install for the first published + * message are also used for subsequent messages _and_ it means that if you install a different + * callback for a later publish operation, it overwrites your earlier callbacks + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 3f1577e..41d5742 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -44,6 +44,7 @@ class DeferredDelete; class DeferredCancel; class DeferredQueue; class DeferredGet; +class DeferredPublisher; class Connection; class Envelope; class Table; @@ -73,6 +74,12 @@ private: */ ErrorCallback _errorCallback; + /** + * Handler that deals with incoming messages as a result of publish operations + * @var std::shared_ptr + */ + std::shared_ptr _publisher; + /** * Handlers for all consumers that are active * @var std::map @@ -396,16 +403,16 @@ public: * Publish a message to an exchange * * If the mandatory or immediate flag is set, and the message could not immediately - * be published, the message will be returned to the client. However, the AMQP-CPP - * library does not yet report such returned messages. + * be published, the message will be returned to the client. * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @return DeferredPublisher */ - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); /** * Set the Quality of Service (QOS) of the entire connection diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index bd5e576..c8030e7 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -68,8 +68,7 @@ private: public: /** - * Protected constructor that can only be called - * from within the channel implementation + * Constructor that should only be called from within the channel implementation * * Note: this constructor _should_ be protected, but because make_shared * will then not work, we have decided to make it public after all, @@ -167,15 +166,46 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onBegin(const BeginCallback &callback) + DeferredConsumer &onBegin(const StartCallback &callback) { // store callback - _beginCallback = callback; + _startCallback = callback; // allow chaining return *this; } + /** + * Register the function that is called when the start frame of a new + * consumed message is received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onStart(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredConsumer &onSize(const SizeCallback &callback) + { + // store callback + _sizeCallback = callback; + + // allow chaining + return *this; + } + /** * Register the function that is called when message headers come in * @@ -219,10 +249,25 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onComplete(const CompleteCallback &callback) + DeferredConsumer &onComplete(const DeliveredCallback &callback) { // store callback - _completeCallback = callback; + _deliveredCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredConsumer &onDelivered(const DeliveredCallback &callback) + { + // store callback + _deliveredCallback = callback; // allow chaining return *this; diff --git a/include/amqpcpp/deferredextreceiver.h b/include/amqpcpp/deferredextreceiver.h index af15bfe..3d427e3 100644 --- a/include/amqpcpp/deferredextreceiver.h +++ b/include/amqpcpp/deferredextreceiver.h @@ -50,6 +50,12 @@ protected: */ MessageCallback _messageCallback; + /** + * Callback for when a message was complete finished + * @var DeliveredCallback + */ + DeliveredCallback _deliveredCallback; + /** * Initialize the object to send out a message diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4ea8f1f..78332be 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -40,7 +40,7 @@ private: * Callback with the number of messages still in the queue * @var SizeCallback */ - SizeCallback _sizeCallback; + SizeCallback _countCallback; /** * Report success for a get operation @@ -90,58 +90,6 @@ public: DeferredExtReceiver(failed, channel) {} public: - /** - * Register the function to be called when a new message is expected - * - * @param callback The callback to invoke - * @return Same object for chaining - */ - DeferredGet &onBegin(const BeginCallback &callback) - { - // store callback - _beginCallback = callback; - - // allow chaining - return *this; - } - - /** - * Register the function to be called when message headers come in - * - * @param callback The callback to invoke for message headers - * @return Same object for chaining - */ - DeferredGet &onHeaders(const HeaderCallback &callback) - { - // store callback - _headerCallback = callback; - - // allow chaining - return *this; - } - - /** - * Register the function to be called when a chunk of data comes in - * - * Note that this function may be called zero, one or multiple times - * for each incoming message depending on the size of the message data. - * - * If you install this callback you very likely also want to install - * the onComplete callback so you know when the last data part was - * received. - * - * @param callback The callback to invoke for chunks of message data - * @return Same object for chaining - */ - DeferredGet &onData(const DataCallback &callback) - { - // store callback - _dataCallback = callback; - - // allow chaining - return *this; - } - /** * Register a function to be called when a message arrives * This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it @@ -198,13 +146,95 @@ public: } /** - * Register a function to be called when size information is known + * Register a function to be called when queue size information is known * @param callback the callback to execute */ + DeferredGet &onCount(const SizeCallback &callback) + { + // store callback + _countCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a new message is expected + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onBegin(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a new message is expected + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onStart(const StartCallback &callback) + { + // store callback + _startCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ DeferredGet &onSize(const SizeCallback &callback) { // store callback _sizeCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when message headers come in + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredGet &onHeaders(const HeaderCallback &callback) + { + // store callback + _headerCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register the function to be called when a chunk of data comes in + * + * Note that this function may be called zero, one or multiple times + * for each incoming message depending on the size of the message data. + * + * If you install this callback you very likely also want to install + * the onComplete callback so you know when the last data part was + * received. + * + * @param callback The callback to invoke for chunks of message data + * @return Same object for chaining + */ + DeferredGet &onData(const DataCallback &callback) + { + // store callback + _dataCallback = callback; // allow chaining return *this; @@ -216,10 +246,25 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredGet &onComplete(const CompleteCallback &callback) + DeferredGet &onComplete(const DeliveredCallback &callback) { // store callback - _completeCallback = callback; + _deliveredCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a funtion to be called when a message was completely received + * + * @param callback The callback to invoke + * @return Same object for chaining + */ + DeferredGet &onDelivered(const DeliveredCallback &callback) + { + // store callback + _deliveredCallback = callback; // allow chaining return *this; diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredpublisher.h index 04f151c..77c2465 100644 --- a/include/amqpcpp/deferredpublisher.h +++ b/include/amqpcpp/deferredpublisher.h @@ -18,14 +18,58 @@ * Begin of namespace */ namespace AMQP { + +/** + * Forward declarations + */ +class ChannelImpl; /** * Class definition */ -class DeferredPublisher : public DeferredReceiver +class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this { private: + /** + * The error code + * @var int16_t + */ + int16_t _code = 0; + + /** + * The error message + * @var std::string + */ + std::string _description; + /** + * Callback that is called when a message is returned + * @var BounceCallback + */ + BounceCallback _bounceCallback; + + /** + * Begin of a bounced message + * @var ReturnCallback + */ + ReturnCallback _beginCallback; + + /** + * End of a bounced message + * @var ReturnedCallback + */ + ReturnedCallback _completeCallback; + + /** + * Get reference to self to prevent that object falls out of scope + * @return std::shared_ptr + */ + virtual std::shared_ptr lock() override { return shared_from_this(); } + + /** + * Extended implementation of the complete method that is called when a message was fully received + */ + virtual void complete() override; public: /** @@ -38,7 +82,7 @@ public: * @param channel the channel implementation * @param failed are we already failed? */ - DeferredConsumer(ChannelImpl *channel, bool failed = false) : + DeferredPublisher(ChannelImpl *channel, bool failed = false) : DeferredReceiver(failed, channel) {} public: @@ -46,10 +90,10 @@ public: * Register a function to be called when a full message is returned * @param callback the callback to execute */ - DeferredConsumer &onReceived(const ReturnCallback &callback) + DeferredPublisher &onReceived(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; @@ -59,10 +103,10 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredConsumer &onMessage(const ReturnCallback &callback) + DeferredPublisher &onMessage(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; @@ -72,36 +116,27 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredConsumer &onReturned(const ReturnCallback &callback) + DeferredPublisher &onReturned(const BounceCallback &callback) { // store callback - _returnCallback = callback; + _bounceCallback = callback; // allow chaining return *this; } /** - * RabbitMQ sends a message in multiple frames to its consumers. - * The AMQP-CPP library collects these frames and merges them into a - * single AMQP::Message object that is passed to the callback that - * you can set with the onReceived() or onMessage() methods (see above). - * - * However, you can also write your own algorithm to merge the frames. - * In that case you can install callbacks to handle the frames. Every - * message is sent in a number of frames: - * - * - a begin frame that marks the start of the message - * - an optional header if the message was sent with an envelope - * - zero or more data frames (usually 1, but more for large messages) - * - an end frame to mark the end of the message. - * - * To install handlers for these frames, you can use the onBegin(), - * onHeaders(), onData() and onComplete() methods. - * - * If you just rely on the onReceived() or onMessage() callbacks, you - * do not need any of the methods below this line. + * Alias for onReceived() (see above) + * @param callback the callback to execute */ + DeferredPublisher &onBounced(const BounceCallback &callback) + { + // store callback + _bounceCallback = callback; + + // allow chaining + return *this; + } /** * Register the function that is called when the start frame of a new @@ -110,7 +145,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onBegin(const BeginCallback &callback) + DeferredPublisher &onBegin(const ReturnCallback &callback) { // store callback _beginCallback = callback; @@ -119,13 +154,28 @@ public: return *this; } + /** + * Register a function that is called when the message size is known + * + * @param callback The callback to invoke for message headers + * @return Same object for chaining + */ + DeferredPublisher &onSize(const SizeCallback &callback) + { + // store callback + _sizeCallback = callback; + + // allow chaining + return *this; + } + /** * Register the function that is called when message headers come in * * @param callback The callback to invoke for message headers * @return Same object for chaining */ - DeferredConsumer &onHeaders(const HeaderCallback &callback) + DeferredPublisher &onHeaders(const HeaderCallback &callback) { // store callback _headerCallback = callback; @@ -147,7 +197,7 @@ public: * @param callback The callback to invoke for chunks of message data * @return Same object for chaining */ - DeferredConsumer &onData(const DataCallback &callback) + DeferredPublisher &onData(const DataCallback &callback) { // store callback _dataCallback = callback; @@ -162,7 +212,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredConsumer &onComplete(const CompleteCallback &callback) + DeferredPublisher &onComplete(const ReturnedCallback &callback) { // store callback _completeCallback = callback; diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h index e4d4562..caa489b 100644 --- a/include/amqpcpp/deferredreceiver.h +++ b/include/amqpcpp/deferredreceiver.h @@ -96,9 +96,15 @@ protected: /** * Callback for new message - * @var BeginCallback + * @var StartCallback */ - BeginCallback _beginCallback; + StartCallback _startCallback; + + /** + * Callback that is called when size of the message is known + * @var SizeCallback + */ + SizeCallback _sizeCallback; /** * Callback for incoming headers @@ -112,12 +118,6 @@ protected: */ DataCallback _dataCallback; - /** - * Callback for when a message was complete finished - * @var CompleteCallback - */ - CompleteCallback _completeCallback; - /** * The message that we are currently receiving * @var stack_ptr diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 95ae8f8..fff8873 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -451,26 +451,30 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @return DeferredPublisher */ -bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) +DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that Monitor monitor(this); // @todo do not copy the entire buffer to individual frames + + // make sure we have a deferred object to return + if (!_publisher) _publisher.reset(new DeferredPublisher(this)); // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false; + if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher; // channel still valid? - if (!monitor.valid()) return false; + if (!monitor.valid()) return *_publisher; // send header - if (!send(BasicHeaderFrame(_id, envelope))) return false; + if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher; // channel and connection still valid? - if (!monitor.valid() || !_connection) return false; + if (!monitor.valid() || !_connection) return *_publisher; // the max payload size is the max frame size minus the bytes for headers and trailer uint32_t maxpayload = _connection->maxPayload(); @@ -487,10 +491,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); // send out a body frame - if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false; + if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher; // channel still valid? - if (!monitor.valid()) return false; + if (!monitor.valid()) return *_publisher; // update counters bytessent += chunksize; @@ -498,7 +502,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin } // done - return true; + return *_publisher; } /** diff --git a/src/deferredextreceiver.cpp b/src/deferredextreceiver.cpp index ca05185..0f2f047 100644 --- a/src/deferredextreceiver.cpp +++ b/src/deferredextreceiver.cpp @@ -26,7 +26,7 @@ namespace AMQP { void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey) { // call base - DeferredExtReceiver::initialize(exchange, routingkey); + DeferredReceiver::initialize(exchange, routingkey); // do we have anybody interested in messages? in that case we construct the message if (_messageCallback) _message.construct(exchange, routingkey); @@ -44,7 +44,7 @@ void DeferredExtReceiver::complete() if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + if (_deliveredCallback) _deliveredCallback(_deliveryTag, _redelivered); // for the next iteration we want a new message _message.reset(); diff --git a/src/deferredget.cpp b/src/deferredget.cpp index 84a2337..e8985f1 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -34,7 +34,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecoun // report the size (note that this is the size _minus_ the message that is retrieved // (and for which the callback will be called later), so it could be zero) - if (_sizeCallback) _sizeCallback(messagecount); + if (_countCallback) _countCallback(messagecount); // return next handler return _next; @@ -47,7 +47,7 @@ const std::shared_ptr &DeferredGet::reportSuccess(uint32_t messagecoun const std::shared_ptr &DeferredGet::reportSuccess() const { // report the size - if (_sizeCallback) _sizeCallback(0); + if (_countCallback) _countCallback(0); // check if a callback was set if (_emptyCallback) _emptyCallback(); diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index 61af52a..72843f1 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -22,7 +22,7 @@ void DeferredPublisher::complete() Monitor monitor(_channel); // do we have a message? - if (_message) _bounceCallback(*_message, 0, ""); + if (_message) _bounceCallback(*_message, _code, _description); // do we have to inform anyone about completion? if (_completeCallback) _completeCallback(); diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp index 251258a..f4e246c 100644 --- a/src/deferredreceiver.cpp +++ b/src/deferredreceiver.cpp @@ -28,7 +28,7 @@ namespace AMQP { void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey) { // anybody interested in the new message? - if (_beginCallback) _beginCallback(exchange, routingkey); + if (_startCallback) _startCallback(exchange, routingkey); } /** @@ -43,6 +43,9 @@ void DeferredReceiver::process(BasicHeaderFrame &frame) // store the body size _bodySize = frame.bodySize(); + + // is user interested in the size? + if (_sizeCallback) _sizeCallback(_bodySize); // do we have a message? if (_message) diff --git a/src/includes.h b/src/includes.h index 122f2d0..89ae392 100644 --- a/src/includes.h +++ b/src/includes.h @@ -68,6 +68,7 @@ #include "amqpcpp/callbacks.h" #include "amqpcpp/deferred.h" #include "amqpcpp/deferredconsumer.h" +#include "amqpcpp/deferredpublisher.h" #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" From 1ccd93cc5e15d3fd3582432c5ec65a2dc7cdb20a Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 22:27:27 +0100 Subject: [PATCH 07/11] final step (although untested) for handling returned messages --- include/amqpcpp/channel.h | 21 ++++++++++++++++----- include/amqpcpp/channelimpl.h | 9 ++++++++- include/amqpcpp/deferredpublisher.h | 12 ++++++++++++ src/basicreturnframe.h | 21 ++++++++++++++++++--- src/channelimpl.cpp | 5 +++-- src/deferredpublisher.cpp | 26 +++++++++++++++++++++++++- 6 files changed, 82 insertions(+), 12 deletions(-) diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 2be43cd..b42fc3a 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -342,6 +342,11 @@ public: /** * Publish a message to an exchange * + * You have to supply the name of an exchange and a routing key. RabbitMQ will then try + * to send the message to one or more queues. With the optional flags parameter you can + * specify what should happen if the message could not be routed to a queue. By default, + * unroutable message are silently discarded. + * * This method returns a reference to a DeferredPublisher object. You can use this returned * object to install callbacks that are called when an undeliverable message is returned, or * to set the callback that is called when the server confirms that the message was received. @@ -351,21 +356,27 @@ public: * pass in these flags, your callbacks will not be called. If you are not at all interested * in returned messages or publish-confirms, you can ignore the flag and the returned object. * - * Watch out: the channel returns the same DeferredPublisher object for all calls to the + * Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the * publish() method. This means that the callbacks that you install for the first published * message are also used for subsequent messages _and_ it means that if you install a different * callback for a later publish operation, it overwrites your earlier callbacks * + * The following flags can be supplied: + * + * - mandatory If set, server returns messages that are not sent to a queue + * - immediate If set, server returns messages that can not immediately be forwarded to a consumer. + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags optional flags */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 41d5742..40d0603 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -410,9 +410,10 @@ public: * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags optional flags * @return DeferredPublisher */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags); /** * Set the Quality of Service (QOS) of the entire connection @@ -707,6 +708,12 @@ public: * @return The handler responsible for the current message */ DeferredReceiver *receiver() const { return _receiver.get(); } + + /** + * Retrieve the deferred publisher that handles returned messages + * @return The deferred publisher object + */ + DeferredPublisher *publisher() const { return _publisher.get(); } /** * The channel class is its friend, thus can it instantiate this object diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredpublisher.h index 77c2465..3c9fb17 100644 --- a/include/amqpcpp/deferredpublisher.h +++ b/include/amqpcpp/deferredpublisher.h @@ -60,6 +60,13 @@ private: */ ReturnedCallback _completeCallback; + /** + * Process a return frame + * + * @param frame The frame to process + */ + void process(BasicReturnFrame &frame); + /** * Get reference to self to prevent that object falls out of scope * @return std::shared_ptr @@ -70,6 +77,11 @@ private: * Extended implementation of the complete method that is called when a message was fully received */ virtual void complete() override; + + /** + * Classes that can access private members + */ + friend class BasicReturnFrame; public: /** diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index e140e9c..f14ff65 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic return frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -155,8 +155,23 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // we no longer support returned messages - return false; + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // get the current publisher + auto publisher = channel->publisher(); + + // if there is no deferred publisher, we can just as well stop + if (publisher == nullptr) return false; + + // initialize the object, because we're about to receive a message + publisher->process(*this); + + // done + return true; } }; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index fff8873..053e479 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -451,9 +451,10 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags * @return DeferredPublisher */ -DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) +DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that @@ -465,7 +466,7 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: if (!_publisher) _publisher.reset(new DeferredPublisher(this)); // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher; + if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher; // channel still valid? if (!monitor.valid()) return *_publisher; diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index 72843f1..f25b167 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -7,12 +7,34 @@ * @copyright 2018 Copernica BV */ #include "includes.h" +#include "basicreturnframe.h" /** * Begin of namespace */ namespace AMQP { +/** + * Process a return frame + * + * @param frame The frame to process + */ +void DeferredPublisher::process(BasicReturnFrame &frame) +{ + // this object will handle all future frames with header and body data + _channel->install(shared_from_this()); + + // retrieve the delivery tag and whether we were redelivered + _code = frame.replyCode(); + _description = frame.replyText(); + + // notify user space of the begin of the returned message + if (_beginCallback) _beginCallback(_code, _description); + + // initialize the object for the next message + initialize(frame.exchange(), frame.routingKey()); +} + /** * Indicate that a message was done */ @@ -29,6 +51,9 @@ void DeferredPublisher::complete() // for the next iteration we want a new message _message.reset(); + + // the description can be thrown away too + _description.clear(); // do we still have a valid channel if (!monitor.valid()) return; @@ -42,4 +67,3 @@ void DeferredPublisher::complete() */ } - From 41239a1952d5a103e0e89b244ec5409d68626490 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 22:54:50 +0100 Subject: [PATCH 08/11] fixed to make returning message functional, and added test code --- include/amqpcpp.h | 1 + src/deferredpublisher.cpp | 3 +++ tests/address.cpp | 2 ++ tests/myconnection.cpp | 21 +++++++++++++++------ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 4d7735a..9d0e5a7 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -71,6 +71,7 @@ #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" #include "amqpcpp/deferredget.h" +#include "amqpcpp/deferredpublisher.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/login.h" diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index f25b167..b75e6df 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -33,6 +33,9 @@ void DeferredPublisher::process(BasicReturnFrame &frame) // initialize the object for the next message initialize(frame.exchange(), frame.routingKey()); + + // do we have anybody interested in messages? in that case we construct the message + if (_bounceCallback) _message.construct(frame.exchange(), frame.routingKey()); } /** diff --git a/tests/address.cpp b/tests/address.cpp index 2145931..0de9a15 100644 --- a/tests/address.cpp +++ b/tests/address.cpp @@ -18,6 +18,7 @@ * @param argv * @return int */ +/* int main(int argc, const char *argv[]) { // iterate over the arguments @@ -37,3 +38,4 @@ int main(int argc, const char *argv[]) // done return 0; } +*/ \ No newline at end of file diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 96f3580..8875d02 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -30,7 +30,7 @@ MyConnection::MyConnection(const std::string &ip) : _socket(Event::MainLoop::instance(), this) { // start connecting - if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; + if (_socket.connect(Dns::IpAddress(ip), 5672)) return; // failure onFailure(&_socket); @@ -96,21 +96,30 @@ void MyConnection::onConnected(Network::TcpSocket *socket) std::cout << "queue declared" << std::endl; // start consuming - _channel->consume("my_queue").onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { - std::cout << "received: " << message.message() << std::endl; + _channel->consume("my_queue").onReceived([this](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + std::cout << "consumed from exchange " << message.exchange() << " " << message.routingkey() << ": " << std::string(message.body(), message.bodySize()) << std::endl; + _channel->ack(deliveryTag); }); }); // declare an exchange - _channel->declareExchange().onSuccess([]() { + _channel->declareExchange("my_exchange", AMQP::direct).onSuccess([]() { std::cout << "exchange declared" << std::endl; }); // bind queue and exchange _channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() { std::cout << "queue bound to exchange" << std::endl; + + // callback for returns + auto callback = [](const AMQP::Message &message, int16_t code, const std::string &description) { - _channel->publish("my_exchange", "key", "just a message"); + std::cout << "message was returned: " << code << " " << description << ": " << std::string(message.body(), message.bodySize()) << std::endl; + + }; + + _channel->publish("my_exchange", "key", "just a message", AMQP::mandatory).onReturned(callback); + _channel->publish("my_exchange", "unknown key", "just another message", AMQP::mandatory).onReturned(callback); }); } @@ -156,7 +165,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) if (!_connection) return; // let the data be handled by the connection - size_t bytes = _connection->parse(buffer->data(), buffer->size()); + size_t bytes = _connection->parse(buffer->buffer(), buffer->size()); // shrink the buffer buffer->shrink(bytes); From 2a0a6f3fb71a0aa96a72f4550e9072604b989c28 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 23:13:07 +0100 Subject: [PATCH 09/11] update cmake stuff --- src/CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6cc8e1a..a16912f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -44,7 +44,9 @@ add_sources( consumedmessage.h deferredcancel.cpp deferredconsumer.cpp - deferredconsumerbase.cpp + deferredreceiver.cpp + deferredextreceiver.cpp + deferredpublisher.cpp deferredget.cpp exchangebindframe.h exchangebindokframe.h From 52c71ac1688e99583fb3fcd7674f7d897b642496 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 23:27:20 +0100 Subject: [PATCH 10/11] update readme --- README.md | 65 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 715e6a5..33538a9 100644 --- a/README.md +++ b/README.md @@ -799,44 +799,54 @@ in almost any form: ````c++ /** * Publish a message to an exchange - * - * The following flags can be used - * - * - mandatory if set, an unroutable message will be sent back to - * the client (currently not supported) - * - * - immediate if set, a message that could not immediately be consumed - * is returned to the client (currently not supported) - * - * If either of the two flags is set, and the message could not immediately - * be published, the message is returned by the server to the client. However, - * at this moment in time, the AMQP-CPP library does not support catching - * such returned messages. - * + * + * You have to supply the name of an exchange and a routing key. RabbitMQ will + * then try to send the message to one or more queues. With the optional flags + * parameter you can specify what should happen if the message could not be routed + * to a queue. By default, unroutable message are silently discarded. + * + * This method returns a reference to a DeferredPublisher object. You can use + * this returned object to install callbacks that are called when an undeliverable + * message is returned, or to set the callback that is called when the server + * confirms that the message was received. + * + * To enable handling returned messages, or to enable publisher-confirms, you must + * not only set the callback, but also pass in appropriate flags to enable this + * feature. If you do not pass in these flags, your callbacks will not be called. + * If you are not at all interested in returned messages or publish-confirms, you + * can ignore the flag and the returned object. + * + * Watch out: the channel returns _the same_ DeferredPublisher object for all + * calls to the publish() method. This means that the callbacks that you install + * for the first published message are also used for subsequent messages _and_ + * it means that if you install a different callback for a later publish + * operation, it overwrites your earlier callbacks + * + * The following flags can be supplied: + * + * - mandatory If set, server returns messages that are not sent to a queue + * - immediate If set, server returns messages that can not immediately be forwarded to a consumer. + * * @param exchange the exchange to publish to * @param routingkey the routing key - * @param flags optional flags (see above) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags optional flags */ -bool publish(const std::string &exchange, const std::string &routingKey, int flags, const AMQP::Envelope &envelope); -bool publish(const std::string &exchange, const std::string &routingKey, const AMQP::Envelope &envelope); -bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message); -bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message); -bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size); -bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size); +DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } +DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } +DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } +DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } ```` Published messages are normally not confirmed by the server, and the RabbitMQ will not send a report back to inform you whether the message was succesfully -published or not. Therefore the publish method does not return a Deferred -object. +published or not. But with the flags you can instruct RabbitMQ to send back +the message if it was undeliverable. -As long as no error is reported via the Channel::onError() method, you can safely -assume that your messages were delivered. - -This can of course be a problem when you are publishing many messages. If you get +You can also use transactions to ensure that your messages get delivered. +Let's say that you are publishing many messages in a row. If you get an error halfway through there is no way to know for sure how many messages made it to the broker and how many should be republished. If this is important, you can wrap the publish commands inside a transaction. In this case, if an error occurs, @@ -1002,7 +1012,6 @@ need additional attention: - ability to set up secure connections (or is this fully done on the IO level) - login with other protocols than login/password - publish confirms - - returned messages We also need to add more safety checks so that strange or invalid data from RabbitMQ does not break the library (although in reality RabbitMQ only sends From 86262b1024f8caab621857a6e298852a8fa3b48f Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Fri, 2 Mar 2018 08:36:00 +0100 Subject: [PATCH 11/11] update version number in makefile --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 9af8712..c4503c1 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,8 @@ PREFIX ?= /usr INCLUDE_DIR = ${PREFIX}/include LIBRARY_DIR = ${PREFIX}/lib export LIBRARY_NAME = amqpcpp -export SONAME = 2.8 -export VERSION = 2.8.0 +export SONAME = 3.0 +export VERSION = 3.0.0 all: $(MAKE) -C src all