From ef76876d67c3aa92647a7c7a909cb0fc8cacabd1 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 18:07:18 +0100 Subject: [PATCH] refactored code to make room for a deferredpublisher class (which will also use the deferredreceiver base class) --- include/amqpcpp/deferredconsumer.h | 6 +- include/amqpcpp/deferredextreceiver.h | 85 +++++++++++++++++++++++++++ include/amqpcpp/deferredget.h | 6 +- include/amqpcpp/deferredreceiver.h | 33 ++++------- src/consumedmessage.h | 5 ++ src/deferredextreceiver.cpp | 64 ++++++++++++++++++++ src/deferredget.cpp | 2 +- src/deferredreceiver.cpp | 27 --------- 8 files changed, 171 insertions(+), 57 deletions(-) create mode 100644 include/amqpcpp/deferredextreceiver.h create mode 100644 src/deferredextreceiver.cpp diff --git a/include/amqpcpp/deferredconsumer.h b/include/amqpcpp/deferredconsumer.h index 732980f..bd5e576 100644 --- a/include/amqpcpp/deferredconsumer.h +++ b/include/amqpcpp/deferredconsumer.h @@ -14,7 +14,7 @@ /** * Dependencies */ -#include "deferredreceiver.h" +#include "deferredextreceiver.h" /** * Set up namespace @@ -29,7 +29,7 @@ class BasicDeliverFrame; /** * We extend from the default deferred and add extra functionality */ -class DeferredConsumer : public DeferredReceiver, public std::enable_shared_from_this +class DeferredConsumer : public DeferredExtReceiver, public std::enable_shared_from_this { private: /** @@ -79,7 +79,7 @@ public: * @param failed are we already failed? */ DeferredConsumer(ChannelImpl *channel, bool failed = false) : - DeferredReceiver(failed, channel) {} + DeferredExtReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredextreceiver.h b/include/amqpcpp/deferredextreceiver.h new file mode 100644 index 0000000..af15bfe --- /dev/null +++ b/include/amqpcpp/deferredextreceiver.h @@ -0,0 +1,85 @@ +/** + * DeferredExtReceiver.h + * + * Extended receiver that _wants_ to receive message (because it is + * consuming or get'ting messages. This is the base class for both + * the DeferredConsumer as well as the DeferredGet classes, but not + * the base of the DeferredPublisher (which can also receive returned + * messages, but not as a result of an explicit request) + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "deferredreceiver.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class DeferredExtReceiver : public DeferredReceiver +{ +protected: + /** + * The delivery tag for the current message + * @var uint64_t + */ + uint64_t _deliveryTag = 0; + + /** + * Is this a redelivered message + * @var bool + */ + bool _redelivered = false; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + + /** + * Initialize the object to send out a message + * @param exchange the exchange to which the message was published + * @param routingkey the routing key that was used to publish the message + */ + virtual void initialize(const std::string &exchange, const std::string &routingkey) override; + + /** + * Indicate that a message was done + */ + virtual void complete() override; + + /** + * Constructor + * @param failed Have we already failed? + * @param channel The channel we are consuming on + */ + DeferredExtReceiver(bool failed, ChannelImpl *channel) : + DeferredReceiver(failed, channel) {} + +public: + /** + * Destructor + */ + virtual ~DeferredExtReceiver() = default; +}; + +/** + * End of namespace + */ +} + diff --git a/include/amqpcpp/deferredget.h b/include/amqpcpp/deferredget.h index 4997e93..4ea8f1f 100644 --- a/include/amqpcpp/deferredget.h +++ b/include/amqpcpp/deferredget.h @@ -13,7 +13,7 @@ /** * Dependencies */ -#include "deferredreceiver.h" +#include "deferredextreceiver.h" /** * Set up namespace @@ -27,7 +27,7 @@ namespace AMQP { * it grabs a self-pointer when the callback is running, otherwise the onFinalize() * is called before the actual message is consumed. */ -class DeferredGet : public DeferredReceiver, public std::enable_shared_from_this +class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this { private: /** @@ -87,7 +87,7 @@ public: * @param failed are we already failed? */ DeferredGet(ChannelImpl *channel, bool failed = false) : - DeferredReceiver(failed, channel) {} + DeferredExtReceiver(failed, channel) {} public: /** diff --git a/include/amqpcpp/deferredreceiver.h b/include/amqpcpp/deferredreceiver.h index 3943dc2..e4d4562 100644 --- a/include/amqpcpp/deferredreceiver.h +++ b/include/amqpcpp/deferredreceiver.h @@ -51,18 +51,18 @@ protected: * @param exchange the exchange to which the message was published * @param routingkey the routing key that was used to publish the message */ - void initialize(const std::string &exchange, const std::string &routingkey); + virtual void initialize(const std::string &exchange, const std::string &routingkey); /** * Get reference to self to prevent that object falls out of scope * @return std::shared_ptr */ virtual std::shared_ptr lock() = 0; - + /** * Indicate that a message was done */ - virtual void complete(); + virtual void complete() = 0; private: /** @@ -88,18 +88,6 @@ private: friend class BodyFrame; protected: - /** - * The delivery tag for the current message - * @var uint64_t - */ - uint64_t _deliveryTag = 0; - - /** - * Is this a redelivered message - * @var bool - */ - bool _redelivered = false; - /** * The channel to which the consumer is linked * @var ChannelImpl @@ -124,12 +112,6 @@ protected: */ DataCallback _dataCallback; - /** - * Callback for incoming messages - * @var MessageCallback - */ - MessageCallback _messageCallback; - /** * Callback for when a message was complete finished * @var CompleteCallback @@ -144,12 +126,17 @@ protected: /** * Constructor - * * @param failed Have we already failed? * @param channel The channel we are consuming on */ - DeferredReceiver(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} + DeferredReceiver(bool failed, ChannelImpl *channel) : + Deferred(failed), _channel(channel) {} + public: + /** + * Destructor + */ + virtual ~DeferredReceiver() = default; }; /** diff --git a/src/consumedmessage.h b/src/consumedmessage.h index bba72c4..21f4a6c 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -4,6 +4,11 @@ * @copyright 2014 - 2018 Copernica BV */ +/** + * Dependencies + */ +#include "basicdeliverframe.h" + /** * Set up namespace */ diff --git a/src/deferredextreceiver.cpp b/src/deferredextreceiver.cpp new file mode 100644 index 0000000..ca05185 --- /dev/null +++ b/src/deferredextreceiver.cpp @@ -0,0 +1,64 @@ +/** + * DeferredExtReceiver.cpp + * + * Implementation file for the DeferredExtReceiver class + * + * @author Emiel Bruijntjes + * @copyright 2018 Copernica BV + */ + +/** + * Dependencies + */ +#include "amqpcpp/deferredextreceiver.h" +#include "amqpcpp/channelimpl.h" + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Initialize the object to send out a message + * @param exchange the exchange to which the message was published + * @param routingkey the routing key that was used to publish the message + */ +void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey) +{ + // call base + DeferredExtReceiver::initialize(exchange, routingkey); + + // do we have anybody interested in messages? in that case we construct the message + if (_messageCallback) _message.construct(exchange, routingkey); +} + +/** + * Indicate that a message was done + */ +void DeferredExtReceiver::complete() +{ + // also monitor the channel + Monitor monitor(_channel); + + // do we have a message? + if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); + + // do we have to inform anyone about completion? + if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); + + // for the next iteration we want a new message + _message.reset(); + + // do we still have a valid channel + if (!monitor.valid()) return; + + // we are now done executing, so the channel can forget the current receiving object + _channel->install(nullptr); +} + +/** + * End of namespace + */ +} + + diff --git a/src/deferredget.cpp b/src/deferredget.cpp index c1bb6ab..84a2337 100644 --- a/src/deferredget.cpp +++ b/src/deferredget.cpp @@ -65,7 +65,7 @@ void DeferredGet::complete() _channel->onSynchronized(); // pass on to normal implementation - DeferredReceiver::complete(); + DeferredExtReceiver::complete(); } /** diff --git a/src/deferredreceiver.cpp b/src/deferredreceiver.cpp index 52e3766..251258a 100644 --- a/src/deferredreceiver.cpp +++ b/src/deferredreceiver.cpp @@ -29,9 +29,6 @@ void DeferredReceiver::initialize(const std::string &exchange, const std::string { // anybody interested in the new message? if (_beginCallback) _beginCallback(exchange, routingkey); - - // do we have anybody interested in messages? in that case we construct the message - if (_messageCallback) _message.construct(exchange, routingkey); } /** @@ -85,30 +82,6 @@ void DeferredReceiver::process(BodyFrame &frame) if (_bodySize == 0) complete(); } -/** - * Indicate that a message was done - */ -void DeferredReceiver::complete() -{ - // also monitor the channel - Monitor monitor(_channel); - - // do we have a message? - if (_message) _messageCallback(*_message, _deliveryTag, _redelivered); - - // do we have to inform anyone about completion? - if (_completeCallback) _completeCallback(_deliveryTag, _redelivered); - - // for the next iteration we want a new message - _message.reset(); - - // do we still have a valid channel - if (!monitor.valid()) return; - - // we are now done executing, so the channel can forget the current receiving object - _channel->install(nullptr); -} - /** * End namespace */