refactored code to make room for a deferredpublisher class (which will also use the deferredreceiver base class)

This commit is contained in:
Emiel Bruijntjes 2018-03-01 18:07:18 +01:00
parent 520fe40201
commit ef76876d67
8 changed files with 171 additions and 57 deletions

View File

@ -14,7 +14,7 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "deferredreceiver.h" #include "deferredextreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -29,7 +29,7 @@ class BasicDeliverFrame;
/** /**
* We extend from the default deferred and add extra functionality * We extend from the default deferred and add extra functionality
*/ */
class DeferredConsumer : public DeferredReceiver, public std::enable_shared_from_this<DeferredConsumer> class DeferredConsumer : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredConsumer>
{ {
private: private:
/** /**
@ -79,7 +79,7 @@ public:
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredConsumer(ChannelImpl *channel, bool failed = false) : DeferredConsumer(ChannelImpl *channel, bool failed = false) :
DeferredReceiver(failed, channel) {} DeferredExtReceiver(failed, channel) {}
public: public:
/** /**

View File

@ -0,0 +1,85 @@
/**
* DeferredExtReceiver.h
*
* Extended receiver that _wants_ to receive message (because it is
* consuming or get'ting messages. This is the base class for both
* the DeferredConsumer as well as the DeferredGet classes, but not
* the base of the DeferredPublisher (which can also receive returned
* messages, but not as a result of an explicit request)
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "deferredreceiver.h"
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Class definition
*/
class DeferredExtReceiver : public DeferredReceiver
{
protected:
/**
* The delivery tag for the current message
* @var uint64_t
*/
uint64_t _deliveryTag = 0;
/**
* Is this a redelivered message
* @var bool
*/
bool _redelivered = false;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/**
* Initialize the object to send out a message
* @param exchange the exchange to which the message was published
* @param routingkey the routing key that was used to publish the message
*/
virtual void initialize(const std::string &exchange, const std::string &routingkey) override;
/**
* Indicate that a message was done
*/
virtual void complete() override;
/**
* Constructor
* @param failed Have we already failed?
* @param channel The channel we are consuming on
*/
DeferredExtReceiver(bool failed, ChannelImpl *channel) :
DeferredReceiver(failed, channel) {}
public:
/**
* Destructor
*/
virtual ~DeferredExtReceiver() = default;
};
/**
* End of namespace
*/
}

View File

@ -13,7 +13,7 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "deferredreceiver.h" #include "deferredextreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -27,7 +27,7 @@ namespace AMQP {
* it grabs a self-pointer when the callback is running, otherwise the onFinalize() * it grabs a self-pointer when the callback is running, otherwise the onFinalize()
* is called before the actual message is consumed. * is called before the actual message is consumed.
*/ */
class DeferredGet : public DeferredReceiver, public std::enable_shared_from_this<DeferredGet> class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredGet>
{ {
private: private:
/** /**
@ -87,7 +87,7 @@ public:
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredGet(ChannelImpl *channel, bool failed = false) : DeferredGet(ChannelImpl *channel, bool failed = false) :
DeferredReceiver(failed, channel) {} DeferredExtReceiver(failed, channel) {}
public: public:
/** /**

View File

@ -51,18 +51,18 @@ protected:
* @param exchange the exchange to which the message was published * @param exchange the exchange to which the message was published
* @param routingkey the routing key that was used to publish the message * @param routingkey the routing key that was used to publish the message
*/ */
void initialize(const std::string &exchange, const std::string &routingkey); virtual void initialize(const std::string &exchange, const std::string &routingkey);
/** /**
* Get reference to self to prevent that object falls out of scope * Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr * @return std::shared_ptr
*/ */
virtual std::shared_ptr<DeferredReceiver> lock() = 0; virtual std::shared_ptr<DeferredReceiver> lock() = 0;
/** /**
* Indicate that a message was done * Indicate that a message was done
*/ */
virtual void complete(); virtual void complete() = 0;
private: private:
/** /**
@ -88,18 +88,6 @@ private:
friend class BodyFrame; friend class BodyFrame;
protected: protected:
/**
* The delivery tag for the current message
* @var uint64_t
*/
uint64_t _deliveryTag = 0;
/**
* Is this a redelivered message
* @var bool
*/
bool _redelivered = false;
/** /**
* The channel to which the consumer is linked * The channel to which the consumer is linked
* @var ChannelImpl * @var ChannelImpl
@ -124,12 +112,6 @@ protected:
*/ */
DataCallback _dataCallback; DataCallback _dataCallback;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/** /**
* Callback for when a message was complete finished * Callback for when a message was complete finished
* @var CompleteCallback * @var CompleteCallback
@ -144,12 +126,17 @@ protected:
/** /**
* Constructor * Constructor
*
* @param failed Have we already failed? * @param failed Have we already failed?
* @param channel The channel we are consuming on * @param channel The channel we are consuming on
*/ */
DeferredReceiver(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {} DeferredReceiver(bool failed, ChannelImpl *channel) :
Deferred(failed), _channel(channel) {}
public: public:
/**
* Destructor
*/
virtual ~DeferredReceiver() = default;
}; };
/** /**

View File

@ -4,6 +4,11 @@
* @copyright 2014 - 2018 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/**
* Dependencies
*/
#include "basicdeliverframe.h"
/** /**
* Set up namespace * Set up namespace
*/ */

View File

@ -0,0 +1,64 @@
/**
* DeferredExtReceiver.cpp
*
* Implementation file for the DeferredExtReceiver class
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
*/
/**
* Dependencies
*/
#include "amqpcpp/deferredextreceiver.h"
#include "amqpcpp/channelimpl.h"
/**
* Begin of namespace
*/
namespace AMQP {
/**
* Initialize the object to send out a message
* @param exchange the exchange to which the message was published
* @param routingkey the routing key that was used to publish the message
*/
void DeferredExtReceiver::initialize(const std::string &exchange, const std::string &routingkey)
{
// call base
DeferredExtReceiver::initialize(exchange, routingkey);
// do we have anybody interested in messages? in that case we construct the message
if (_messageCallback) _message.construct(exchange, routingkey);
}
/**
* Indicate that a message was done
*/
void DeferredExtReceiver::complete()
{
// also monitor the channel
Monitor monitor(_channel);
// do we have a message?
if (_message) _messageCallback(*_message, _deliveryTag, _redelivered);
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// for the next iteration we want a new message
_message.reset();
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing, so the channel can forget the current receiving object
_channel->install(nullptr);
}
/**
* End of namespace
*/
}

View File

@ -65,7 +65,7 @@ void DeferredGet::complete()
_channel->onSynchronized(); _channel->onSynchronized();
// pass on to normal implementation // pass on to normal implementation
DeferredReceiver::complete(); DeferredExtReceiver::complete();
} }
/** /**

View File

@ -29,9 +29,6 @@ void DeferredReceiver::initialize(const std::string &exchange, const std::string
{ {
// anybody interested in the new message? // anybody interested in the new message?
if (_beginCallback) _beginCallback(exchange, routingkey); if (_beginCallback) _beginCallback(exchange, routingkey);
// do we have anybody interested in messages? in that case we construct the message
if (_messageCallback) _message.construct(exchange, routingkey);
} }
/** /**
@ -85,30 +82,6 @@ void DeferredReceiver::process(BodyFrame &frame)
if (_bodySize == 0) complete(); if (_bodySize == 0) complete();
} }
/**
* Indicate that a message was done
*/
void DeferredReceiver::complete()
{
// also monitor the channel
Monitor monitor(_channel);
// do we have a message?
if (_message) _messageCallback(*_message, _deliveryTag, _redelivered);
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// for the next iteration we want a new message
_message.reset();
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing, so the channel can forget the current receiving object
_channel->install(nullptr);
}
/** /**
* End namespace * End namespace
*/ */