the consumer message callback can now also be installed via the Deferred objects, and it is no longer passed a consumer tag, because it already is obvious what the consumer tag is supposed to be

This commit is contained in:
Emiel Bruijntjes 2014-04-15 11:39:52 +02:00
parent 921f24ae06
commit 745ab512a5
10 changed files with 157 additions and 105 deletions

View File

@ -20,9 +20,10 @@ namespace AMQP {
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using ConsumeCallback = std::function<void(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using ConsumeCallback = std::function<void(const std::string &consumer)>;
using CancelCallback = std::function<void(const std::string &consumer)>;
/**

View File

@ -14,6 +14,11 @@
*/
namespace AMQP {
/**
* Forward declarations
*/
class ConsumedMessage;
/**
* Class definition
*/
@ -45,11 +50,10 @@ private:
ErrorCallback _errorCallback;
/**
* Callback to execute when a message arrives
*
* @todo do this different??
* Callbacks for all consumers that are active
* @var std::map<std::string,MessageCallback>
*/
std::unique_ptr<DeferredConsumer> _consumer;
std::map<std::string,MessageCallback> _consumers;
/**
* Pointer to the oldest deferred result (the first one that is going
@ -84,9 +88,9 @@ private:
/**
* The message that is now being received
* @var MessageImpl
* @var ConsumedMessage
*/
MessageImpl *_message = nullptr;
ConsumedMessage *_message = nullptr;
/**
* Construct a channel object
@ -491,6 +495,20 @@ public:
// @todo destruct oldest callback
}
/**
* Install a consumer callback
* @param consumertag The consumer tag
* @param callback The callback to be called
*/
void install(const std::string &consumertag, const MessageCallback &callback)
{
// install the callback if it is assigned
if (callback) _consumers[consumertag] = callback;
// otherwise we erase the previously set callback
else _consumers.erase(consumertag);
}
/**
* Report that a message was received
*/
@ -499,34 +517,19 @@ public:
/**
* Create an incoming message
* @param frame
* @return MessageImpl
* @return ConsumedMessage
*/
MessageImpl *message(const BasicDeliverFrame &frame);
ConsumedMessage *message(const BasicDeliverFrame &frame);
/**
* Retrieve the current incoming message
* @return MessageImpl
* @return ConsumedMessage
*/
MessageImpl *message()
ConsumedMessage *message()
{
return _message;
}
/**
* Report that the consumer has started
*
* @param consumerTag the tag under which we are now consuming
*/
void reportConsumerStarted(const std::string& consumerTag)
{
// if we do not have a consumer, something is very wrong
if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found");
// otherwise, we now report the consumer as started
// @todo look at this implementation
//else _consumer->success(consumerTag);
}
/**
* The channel class is its friend, thus can it instantiate this object
*/

View File

@ -17,25 +17,31 @@ namespace AMQP {
class DeferredConsumer : public Deferred
{
private:
/**
* The channel to which the consumer is linked
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback to execute when a message arrives
* @var ConsumeCallbacl
* @var ConsumeCallback
*/
ConsumeCallback _consumeCallback;
/**
* Process a message
*
* @param message the message to process
* @param deliveryTag the message delivery tag
* @param consumerTag the tag we are consuming under
* @param is this a redelivered message?
* Callback for incoming messages
* @var MessageCallback
*/
void message(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) const
{
// do we have a valid callback
if (_consumeCallback) _consumeCallback(message, deliveryTag, consumerTag, redelivered);
}
MessageCallback _messageCallback;
/**
* Report success for frames that report start consumer operations
* @param name Consumer tag that is started
* @return Deferred
*/
virtual Deferred *reportSuccess(const std::string &name) const override;
/**
* The channel implementation may call our
@ -49,23 +55,36 @@ protected:
* Protected constructor that can only be called
* from within the channel implementation
*
* @param boolea are we already failed?
* @param channel the channel implementation
* @param failed are we already failed?
*/
DeferredConsumer(bool failed = false) : Deferred(failed) {}
DeferredConsumer(ChannelImpl *channel, bool failed = false) :
Deferred(failed), _channel(channel) {}
public:
/**
* Register the function that is called when the consumer starts
* @param callback
*/
DeferredConsumer &onSuccess(const ConsumeCallback &callback)
{
// store the callback
_consumeCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
*
* Only one callback can be registered. Successive calls
* to this function will clear callbacks registered before.
*
* @param callback the callback to execute
*/
DeferredConsumer& onReceived(const ConsumeCallback &callback)
DeferredConsumer& onReceived(const MessageCallback &callback)
{
// store callback
_consumeCallback = callback;
_messageCallback = callback;
// allow chaining
return *this;
}

View File

@ -94,7 +94,7 @@ public:
if (!channel) return false;
// report
channel->reportConsumerStarted(consumerTag());
channel->reportSuccess(consumerTag());
// done
return true;

View File

@ -487,21 +487,17 @@ Deferred &ChannelImpl::setQos(uint16_t prefetchCount)
*/
DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
{
// create the deferred consumer
_consumer = std::unique_ptr<DeferredConsumer>(new DeferredConsumer(false));
// can we send the basic consume frame?
if (!send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)))
{
// we set the consumer to be failed immediately
_consumer->_failed = true;
// we should call the error function later
// TODO
}
// return the consumer
return *_consumer;
// the frame to send
BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments);
// send the frame, and create deferred object
auto *deferred = new DeferredConsumer(this, send(frame));
// push to list
push(deferred, "Cannot send basic consume frame");
// done
return *deferred;
}
/**
@ -611,44 +607,39 @@ Deferred &ChannelImpl::send(const Frame &frame, const char *message)
*/
void ChannelImpl::reportMessage()
{
// @todo what does this method do?
// skip if there is no message
if (!_message) return;
// do we even have a consumer?
if (!_consumer)
{
// this should not be possible: receiving a message without doing a consume() call
reportError("Received message without having a consumer");
}
else
{
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// look for the consumer
auto iter = _consumers.find(_message->consumer());
if (iter == _consumers.end()) return;
// is this a valid callback method
if (!iter->second) return;
// send message to the consumer
_message->report(*_consumer);
// skip if channel was destructed
if (!monitor.valid()) return;
}
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// call the callback
_message->report(iter->second);
// skip if channel was destructed
if (!monitor.valid()) return;
// no longer need the message
delete _message;
_message = nullptr;
delete _message; _message = nullptr;
}
/**
* Create an incoming message
* @param frame
* @return MessageImpl
* @return ConsumedMessage
*/
MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame)
ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame)
{
// it should not be possible that a message already exists, but lets check it anyhow
// destruct if message is already set
if (_message) delete _message;
// construct a message
return _message = new ConsumedMessage(frame);
}

View File

@ -50,13 +50,22 @@ public:
virtual ~ConsumedMessage() {}
/**
* Report to the handler
* @param consumer
* Retrieve the consumer tag
* @return std::string
*/
virtual void report(const DeferredConsumer& consumer) override
const std::string &consumer() const
{
return _consumerTag;
}
/**
* Report to the handler
* @param callback
*/
void report(const MessageCallback &callback) const
{
// send ourselves to the consumer
consumer.message(*this, _deliveryTag, _consumerTag, _redelivered);
if (callback) callback(*this, _deliveryTag, _redelivered);
}
};

