diff --git a/amqpcpp.h b/amqpcpp.h index 328cf08..b63e7fe 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include diff --git a/include/callbacks.h b/include/callbacks.h index a974ef0..c611f89 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -20,6 +20,7 @@ namespace AMQP { using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; +using EmptyCallback = std::function; using MessageCallback = std::function; using QueueCallback = std::function; using DeleteCallback = std::function; diff --git a/include/channel.h b/include/channel.h index 3cfc989..e4da8bb 100644 --- a/include/channel.h +++ b/include/channel.h @@ -391,9 +391,9 @@ public: * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.cancel("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.cancel("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Stopped consuming under tag " << tag << std::endl; * @@ -401,6 +401,40 @@ public: */ DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); } + /** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ + DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation.get(queue, flags); } + /** * Acknoldge a received message * diff --git a/include/channelimpl.h b/include/channelimpl.h index 0812a9c..10a31dd 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -381,9 +381,9 @@ public: * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Started consuming under tag " << tag << std::endl; * @@ -391,6 +391,40 @@ public: */ DeferredCancel &cancel(const std::string &tag); + /** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ + DeferredGet &get(const std::string &queue, int flags = 0); + /** * Acknowledge a message * @param deliveryTag the delivery tag @@ -596,6 +630,7 @@ public: * @return ConsumedMessage */ ConsumedMessage *message(const BasicDeliverFrame &frame); + ConsumedMessage *message(const BasicGetOKFrame &frame); /** * Retrieve the current incoming message diff --git a/include/classes.h b/include/classes.h index b4a5500..ec9e38e 100644 --- a/include/classes.h +++ b/include/classes.h @@ -16,6 +16,7 @@ namespace AMQP { */ class Array; class BasicDeliverFrame; +class BasicGetOKFrame; class BasicHeaderFrame; class BasicReturnFrame; class BodyFrame; diff --git a/include/deferred.h b/include/deferred.h index 01a3fe8..3d950cf 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -67,7 +67,7 @@ protected: * Indicate success * @return Deferred Next deferred result */ - Deferred *reportSuccess() const + virtual Deferred *reportSuccess() const { // execute callbacks if registered if (_successCallback) _successCallback(); @@ -111,7 +111,7 @@ protected: // this is the same as a regular success message return reportSuccess(); } - + /** * Indicate failure * @param error Description of the error that occured diff --git a/include/deferredget.h b/include/deferredget.h new file mode 100644 index 0000000..6642c35 --- /dev/null +++ b/include/deferredget.h @@ -0,0 +1,130 @@ +/** + * DeferredGet.h + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class DeferredGet : public Deferred +{ +private: + /** + * Pointer to the channel + * @var ChannelImpl + */ + ChannelImpl *_channel; + + /** + * Callback for incoming messages + * @var MessageCallback + */ + MessageCallback _messageCallback; + + /** + * Callback in case the queue is empty + * @var EmptyCallback + */ + EmptyCallback _emptyCallback; + + /** + * Report success when a message is indeed expected + * @param count number of messages in the queue + * @return Deferred + */ + virtual Deferred *reportSuccess(uint32_t messagecount) const override; + + /** + * Report success when queue was empty + * @return Deferred + */ + virtual Deferred *reportSuccess() const override; + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class ConsumedMessage; + + +protected: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * @param channel the channel implementation + * @param failed are we already failed? + */ + DeferredGet(ChannelImpl *channel, bool failed = false) : + Deferred(failed), _channel(channel) {} + +public: + /** + * Register a function to be called when a message arrives + * This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it + * @param callback + */ + DeferredGet &onSuccess(const MessageCallback &callback) + { + // store the callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredGet &onReceived(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called when a message arrives + * This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it + * @param callback the callback to execute + */ + DeferredGet &onMessage(const MessageCallback &callback) + { + // store callback + _messageCallback = callback; + + // allow chaining + return *this; + } + + /** + * Register a function to be called if no message could be fetched + * @param callback the callback to execute + */ + DeferredGet &onEmpty(const EmptyCallback &callback) + { + // store callback + _emptyCallback = callback; + + // allow chaining + return *this; + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/basicgetemptyframe.h b/src/basicgetemptyframe.h index d74af57..6ba5ace 100644 --- a/src/basicgetemptyframe.h +++ b/src/basicgetemptyframe.h @@ -69,6 +69,27 @@ public: { return 72; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report + if (channel->reportSuccess()) channel->synchronized(); + + // done + return true; + } + }; /** diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 5abc6f7..6945adb 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -156,6 +156,33 @@ public: { return _redelivered.get(0); } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // report (if this function returns false, it means that the channel + // object no longer is valid) + if (!channel->reportSuccess(_messageCount)) return true; + + // construct the message + channel->message(*this); + + // we're synchronized + channel->synchronized(); + + // done + return true; + } }; /** diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 5102090..9d30ab9 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -7,6 +7,7 @@ */ #include "includes.h" #include "basicdeliverframe.h" +#include "basicgetokframe.h" #include "basicreturnframe.h" #include "messageimpl.h" #include "consumedmessage.h" @@ -37,6 +38,8 @@ #include "basicnackframe.h" #include "basicrecoverframe.h" #include "basicrejectframe.h" +#include "basicgetframe.h" + /** * Set up namespace @@ -516,9 +519,9 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri * * The onSuccess() callback that you can install should have the following signature: * - * void myCallback(AMQP::Channel *channel, const std::string& tag); + * void myCallback(const std::string& tag); * - * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) { * * std::cout << "Started consuming under tag " << tag << std::endl; * @@ -539,6 +542,53 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag) return *deferred; } +/** + * Retrieve a single message from RabbitMQ + * + * When you call this method, you can get one single message from the queue (or none + * at all if the queue is empty). The deferred object that is returned, should be used + * to install a onEmpty() and onSuccess() callback function that will be called + * when the message is consumed and/or when the message could not be consumed. + * + * The following flags are supported: + * + * - noack if set, consumed messages do not have to be acked, this happens automatically + * + * @param queue name of the queue to consume from + * @param flags optional flags + * + * The object returns a deferred handler. Callbacks can be installed + * using onSuccess(), onEmpty(), onError() and onFinalize() methods. + * + * The onSuccess() callback has the following signature: + * + * void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered); + * + * For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) { + * + * std::cout << "Message fetched" << std::endl; + * + * }).onEmpty([]() { + * + * std::cout << "Queue is empty" << std::endl; + * + * }); + */ +DeferredGet &ChannelImpl::get(const std::string &queue, int flags) +{ + // the get frame to send + BasicGetFrame frame(_id, queue, flags & noack); + + // send the frame, and create deferred object + auto *deferred = new DeferredGet(this, send(frame)); + + // push to list + push(deferred); + + // done + return *deferred; +} + /** * Acknowledge a message * @param deliveryTag the delivery tag @@ -674,7 +724,7 @@ void ChannelImpl::reportMessage() } /** - * Create an incoming message + * Create an incoming message from a consume call * @param frame * @return ConsumedMessage */ @@ -687,6 +737,20 @@ ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame) return _message = new ConsumedMessage(frame); } +/** + * Create an incoming message from a get call + * @param frame + * @return ConsumedMessage + */ +ConsumedMessage *ChannelImpl::message(const BasicGetOKFrame &frame) +{ + // destruct if message is already set + if (_message) delete _message; + + // construct message + return _message = new ConsumedMessage(frame); +} + /** * End of namespace */ diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 447bdc6..09c9c0e 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -44,6 +44,16 @@ public: _consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) {} + /** + * Constructor + * @param frame + */ + ConsumedMessage(const BasicGetOKFrame &frame) : + MessageImpl(frame.exchange(), frame.routingKey()), + _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) + {} + + /** * Destructor */ diff --git a/src/deferredget.cpp b/src/deferredget.cpp new file mode 100644 index 0000000..f412ba4 --- /dev/null +++ b/src/deferredget.cpp @@ -0,0 +1,64 @@ +/** + * DeferredGet.cpp + * + * Implementation of the DeferredGet call + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Dependencies + */ +#include "includes.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Report success, a get message succeeded and the message is expected soon + * @param messageCount Message count + * @return Deferred + */ +Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const +{ + // make copies of the callbacks + auto messageCallback = _messageCallback; + auto finalizeCallback = _finalizeCallback; + + // we now know the name, so we can install the message callback on the channel + _channel->install("", [messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) { + + // call the callbacks + if (messageCallback) messageCallback(message, deliveryTag, redelivered); + + // call the finalize callback + if (finalizeCallback) finalizeCallback(); + }); + + // return next object + return _next; +} + +/** + * Report success, although no message could be get + */ +Deferred *DeferredGet::reportSuccess() const +{ + // check if a callback was set + if (_emptyCallback) _emptyCallback(); + + // call finalize callback + if (_finalizeCallback) _finalizeCallback(); + + // return next object + return _next; +} + +/** + * End of namespace + */ +} +