refactored handling of incoming messages from consume and get operations, to prepare for future handling of returned messages and publisher confirms. this also implies a small change to the api: the begin-callback when a message is received now also gets the original exchange and routing key (which could be useful)

This commit is contained in:
Emiel Bruijntjes 2018-03-01 17:34:27 +01:00
parent 3ccc6af475
commit 520fe40201
12 changed files with 137 additions and 182 deletions

View File

@ -38,7 +38,7 @@ using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using EmptyCallback = std::function<void()>;
using BeginCallback = std::function<void()>;
using BeginCallback = std::function<void(const std::string &exchange, const std::string &routingkey)>;
using HeaderCallback = std::function<void(const MetaData &metaData)>;
using DataCallback = std::function<void(const char *data, size_t size)>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;

View File

@ -75,9 +75,9 @@ private:
/**
* Handlers for all consumers that are active
* @var std::map<std::string,std::shared_ptr<DeferredReceiver>
* @var std::map<std::string,std::shared_ptr<DeferredConsumer>
*/
std::map<std::string,std::shared_ptr<DeferredReceiver>> _consumers;
std::map<std::string,std::shared_ptr<DeferredConsumer>> _consumers;
/**
* Pointer to the oldest deferred result (the first one that is going
@ -130,7 +130,7 @@ private:
/**
* The current object that is busy receiving a message
* @var std::shared_ptr<DeferredReceiver>
* @var std::shared_ptr<DeferredReceiver>
*/
std::shared_ptr<DeferredReceiver> _receiver;
@ -659,18 +659,23 @@ public:
/**
* Install a consumer
*
* @param consumertag The consumer tag
* @param consumer The consumer handler
* @param active Is this the new active consumer
* @param consumer The consumer object
*/
void install(std::string consumertag, const std::shared_ptr<DeferredReceiver> &consumer, bool active = false)
void install(const std::string &consumertag, const std::shared_ptr<DeferredConsumer> &consumer)
{
// install the consumer handler
_consumers[consumertag] = consumer;
}
// should we become the current consumer?
if (active) _receiver = consumer;
/**
* Install the current consumer
* @param receiver The receiver object
*/
void install(const std::shared_ptr<DeferredReceiver> &receiver)
{
// store object as current receiver
_receiver = receiver;
}
/**
@ -684,23 +689,17 @@ public:
}
/**
* Process incoming delivery
*
* @param frame The frame to process
* Fetch the receiver for a specific consumer tag
* @param consumertag the consumer tag
* @return the receiver object
*/
void process(BasicDeliverFrame &frame);
DeferredConsumer *consumer(const std::string &consumertag) const;
/**
* Retrieve the current object that is receiving a message
*
* @return The handler responsible for the current message
*/
DeferredReceiver *receiver();
/**
* Mark the current consumer as done
*/
void complete();
DeferredReceiver *receiver() const { return _receiver.get(); }
/**
* The channel class is its friend, thus can it instantiate this object

View File

@ -20,11 +20,16 @@
* Set up namespace
*/
namespace AMQP {
/**
* Forward declararions
*/
class BasicDeliverFrame;
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredConsumer : public DeferredReceiver
class DeferredConsumer : public DeferredReceiver, public std::enable_shared_from_this<DeferredConsumer>
{
private:
/**
@ -33,6 +38,13 @@ private:
*/
ConsumeCallback _consumeCallback;
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void process(BasicDeliverFrame &frame);
/**
* Report success for frames that report start consumer operations
* @param name Consumer tag that is started
@ -41,12 +53,10 @@ private:
virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
* Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr
*/
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
/**
* The channel implementation may call our
@ -54,6 +64,7 @@ private:
*/
friend class ChannelImpl;
friend class ConsumedMessage;
friend class BasicDeliverFrame;
public:
/**

View File

@ -27,7 +27,7 @@ namespace AMQP {
* it grabs a self-pointer when the callback is running, otherwise the onFinalize()
* is called before the actual message is consumed.
*/
class DeferredGet : public DeferredReceiver
class DeferredGet : public DeferredReceiver, public std::enable_shared_from_this<DeferredGet>
{
private:
/**
@ -57,12 +57,15 @@ private:
virtual const std::shared_ptr<Deferred> &reportSuccess() const override;
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
* Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr
*/
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const override;
virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }
/**
* Extended implementation of the complete method that is called when a message was fully received
*/
virtual void complete() override;
/**
* The channel implementation may call our

View File

@ -35,9 +35,7 @@ class BodyFrame;
/**
* Base class for deferred consumers
*/
class DeferredReceiver :
public Deferred,
public std::enable_shared_from_this<DeferredReceiver>
class DeferredReceiver : public Deferred
{
private:
/**
@ -46,20 +44,27 @@ private:
*/
uint64_t _bodySize = 0;
protected:
/**
* Process a delivery frame
*
* @param frame The frame to process
* Initialize the object to send out a message
* @param exchange the exchange to which the message was published
* @param routingkey the routing key that was used to publish the message
*/
void process(BasicDeliverFrame &frame);
void initialize(const std::string &exchange, const std::string &routingkey);
/**
* Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr
*/
virtual std::shared_ptr<DeferredReceiver> lock() = 0;
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
* Indicate that a message was done
*/
void process(BasicGetOKFrame &frame);
virtual void complete();
private:
/**
* Process the message headers
*
@ -74,27 +79,14 @@ private:
*/
void process(BodyFrame &frame);
/**
* Indicate that a message was done
*/
void complete();
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
virtual void announce(const Message &message, uint64_t deliveryTag, bool redelivered) const = 0;
/**
* Frames may be processed
*/
friend class ChannelImpl;
friend class BasicDeliverFrame;
friend class BasicGetOKFrame;
friend class BasicHeaderFrame;
friend class BodyFrame;
protected:
/**
* The delivery tag for the current message

View File

@ -1,7 +1,7 @@
/**
* Class describing a basic deliver frame
*
* @copyright 2014 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -16,6 +16,7 @@
#include "amqpcpp/stringfield.h"
#include "amqpcpp/booleanset.h"
#include "amqpcpp/connectionimpl.h"
#include "amqpcpp/deferredconsumer.h"
/**
* Set up namespace
@ -193,8 +194,14 @@ public:
// channel does not exist
if (!channel) return false;
// construct the message
channel->process(*this);
// get the appropriate consumer object
auto consumer = channel->consumer(_consumerTag);
// skip if there was no consumer for this tag
if (consumer == nullptr) return false;
// initialize the object, because we're about to receive a message
consumer->process(*this);
// done
return true;

View File

@ -170,8 +170,8 @@ public:
// channel does not exist
if (!channel) return false;
// report success for the get operation
channel->reportSuccess(messageCount(), deliveryTag(), redelivered());
// report success for the get operation (this will also update the current receiver!)
channel->reportSuccess(messageCount(), _deliveryTag, redelivered());
// get the current receiver object
auto *receiver = channel->receiver();
@ -179,8 +179,8 @@ public:
// check if we have a valid receiver
if (receiver == nullptr) return false;
// pass on to consumer
receiver->process(*this);
// initialize the receiver for the upcoming message
receiver->initialize(_exchange, _routingKey);
// done
return true;

View File

@ -6,7 +6,6 @@
* @copyright 2014 - 2018 Copernica BV
*/
#include "includes.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicreturnframe.h"
#include "consumedmessage.h"
@ -816,41 +815,17 @@ void ChannelImpl::reportError(const char *message, bool notifyhandler)
}
/**
* Process incoming delivery
*
* @param frame The frame to process
* Get the current receiver for a given consumer tag
* @param consumertag the consumer frame
* @return DeferredConsumer
*/
void ChannelImpl::process(BasicDeliverFrame &frame)
DeferredConsumer *ChannelImpl::consumer(const std::string &consumertag) const
{
// find the consumer for this frame
auto iter = _consumers.find(frame.consumerTag());
if (iter == _consumers.end()) return;
// we are going to be receiving a message, store
// the handler for the incoming message
_receiver = iter->second;
// let the consumer process the frame
_receiver->process(frame);
}
/**
* Retrieve the current receiver handler
*
* @return The handler responsible for the current message
*/
DeferredReceiver *ChannelImpl::receiver()
{
return _receiver.get();
}
/**
* Mark the current consumer as done
*/
void ChannelImpl::complete()
{
// no more receiver
_receiver.reset();
// look in the map
auto iter = _consumers.find(consumertag);
// return the result
return iter == _consumers.end() ? nullptr : iter->second.get();
}
/**

View File

@ -1,7 +1,7 @@
/**
* Base class for a message implementation
*
* @copyright 2014 - 2017 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**

View File

@ -3,15 +3,34 @@
*
* Implementation file for the DeferredConsumer class
*
* @copyright 2014 - 2017 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
#include "includes.h"
#include "basicdeliverframe.h"
/**
* Namespace
*/
namespace AMQP {
/**
* Process a delivery frame
*
* @param frame The frame to process
*/
void DeferredConsumer::process(BasicDeliverFrame &frame)
{
// this object will handle all future frames with header and body data
_channel->install(shared_from_this());
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// initialize the object for the next message
initialize(frame.exchange(), frame.routingKey());
}
/**
* Report success for frames that report start consumer operations
* @param name Consumer tag that is started
@ -32,18 +51,6 @@ const std::shared_ptr<Deferred> &DeferredConsumer::reportSuccess(const std::stri
return _next;
}
/**
* Announce that a message was received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
*/
void DeferredConsumer::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
{
// simply execute the message callback
_messageCallback(message, deliveryTag, redelivered);
}
/**
* End namespace
*/

View File

@ -4,7 +4,7 @@
* Implementation of the DeferredGet call
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 - 2017 Copernica BV
* @copyright 2014 - 2018 Copernica BV
*/
/**
@ -19,20 +19,19 @@ namespace AMQP {
/**
* Report success for a get operation
*
* @param messagecount Number of messages left in the queue
* @param deliveryTag Delivery tag of the message coming in
* @param redelivered Was the message redelivered?
*/
const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered)
{
// install this object as the handler for the upcoming header and body frames
_channel->install(shared_from_this());
// store delivery tag and redelivery status
_deliveryTag = deliveryTag;
_redelivered = redelivered;
// install ourselves in the channel
_channel->install("", shared_from_this(), true);
// report the size (note that this is the size _minus_ the message that is retrieved
// (and for which the callback will be called later), so it could be zero)
if (_sizeCallback) _sizeCallback(messagecount);
@ -58,27 +57,15 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
}
/**
* Announce that a message has been received
* @param message The message to announce
* @param deliveryTag The delivery tag (for ack()ing)
* @param redelivered Is this a redelivered message
* Extended implementation of the complete method that is called when a message was fully received
*/
void DeferredGet::announce(const Message &message, uint64_t deliveryTag, bool redelivered) const
void DeferredGet::complete()
{
// monitor the channel
Monitor monitor{ _channel };
// the channel is now synchronized
// the channel is now synchronized, delayed frames may now be sent
_channel->onSynchronized();
// simply execute the message callback
_messageCallback(std::move(message), deliveryTag, redelivered);
// check if the channel is still valid
if (!monitor.valid()) return;
// stop consuming now
_channel->uninstall({});
// pass on to normal implementation
DeferredReceiver::complete();
}
/**

View File

@ -21,39 +21,17 @@
namespace AMQP {
/**
* Process a delivery frame
*
* @param frame The frame to process
* Initialize the object: we are going to receive a message, next frames will be header and data
* @param exchange
* @param routingkey
*/
void DeferredReceiver::process(BasicDeliverFrame &frame)
void DeferredReceiver::initialize(const std::string &exchange, const std::string &routingkey)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
if (_beginCallback) _beginCallback(exchange, routingkey);
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void DeferredReceiver::process(BasicGetOKFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
// do we have anybody interested in messages? in that case we construct the message
if (_messageCallback) _message.construct(exchange, routingkey);
}
/**
@ -63,6 +41,9 @@ void DeferredReceiver::process(BasicGetOKFrame &frame)
*/
void DeferredReceiver::process(BasicHeaderFrame &frame)
{
// make sure we stay in scope
auto self = lock();
// store the body size
_bodySize = frame.bodySize();
@ -78,7 +59,7 @@ void DeferredReceiver::process(BasicHeaderFrame &frame)
if (_headerCallback) _headerCallback(frame.metaData());
// no body data expected? then we are now complete
if (!_bodySize) complete();
if (_bodySize == 0) complete();
}
/**
@ -89,7 +70,7 @@ void DeferredReceiver::process(BasicHeaderFrame &frame)
void DeferredReceiver::process(BodyFrame &frame)
{
// make sure we stay in scope
auto self = shared_from_this();
auto self = lock();
// update the bytes still to receive
_bodySize -= frame.payloadSize();
@ -101,7 +82,7 @@ void DeferredReceiver::process(BodyFrame &frame)
if (_message) _message->append(frame.payload(), frame.payloadSize());
// if all bytes were received we are now complete
if (!_bodySize) complete();
if (_bodySize == 0) complete();
}
/**
@ -109,30 +90,23 @@ void DeferredReceiver::process(BodyFrame &frame)
*/
void DeferredReceiver::complete()
{
// make sure we stay in scope
auto self = shared_from_this();
// also monitor the channel
Monitor monitor{ _channel };
Monitor monitor(_channel);
// do we have a message?
if (_message)
{
// announce the message
announce(*_message, _deliveryTag, _redelivered);
// and destroy it
_message.reset();
}
if (_message) _messageCallback(*_message, _deliveryTag, _redelivered);
// do we have to inform anyone about completion?
if (_completeCallback) _completeCallback(_deliveryTag, _redelivered);
// for the next iteration we want a new message
_message.reset();
// do we still have a valid channel
if (!monitor.valid()) return;
// we are now done executing
_channel->complete();
// we are now done executing, so the channel can forget the current receiving object
_channel->install(nullptr);
}
/**