43
src/deferredconsumer.cpp Normal file
View File

@ -0,0 +1,43 @@
/**
* DeferredConsumer.cpp
*
* Implementation file for the DeferredConsumer class
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
/**
* Namespace
*/
namespace AMQP {
/**
* Report success for frames that report start consumer operations
* @param name Consumer tag that is started
* @return Deferred
*/
Deferred *DeferredConsumer::reportSuccess(const std::string &name) const
{
// we now know the name, so we can install the message callback on the channel
_channel->install(name, _messageCallback);
// @todo when a consumer stops, we should uninstall the message callback
// skip if no special callback was installed
if (!_consumeCallback) return Deferred::reportSuccess();
// call the callback
_consumeCallback(name);
// call finalize callback
if (_finalizeCallback) _finalizeCallback();
// return next object
return _next;
}
/**
* End namespace
*/
}

View File

@ -98,12 +98,6 @@ public:
return _received >= _bodySize;
}
}
/**
* Report to the handler
* @param consumer
*/
virtual void report(const DeferredConsumer& consumer) = 0;
};
/**

View File

@ -66,6 +66,7 @@
#include "transactionrollbackframe.h"
#include "transactionrollbackokframe.h"
#include "messageimpl.h"
#include "consumedmessage.h"
#include "bodyframe.h"
#include "basicheaderframe.h"
#include "framecheck.h"

View File

@ -45,15 +45,6 @@ public:
* Destructor
*/
virtual ~ReturnedMessage() {}
/**
* Report to the handler
* @param consumer
*/
virtual void report(const DeferredConsumer& consumer) override
{
// we no longer support returned messages
}
};
/**