From e0feb17ecc3de878e2e69fefa909cc436155936a Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 27 Feb 2018 05:08:21 +0100 Subject: [PATCH] renamed deferredconsumerbase into deferredreceiver, because it is not only a base class for the consumer, but also for other receiving operations: get requests and in the future also for returned messages --- include/amqpcpp/channelimpl.h | 20 +-- include/amqpcpp/deferredconsumer.h | 8 +- include/amqpcpp/deferredconsumerbase.h | 166 ------------------------- include/amqpcpp/deferredget.h | 8 +- include/amqpcpp/message.h | 6 +- src/basicgetokframe.h | 11 +- src/basicheaderframe.h | 14 ++- src/bodyframe.h | 16 ++- src/channelimpl.cpp | 14 +-- src/deferredconsumerbase.cpp | 142 --------------------- 10 files changed, 56 insertions(+), 349 deletions(-) delete mode 100644 include/amqpcpp/deferredconsumerbase.h delete mode 100644 src/deferredconsumerbase.cpp diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 3d90266..c5b788b 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -34,7 +34,7 @@ namespace AMQP { /** * Forward declarations */ -class DeferredConsumerBase; +class DeferredReceiver; class BasicDeliverFrame; class DeferredConsumer; class BasicGetOKFrame; @@ -75,9 +75,9 @@ private: /** * Handlers for all consumers that are active - * @var std::map + * @var std::map */ - std::map> _consumers; + std::map> _consumers; /** * Pointer to the oldest deferred result (the first one that is going @@ -129,10 +129,10 @@ private: bool _synchronous = false; /** - * The current consumer receiving a message - * @var std::shared_ptr + * The current object that is busy receiving a message + * @var std::shared_ptr */ - std::shared_ptr _consumer; + std::shared_ptr _receiver; /** * Attach the connection @@ -664,13 +664,13 @@ public: * @param consumer The consumer handler * @param active Is this the new active consumer */ - void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) + void install(std::string consumertag, const std::shared_ptr &consumer, bool active = false) { // install the consumer handler _consumers[consumertag] = consumer; // should we become the current consumer? - if (active) _consumer = consumer; + if (active) _receiver = consumer; } /** @@ -691,11 +691,11 @@ public: void process(BasicDeliverFrame &frame); /** - * Retrieve the current consumer handler + * Retrieve the current object that is receiving a message * * @return The handler responsible for the current message */ - DeferredConsumerBase *consumer(); + DeferredReceiver *receiver(); /** * Mark the current consumer as done diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index d7d7bf8..bc260d5 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -3,7 +3,7 @@ * * Deferred callback for consumers * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -14,7 +14,7 @@ /** * Dependencies */ -#include "deferredconsumerbase.h" +#include "deferredreceiver.h" /** * Set up namespace @@ -24,7 +24,7 @@ namespace AMQP { /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public DeferredConsumerBase +class DeferredConsumer : public DeferredReceiver { private: /** @@ -68,7 +68,7 @@ public: * @param failed are we already failed? */ DeferredConsumer(ChannelImpl *channel, bool failed = false) : - DeferredConsumerBase(failed, channel) {} + DeferredReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredconsumerbase.h b/include/amqpcpp/deferredconsumerbase.h deleted file mode 100644 index 5b3bbdb..0000000 --- a/include/amqpcpp/deferredconsumerbase.h +++ /dev/null @@ -1,166 +0,0 @@ -/** - * deferredconsumerbase.h - * - * Base class for the deferred consumer and the - * deferred get. - * - * @copyright 2016 - 2017 Copernica B.V. - */ - -/** - * Include guard - */ -#pragma once - -/** - * Dependencies - */ -#include "deferred.h" -#include "stack_ptr.h" -#include "message.h" - -/** - * Start namespace - */ -namespace AMQP { - -/** - * Forward declarations - */ -class BasicDeliverFrame; -class BasicGetOKFrame; -class BasicHeaderFrame; -class BodyFrame; - -/** - * Base class for deferred consumers - */ -class DeferredConsumerBase : - public Deferred, - public std::enable_shared_from_this -{ -private: - /** - * Size of the body of the current message - * @var uint64_t - */ - uint64_t _bodySize = 0; - - /** - * Process a delivery frame - * - * @param frame The frame to process - */ - void process(BasicDeliverFrame &frame); - - /** - * Process a delivery frame from a get request - * - * @param frame The frame to process - */ - void process(BasicGetOKFrame &frame); - - /** - * Process the message headers - * - * @param frame The frame to process - */ - void process(BasicHeaderFrame &frame); - - /** - * Process the message data - * - * @param frame The frame to process - */ - void process(BodyFrame &frame); - - /** - * Indicate that a message was done - */ - void complete(); - - /** - * Announce that a message has been received - * @param message The message to announce - * @param deliveryTag The delivery tag (for ack()ing) - * @param redelivered Is this a redelivered message - */ - virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0; - - /** - * Frames may be processed - */ - friend class ChannelImpl; - friend class BasicDeliverFrame; - friend class BasicGetOKFrame; - friend class BasicHeaderFrame; - friend class BodyFrame; -protected: - /** - * The delivery tag for the current message - * @var uint64_t - */ - uint64_t _deliveryTag = 0; - - /** - * Is this a redelivered message - * @var bool - */ - bool _redelivered = false; - - /** - * The channel to which the consumer is linked - * @var ChannelImpl - */ - ChannelImpl *_channel; - - /** - * Callback for new message - * @var BeginCallback - */ - BeginCallback _beginCallback; - - /** - * Callback for incoming headers - * @var HeaderCallback - */ - HeaderCallback _headerCallback; - - /** - * Callback for when a chunk of data comes in - * @var DataCallback - */ - DataCallback _dataCallback; - - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - - /** - * Callback for when a message was complete finished - * @var CompleteCallback - */ - CompleteCallback _completeCallback; - - /** - * The message that we are currently receiving - * @var stack_ptr - */ - stack_ptr _message; - - /** - * Constructor - * - * @param failed Have we already failed? - * @param channel The channel we are consuming on - */ - DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} -public: -}; - -/** - * End namespace - */ -} diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4270d34..71fadbc 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -2,7 +2,7 @@ * DeferredGet.h * * @author Emiel Bruijntjes - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -13,7 +13,7 @@ /** * Dependencies */ -#include "deferredconsumerbase.h" +#include "deferredreceiver.h" /** * Set up namespace @@ -27,7 +27,7 @@ namespace AMQP { * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public DeferredConsumerBase +class DeferredGet : public DeferredReceiver { private: /** @@ -84,7 +84,7 @@ public: * @param failed are we already failed? */ DeferredGet(ChannelImpl *channel, bool failed = false) : - DeferredConsumerBase(failed, channel) {} + DeferredReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/message.h b/include/amqpcpp/message.h index 5b1bb02..7c8f59e 100644 --- a/include/amqpcpp/message.h +++ b/include/amqpcpp/message.h @@ -7,7 +7,7 @@ * Message objects can not be constructed by end users, they are only constructed * by the AMQP library, and passed to user callbacks. * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -31,7 +31,7 @@ namespace AMQP { /** * Forward declarations */ -class DeferredConsumerBase; +class DeferredReceiver; /** * Class definition @@ -61,7 +61,7 @@ protected: /** * We are an open book to the consumer handler */ - friend class DeferredConsumerBase; + friend class DeferredReceiver; /** * Set the body size diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 0de5ea9..e90ae5e 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic get ok frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -173,11 +173,14 @@ public: // report success for the get operation channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); - // check if we have a valid consumer - if (!channel->consumer()) return false; + // get the current receiver object + auto *receiver = channel->receiver(); + + // check if we have a valid receiver + if (receiver == nullptr) return false; // pass on to consumer - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index c6ec1c4..bc60a17 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -1,7 +1,7 @@ /** * Class describing an AMQP basic header frame * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -16,7 +16,7 @@ #include "amqpcpp/metadata.h" #include "amqpcpp/envelope.h" #include "amqpcpp/connectionimpl.h" -#include "amqpcpp/deferredconsumerbase.h" +#include "amqpcpp/deferredreceiver.h" /** * Set up namespace @@ -134,12 +134,18 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); + + // we need a channel + if (channel == nullptr) return false; + + // do we have an object that is receiving this data? + auto *receiver = channel->receiver(); // check if we have a valid channel and consumer - if (!channel || !channel->consumer()) return false; + if (receiver == nullptr) return false; // the channel can process the frame - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/bodyframe.h b/src/bodyframe.h index ae1621d..5dc4920 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -1,7 +1,7 @@ /** * Class describing an AMQP Body Frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -14,7 +14,7 @@ */ #include "extframe.h" #include "amqpcpp/connectionimpl.h" -#include "amqpcpp/deferredconsumerbase.h" +#include "amqpcpp/deferredreceiver.h" /** * Set up namespace @@ -105,12 +105,18 @@ public: { // we need the appropriate channel auto channel = connection->channel(this->channel()); + + // we must have a channel object + if (channel == nullptr) return false; + + // get the object that is receiving the messages + auto *receiver = channel->receiver(); - // check if we have a valid channel and consumer - if (!channel || !channel->consumer()) return false; + // check if we have a valid receiver + if (receiver == nullptr) return false; // the consumer may process the frame - channel->consumer()->process(*this); + receiver->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a498864..a5c17c6 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -828,20 +828,20 @@ void ChannelImpl::process(BasicDeliverFrame &frame) // we are going to be receiving a message, store // the handler for the incoming message - _consumer = iter->second; + _receiver = iter->second; // let the consumer process the frame - _consumer->process(frame); + _receiver->process(frame); } /** - * Retrieve the current consumer handler + * Retrieve the current receiver handler * * @return The handler responsible for the current message */ -DeferredConsumerBase *ChannelImpl::consumer() +DeferredReceiver *ChannelImpl::receiver() { - return _consumer.get(); + return _receiver.get(); } /** @@ -849,8 +849,8 @@ DeferredConsumerBase *ChannelImpl::consumer() */ void ChannelImpl::complete() { - // no more consumer - _consumer.reset(); + // no more receiver + _receiver.reset(); } /** diff --git a/src/deferredconsumerbase.cpp b/src/deferredconsumerbase.cpp deleted file mode 100644 index 09ed0e4..0000000 --- a/src/deferredconsumerbase.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/** - * deferredconsumerbase.cpp - * - * Base class for the deferred consumer and the - * deferred get. - * - * @copyright 2016 - 2017 Copernica B.V. - */ - -/** - * Dependencies - */ -#include "amqpcpp/deferredconsumerbase.h" -#include "basicdeliverframe.h" -#include "basicgetokframe.h" -#include "basicheaderframe.h" -#include "bodyframe.h" - -/** - * Start namespace - */ -namespace AMQP { - -/** - * Process a delivery frame - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicDeliverFrame &frame) -{ - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); - - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); -} - -/** - * Process a delivery frame from a get request - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicGetOKFrame &frame) -{ - // retrieve the delivery tag and whether we were redelivered - _deliveryTag = frame.deliveryTag(); - _redelivered = frame.redelivered(); - - // anybody interested in the new message? - if (_beginCallback) _beginCallback(); - - // do we have anybody interested in messages? - if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey()); -} - -/** - * Process the message headers - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BasicHeaderFrame &frame) -{ - // store the body size - _bodySize = frame.bodySize(); - - // do we have a message? - if (_message) - { - // store the body size and metadata - _message->setBodySize(_bodySize); - _message->set(frame.metaData()); - } - - // anybody interested in the headers? - if (_headerCallback) _headerCallback(frame.metaData()); - - // no body data expected? then we are now complete - if (!_bodySize) complete(); -} - -/** - * Process the message data - * - * @param frame The frame to process - */ -void DeferredConsumerBase::process(BodyFrame &frame) -{ - // make sure we stay in scope - auto self = shared_from_this(); - - // update the bytes still to receive - _bodySize -= frame.payloadSize(); - - // anybody interested in the data? - if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize()); - - // do we have a message? then append the data - if (_message) _message->append(frame.payload(), frame.payloadSize()); - - // if all bytes were received we are now complete - if (!_bodySize) complete(); -} - -/** - * Indicate that a message was done - */ -void DeferredConsumerBase::complete() -{ - // make sure we stay in scope - auto self = shared_from_this(); - - // also monitor the channel - Monitor monitor{ _channel }; - - // do we have a message? - if (_message) - { - // announce the message - announce(*_message, _deliveryTag, _redelivered); - - // and destroy it - _message.reset(); - } - - // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); - - // do we still have a valid channel - if (!monitor.valid()) return; - - // we are now done executing - _channel->complete(); -} - -/** - * End namespace - */ -}