renamed deferredconsumerbase into deferredreceiver, because it is not only a base class for the consumer, but also for other receiving operations: get requests and in the future also for returned messages

This commit is contained in:
Emiel Bruijntjes 2018-02-27 05:08:21 +01:00
parent 95f8fd5448
commit e0feb17ecc
10 changed files with 56 additions and 349 deletions

View File

@ -34,7 +34,7 @@ namespace AMQP {
/** /**
* Forward declarations * Forward declarations
*/ */
class DeferredConsumerBase; class DeferredReceiver;
class BasicDeliverFrame; class BasicDeliverFrame;
class DeferredConsumer; class DeferredConsumer;
class BasicGetOKFrame; class BasicGetOKFrame;
@ -75,9 +75,9 @@ private:
/** /**
* Handlers for all consumers that are active * Handlers for all consumers that are active
* @var std::map<std::string,std::shared_ptr<DeferredConsumerBase> * @var std::map<std::string,std::shared_ptr<DeferredReceiver>
*/ */
std::map<std::string,std::shared_ptr<DeferredConsumerBase>> _consumers; std::map<std::string,std::shared_ptr<DeferredReceiver>> _consumers;
/** /**
* Pointer to the oldest deferred result (the first one that is going * Pointer to the oldest deferred result (the first one that is going
@ -129,10 +129,10 @@ private:
bool _synchronous = false; bool _synchronous = false;
/** /**
* The current consumer receiving a message * The current object that is busy receiving a message
* @var std::shared_ptr<DeferredConsumerBase> * @var std::shared_ptr<DeferredReceiver>
*/ */
std::shared_ptr<DeferredConsumerBase> _consumer; std::shared_ptr<DeferredReceiver> _receiver;
/** /**
* Attach the connection * Attach the connection
@ -664,13 +664,13 @@ public:
* @param consumer The consumer handler * @param consumer The consumer handler
* @param active Is this the new active consumer * @param active Is this the new active consumer
*/ */
void install(std::string consumertag, const std::shared_ptr<DeferredConsumerBase> &consumer, bool active = false) void install(std::string consumertag, const std::shared_ptr<DeferredReceiver> &consumer, bool active = false)
{ {
// install the consumer handler // install the consumer handler
_consumers[consumertag] = consumer; _consumers[consumertag] = consumer;
// should we become the current consumer? // should we become the current consumer?
if (active) _consumer = consumer; if (active) _receiver = consumer;
} }
/** /**
@ -691,11 +691,11 @@ public:
void process(BasicDeliverFrame &frame); void process(BasicDeliverFrame &frame);
/** /**
* Retrieve the current consumer handler * Retrieve the current object that is receiving a message
* *
* @return The handler responsible for the current message * @return The handler responsible for the current message
*/ */
DeferredConsumerBase *consumer(); DeferredReceiver *receiver();
/** /**
* Mark the current consumer as done * Mark the current consumer as done

View File

@ -3,7 +3,7 @@
* *
* Deferred callback for consumers * Deferred callback for consumers
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -14,7 +14,7 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "deferredconsumerbase.h" #include "deferredreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -24,7 +24,7 @@ namespace AMQP {
/** /**
* We extend from the default deferred and add extra functionality * We extend from the default deferred and add extra functionality
*/ */
class DeferredConsumer : public DeferredConsumerBase class DeferredConsumer : public DeferredReceiver
{ {
private: private:
/** /**
@ -68,7 +68,7 @@ public:
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredConsumer(ChannelImpl *channel, bool failed = false) : DeferredConsumer(ChannelImpl *channel, bool failed = false) :
DeferredConsumerBase(failed, channel) {} DeferredReceiver(failed, channel) {}
public: public:
/** /**

View File

@ -1,166 +0,0 @@
/**
* deferredconsumerbase.h
*
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 - 2017 Copernica B.V.
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "deferred.h"
#include "stack_ptr.h"
#include "message.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Forward declarations
*/
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BodyFrame;
/**
* Base class for deferred consumers
*/
class DeferredConsumerBase :
public Deferred,
public std::enable_shared_from_this<DeferredConsumerBase>
{
private:
/**
* Size of the body of the current message
* @var uint64_t
*/
uint64_t _bodySize = 0;
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void process(BasicDeliverFrame &frame);
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void process(BasicGetOKFrame &frame);
/**
* Process the message headers
*
* @param frame The frame to process
*/
void process(BasicHeaderFrame &frame);
/**
* Process the message data
*
* @param frame The frame to process
*/
void process(BodyFrame &frame);
/**
* Indicate that a message was done
*/
void complete();
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0;
/**
* Frames may be processed
*/
friend class ChannelImpl;
friend class BasicDeliverFrame;
friend class BasicGetOKFrame;
friend class BasicHeaderFrame;
friend class BodyFrame;
protected:
/**
* The delivery tag for the current message
* @var uint64_t
*/
uint64_t _deliveryTag = 0;
/**
* Is this a redelivered message
* @var bool
*/
bool _redelivered = false;
/**
* The channel to which the consumer is linked
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback for new message
* @var BeginCallback
*/
BeginCallback _beginCallback;
/**
* Callback for incoming headers
* @var HeaderCallback
*/
HeaderCallback _headerCallback;
/**
* Callback for when a chunk of data comes in
* @var DataCallback
*/
DataCallback _dataCallback;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/**
* Callback for when a message was complete finished
* @var CompleteCallback
*/
CompleteCallback _completeCallback;
/**
* The message that we are currently receiving
* @var stack_ptr<Message>
*/
stack_ptr<Message> _message;
/**
* Constructor
*
* @param failed Have we already failed?
* @param channel The channel we are consuming on
*/
DeferredConsumerBase(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {}
public:
};
/**
* End namespace
*/
}

View File

@ -2,7 +2,7 @@
* DeferredGet.h * DeferredGet.h
* *
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com> * @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -13,7 +13,7 @@
/** /**
* Dependencies * Dependencies
*/ */
#include "deferredconsumerbase.h" #include "deferredreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -27,7 +27,7 @@ namespace AMQP {
* it grabs a self-pointer when the callback is running, otherwise the onFinalize() * it grabs a self-pointer when the callback is running, otherwise the onFinalize()
* is called before the actual message is consumed. * is called before the actual message is consumed.
*/ */
class DeferredGet : public DeferredConsumerBase class DeferredGet : public DeferredReceiver
{ {
private: private:
/** /**
@ -84,7 +84,7 @@ public:
* @param failed are we already failed? * @param failed are we already failed?
*/ */
DeferredGet(ChannelImpl *channel, bool failed = false) : DeferredGet(ChannelImpl *channel, bool failed = false) :
DeferredConsumerBase(failed, channel) {} DeferredReceiver(failed, channel) {}
public: public:
/** /**

View File

@ -7,7 +7,7 @@
* Message objects can not be constructed by end users, they are only constructed * Message objects can not be constructed by end users, they are only constructed
* by the AMQP library, and passed to user callbacks. * by the AMQP library, and passed to user callbacks.
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -31,7 +31,7 @@ namespace AMQP {
/** /**
* Forward declarations * Forward declarations
*/ */
class DeferredConsumerBase; class DeferredReceiver;
/** /**
* Class definition * Class definition
@ -61,7 +61,7 @@ protected:
/** /**
* We are an open book to the consumer handler * We are an open book to the consumer handler
*/ */
friend class DeferredConsumerBase; friend class DeferredReceiver;
/** /**
* Set the body size * Set the body size

View File

@ -1,7 +1,7 @@
/** /**
* Class describing a basic get ok frame * Class describing a basic get ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -173,11 +173,14 @@ public:
// report success for the get operation // report success for the get operation
channel->reportSuccess(messageCount(), deliveryTag(), redelivered()); channel->reportSuccess(messageCount(), deliveryTag(), redelivered());
// check if we have a valid consumer // get the current receiver object
if (!channel->consumer()) return false; auto *receiver = channel->receiver();
// check if we have a valid receiver
if (receiver == nullptr) return false;
// pass on to consumer // pass on to consumer
channel->consumer()->process(*this); receiver->process(*this);
// done // done
return true; return true;

View File

@ -1,7 +1,7 @@
/** /**
* Class describing an AMQP basic header frame * Class describing an AMQP basic header frame
* *
* @copyright 2014 - 2017 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -16,7 +16,7 @@
#include "amqpcpp/metadata.h" #include "amqpcpp/metadata.h"
#include "amqpcpp/envelope.h" #include "amqpcpp/envelope.h"
#include "amqpcpp/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
#include "amqpcpp/deferredconsumerbase.h" #include "amqpcpp/deferredreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -135,11 +135,17 @@ public:
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// we need a channel
if (channel == nullptr) return false;
// do we have an object that is receiving this data?
auto *receiver = channel->receiver();
// check if we have a valid channel and consumer // check if we have a valid channel and consumer
if (!channel || !channel->consumer()) return false; if (receiver == nullptr) return false;
// the channel can process the frame // the channel can process the frame
channel->consumer()->process(*this); receiver->process(*this);
// done // done
return true; return true;

View File

@ -1,7 +1,7 @@
/** /**
* Class describing an AMQP Body Frame * Class describing an AMQP Body Frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -14,7 +14,7 @@
*/ */
#include "extframe.h" #include "extframe.h"
#include "amqpcpp/connectionimpl.h" #include "amqpcpp/connectionimpl.h"
#include "amqpcpp/deferredconsumerbase.h" #include "amqpcpp/deferredreceiver.h"
/** /**
* Set up namespace * Set up namespace
@ -106,11 +106,17 @@ public:
// we need the appropriate channel // we need the appropriate channel
auto channel = connection->channel(this->channel()); auto channel = connection->channel(this->channel());
// check if we have a valid channel and consumer // we must have a channel object
if (!channel || !channel->consumer()) return false; if (channel == nullptr) return false;
// get the object that is receiving the messages
auto *receiver = channel->receiver();
// check if we have a valid receiver
if (receiver == nullptr) return false;
// the consumer may process the frame // the consumer may process the frame
channel->consumer()->process(*this); receiver->process(*this);
// done // done
return true; return true;

View File

@ -828,20 +828,20 @@ void ChannelImpl::process(BasicDeliverFrame &frame)
// we are going to be receiving a message, store // we are going to be receiving a message, store
// the handler for the incoming message // the handler for the incoming message
_consumer = iter->second; _receiver = iter->second;
// let the consumer process the frame // let the consumer process the frame
_consumer->process(frame); _receiver->process(frame);
} }
/** /**
* Retrieve the current consumer handler * Retrieve the current receiver handler
* *
* @return The handler responsible for the current message * @return The handler responsible for the current message
*/ */
DeferredConsumerBase *ChannelImpl::consumer() DeferredReceiver *ChannelImpl::receiver()
{ {
return _consumer.get(); return _receiver.get();
} }
/** /**
@ -849,8 +849,8 @@ DeferredConsumerBase *ChannelImpl::consumer()
*/ */
void ChannelImpl::complete() void ChannelImpl::complete()
{ {
// no more consumer // no more receiver
_consumer.reset(); _receiver.reset();
} }
/** /**

View File

@ -1,142 +0,0 @@
/**
* deferredconsumerbase.cpp
*
* Base class for the deferred consumer and the
* deferred get.
*
* @copyright 2016 - 2017 Copernica B.V.
*/
/**
* Dependencies
*/
#include "amqpcpp/deferredconsumerbase.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicDeliverFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicGetOKFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process the message headers
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicHeaderFrame &frame)
{
// store the body size
_bodySize = frame.bodySize();
// do we have a message?
if (_message)
{
// store the body size and metadata
_message->setBodySize(_bodySize);
_message->set(frame.metaData());
}
// anybody interested in the headers?
if (_headerCallback) _headerCallback(frame.metaData());
// no body data expected? then we are now complete
if (!_bodySize) complete();
}
/**
* Process the message data
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BodyFrame &frame)
{
// make sure we stay in scope
auto self = shared_from_this();
// update the bytes still to receive
_bodySize -= frame.payloadSize();
// anybody interested in the data?
if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize());
// do we have a message? then append the data
if (_message) _message->append(frame.payload(), frame.payloadSize());
// if all bytes were received we are now complete
if (!_bodySize) complete();
}
/**
* Indicate that a message was done
*/
void DeferredConsumerBase::complete()
{
// make sure we stay in scope
auto self = shared_from_this();
// also monitor the channel
Monitor monitor{ _channel };
// do we have a message?
if (_message)
{
// announce the message
announce(*_message, _deliveryTag, _redelivered);
// and destroy it
_message.reset();
}
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing
_channel->complete();
}
/**
* End namespace
*/
}