From 745ab512a5d4116d08fbd936d47a17e853fc9ee6 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 15 Apr 2014 11:39:52 +0200 Subject: [PATCH] the consumer message callback can now also be installed via the Deferred objects, and it is no longer passed a consumer tag, because it already is obvious what the consumer tag is supposed to be --- include/callbacks.h | 3 +- include/channelimpl.h | 53 +++++++++++++++-------------- include/deferredconsumer.h | 59 +++++++++++++++++++++----------- src/basicconsumeokframe.h | 2 +- src/channelimpl.cpp | 69 +++++++++++++++++--------------------- src/consumedmessage.h | 17 +++++++--- src/deferredconsumer.cpp | 43 ++++++++++++++++++++++++ src/messageimpl.h | 6 ---- src/receivedframe.cpp | 1 + src/returnedmessage.h | 9 ----- 10 files changed, 157 insertions(+), 105 deletions(-) create mode 100644 src/deferredconsumer.cpp diff --git a/include/callbacks.h b/include/callbacks.h index 50239a0..a974ef0 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -20,9 +20,10 @@ namespace AMQP { using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; -using ConsumeCallback = std::function; +using MessageCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; +using ConsumeCallback = std::function; using CancelCallback = std::function; /** diff --git a/include/channelimpl.h b/include/channelimpl.h index 43a87ba..8666240 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -14,6 +14,11 @@ */ namespace AMQP { +/** + * Forward declarations + */ +class ConsumedMessage; + /** * Class definition */ @@ -45,11 +50,10 @@ private: ErrorCallback _errorCallback; /** - * Callback to execute when a message arrives - * - * @todo do this different?? + * Callbacks for all consumers that are active + * @var std::map */ - std::unique_ptr _consumer; + std::map _consumers; /** * Pointer to the oldest deferred result (the first one that is going @@ -84,9 +88,9 @@ private: /** * The message that is now being received - * @var MessageImpl + * @var ConsumedMessage */ - MessageImpl *_message = nullptr; + ConsumedMessage *_message = nullptr; /** * Construct a channel object @@ -491,6 +495,20 @@ public: // @todo destruct oldest callback } + /** + * Install a consumer callback + * @param consumertag The consumer tag + * @param callback The callback to be called + */ + void install(const std::string &consumertag, const MessageCallback &callback) + { + // install the callback if it is assigned + if (callback) _consumers[consumertag] = callback; + + // otherwise we erase the previously set callback + else _consumers.erase(consumertag); + } + /** * Report that a message was received */ @@ -499,34 +517,19 @@ public: /** * Create an incoming message * @param frame - * @return MessageImpl + * @return ConsumedMessage */ - MessageImpl *message(const BasicDeliverFrame &frame); + ConsumedMessage *message(const BasicDeliverFrame &frame); /** * Retrieve the current incoming message - * @return MessageImpl + * @return ConsumedMessage */ - MessageImpl *message() + ConsumedMessage *message() { return _message; } - /** - * Report that the consumer has started - * - * @param consumerTag the tag under which we are now consuming - */ - void reportConsumerStarted(const std::string& consumerTag) - { - // if we do not have a consumer, something is very wrong - if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found"); - - // otherwise, we now report the consumer as started - // @todo look at this implementation - //else _consumer->success(consumerTag); - } - /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/deferredconsumer.h b/include/deferredconsumer.h index c1555ad..60104d1 100644 --- a/include/deferredconsumer.h +++ b/include/deferredconsumer.h @@ -17,25 +17,31 @@ namespace AMQP { class DeferredConsumer : public Deferred { private: + /** + * The channel to which the consumer is linked + * @var ChannelImpl + */ + ChannelImpl *_channel; + /** * Callback to execute when a message arrives - * @var ConsumeCallbacl + * @var ConsumeCallback */ ConsumeCallback _consumeCallback; /** - * Process a message - * - * @param message the message to process - * @param deliveryTag the message delivery tag - * @param consumerTag the tag we are consuming under - * @param is this a redelivered message? + * Callback for incoming messages + * @var MessageCallback */ - void message(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) const - { - // do we have a valid callback - if (_consumeCallback) _consumeCallback(message, deliveryTag, consumerTag, redelivered); - } + MessageCallback _messageCallback; + + + /** + * Report success for frames that report start consumer operations + * @param name Consumer tag that is started + * @return Deferred + */ + virtual Deferred *reportSuccess(const std::string &name) const override; /** * The channel implementation may call our @@ -49,23 +55,36 @@ protected: * Protected constructor that can only be called * from within the channel implementation * - * @param boolea are we already failed? + * @param channel the channel implementation + * @param failed are we already failed? */ - DeferredConsumer(bool failed = false) : Deferred(failed) {} + DeferredConsumer(ChannelImpl *channel, bool failed = false) : + Deferred(failed), _channel(channel) {} public: + /** + * Register the function that is called when the consumer starts + * @param callback + */ + DeferredConsumer &onSuccess(const ConsumeCallback &callback) + { + // store the callback + _consumeCallback = callback; + + // allow chaining + return *this; + } + /** * Register a function to be called when a message arrives - * - * Only one callback can be registered. Successive calls - * to this function will clear callbacks registered before. - * * @param callback the callback to execute */ - DeferredConsumer& onReceived(const ConsumeCallback &callback) + DeferredConsumer& onReceived(const MessageCallback &callback) { // store callback - _consumeCallback = callback; + _messageCallback = callback; + + // allow chaining return *this; } diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 1ed8ef4..31c44ef 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -94,7 +94,7 @@ public: if (!channel) return false; // report - channel->reportConsumerStarted(consumerTag()); + channel->reportSuccess(consumerTag()); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 53e7438..a28bf77 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -487,21 +487,17 @@ Deferred &ChannelImpl::setQos(uint16_t prefetchCount) */ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { - // create the deferred consumer - _consumer = std::unique_ptr(new DeferredConsumer(false)); - - // can we send the basic consume frame? - if (!send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments))) - { - // we set the consumer to be failed immediately - _consumer->_failed = true; - - // we should call the error function later - // TODO - } - - // return the consumer - return *_consumer; + // the frame to send + BasicConsumeFrame frame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments); + + // send the frame, and create deferred object + auto *deferred = new DeferredConsumer(this, send(frame)); + + // push to list + push(deferred, "Cannot send basic consume frame"); + + // done + return *deferred; } /** @@ -611,44 +607,39 @@ Deferred &ChannelImpl::send(const Frame &frame, const char *message) */ void ChannelImpl::reportMessage() { - // @todo what does this method do? - // skip if there is no message if (!_message) return; - // do we even have a consumer? - if (!_consumer) - { - // this should not be possible: receiving a message without doing a consume() call - reportError("Received message without having a consumer"); - } - else - { - // after the report the channel may be destructed, monitor that - Monitor monitor(this); + // look for the consumer + auto iter = _consumers.find(_message->consumer()); + if (iter == _consumers.end()) return; + + // is this a valid callback method + if (!iter->second) return; - // send message to the consumer - _message->report(*_consumer); - - // skip if channel was destructed - if (!monitor.valid()) return; - } + // after the report the channel may be destructed, monitor that + Monitor monitor(this); + + // call the callback + _message->report(iter->second); + + // skip if channel was destructed + if (!monitor.valid()) return; // no longer need the message - delete _message; - _message = nullptr; + delete _message; _message = nullptr; } /** * Create an incoming message * @param frame - * @return MessageImpl + * @return ConsumedMessage */ -MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame) +ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) { - // it should not be possible that a message already exists, but lets check it anyhow + // destruct if message is already set if (_message) delete _message; - + // construct a message return _message = new ConsumedMessage(frame); } diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 2227763..447bdc6 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -50,13 +50,22 @@ public: virtual ~ConsumedMessage() {} /** - * Report to the handler - * @param consumer + * Retrieve the consumer tag + * @return std::string */ - virtual void report(const DeferredConsumer& consumer) override + const std::string &consumer() const + { + return _consumerTag; + } + + /** + * Report to the handler + * @param callback + */ + void report(const MessageCallback &callback) const { // send ourselves to the consumer - consumer.message(*this, _deliveryTag, _consumerTag, _redelivered); + if (callback) callback(*this, _deliveryTag, _redelivered); } }; diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp new file mode 100644 index 0000000..eb8c6e1 --- /dev/null +++ b/src/deferredconsumer.cpp @@ -0,0 +1,43 @@ +/** + * DeferredConsumer.cpp + * + * Implementation file for the DeferredConsumer class + * + * @copyright 2014 Copernica BV + */ +#include "includes.h" + +/** + * Namespace + */ +namespace AMQP { + +/** + * Report success for frames that report start consumer operations + * @param name Consumer tag that is started + * @return Deferred + */ +Deferred *DeferredConsumer::reportSuccess(const std::string &name) const +{ + // we now know the name, so we can install the message callback on the channel + _channel->install(name, _messageCallback); + + // @todo when a consumer stops, we should uninstall the message callback + + // skip if no special callback was installed + if (!_consumeCallback) return Deferred::reportSuccess(); + + // call the callback + _consumeCallback(name); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; +} + +/** + * End namespace + */ +} diff --git a/src/messageimpl.h b/src/messageimpl.h index 1b087dd..f9f839a 100644 --- a/src/messageimpl.h +++ b/src/messageimpl.h @@ -98,12 +98,6 @@ public: return _received >= _bodySize; } } - - /** - * Report to the handler - * @param consumer - */ - virtual void report(const DeferredConsumer& consumer) = 0; }; /** diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index cfe1c77..87aaba0 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -66,6 +66,7 @@ #include "transactionrollbackframe.h" #include "transactionrollbackokframe.h" #include "messageimpl.h" +#include "consumedmessage.h" #include "bodyframe.h" #include "basicheaderframe.h" #include "framecheck.h" diff --git a/src/returnedmessage.h b/src/returnedmessage.h index 322bb88..776d72d 100644 --- a/src/returnedmessage.h +++ b/src/returnedmessage.h @@ -45,15 +45,6 @@ public: * Destructor */ virtual ~ReturnedMessage() {} - - /** - * Report to the handler - * @param consumer - */ - virtual void report(const DeferredConsumer& consumer) override - { - // we no longer support returned messages - } }; /**