added deferredreceiver files (forgotter in prev commit)

This commit is contained in:
Emiel Bruijntjes 2018-02-27 05:09:03 +01:00
parent e0feb17ecc
commit 3ccc6af475
2 changed files with 307 additions and 0 deletions

View File

@ -0,0 +1,166 @@
/**
* DeferredReceiver.h
*
* Base class for the deferred consumer, the deferred get and the
* deferred publisher (that may receive returned messages)
*
* @copyright 2016 - 2018 Copernica B.V.
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "deferred.h"
#include "stack_ptr.h"
#include "message.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Forward declarations
*/
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BodyFrame;
/**
* Base class for deferred consumers
*/
class DeferredReceiver :
public Deferred,
public std::enable_shared_from_this<DeferredReceiver>
{
private:
/**
* Size of the body of the current message
* @var uint64_t
*/
uint64_t _bodySize = 0;
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void process(BasicDeliverFrame &frame);
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void process(BasicGetOKFrame &frame);
/**
* Process the message headers
*
* @param frame The frame to process
*/
void process(BasicHeaderFrame &frame);
/**
* Process the message data
*
* @param frame The frame to process
*/
void process(BodyFrame &frame);
/**
* Indicate that a message was done
*/
void complete();
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0;
/**
* Frames may be processed
*/
friend class ChannelImpl;
friend class BasicDeliverFrame;
friend class BasicGetOKFrame;
friend class BasicHeaderFrame;
friend class BodyFrame;
protected:
/**
* The delivery tag for the current message
* @var uint64_t
*/
uint64_t _deliveryTag = 0;
/**
* Is this a redelivered message
* @var bool
*/
bool _redelivered = false;
/**
* The channel to which the consumer is linked
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback for new message
* @var BeginCallback
*/
BeginCallback _beginCallback;
/**
* Callback for incoming headers
* @var HeaderCallback
*/
HeaderCallback _headerCallback;
/**
* Callback for when a chunk of data comes in
* @var DataCallback
*/
DataCallback _dataCallback;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/**
* Callback for when a message was complete finished
* @var CompleteCallback
*/
CompleteCallback _completeCallback;
/**
* The message that we are currently receiving
* @var stack_ptr<Message>
*/
stack_ptr<Message> _message;
/**
* Constructor
*
* @param failed Have we already failed?
* @param channel The channel we are consuming on
*/
DeferredReceiver(bool failed, ChannelImpl *channel) : Deferred(failed), _channel(channel) {}
public:
};
/**
* End namespace
*/
}

141
src/deferredreceiver.cpp Normal file
View File

@ -0,0 +1,141 @@
/**
* DeferredReceiver.cpp
*
* Implementation file for the DeferredReceiver class
*
* @copyright 2016 - 2018 Copernica B.V.
*/
/**
* Dependencies
*/
#include "amqpcpp/deferredreceiver.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
/**
* Start namespace
*/
namespace AMQP {
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void DeferredReceiver::process(BasicDeliverFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void DeferredReceiver::process(BasicGetOKFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process the message headers
*
* @param frame The frame to process
*/
void DeferredReceiver::process(BasicHeaderFrame &frame)
{
// store the body size
_bodySize = frame.bodySize();
// do we have a message?
if (_message)
{
// store the body size and metadata
_message->setBodySize(_bodySize);
_message->set(frame.metaData());
}
// anybody interested in the headers?
if (_headerCallback) _headerCallback(frame.metaData());
// no body data expected? then we are now complete
if (!_bodySize) complete();
}
/**
* Process the message data
*
* @param frame The frame to process
*/
void DeferredReceiver::process(BodyFrame &frame)
{
// make sure we stay in scope
auto self = shared_from_this();
// update the bytes still to receive
_bodySize -= frame.payloadSize();
// anybody interested in the data?
if (_dataCallback) _dataCallback(frame.payload(), frame.payloadSize());
// do we have a message? then append the data
if (_message) _message->append(frame.payload(), frame.payloadSize());
// if all bytes were received we are now complete
if (!_bodySize) complete();
}
/**
* Indicate that a message was done
*/
void DeferredReceiver::complete()
{
// make sure we stay in scope
auto self = shared_from_this();
// also monitor the channel
Monitor monitor{ _channel };
// do we have a message?
if (_message)
{
// announce the message
announce(*_message, _deliveryTag, _redelivered);
// and destroy it
_message.reset();
}
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing
_channel->complete();
}
/**
* End namespace
*/
}