added Channel::get() that allows one to retrieve a single message from the channel

This commit is contained in:
Emiel Bruijntjes 2014-07-31 12:58:13 +02:00 committed by Richard Hodges
parent f905c9db49
commit b4270f39bc
12 changed files with 397 additions and 9 deletions

View File

@ -60,6 +60,7 @@
#include <amqpcpp/deferredqueue.h>
#include <amqpcpp/deferreddelete.h>
#include <amqpcpp/deferredcancel.h>
#include <amqpcpp/deferredget.h>
#include <amqpcpp/channelimpl.h>
#include <amqpcpp/channel.h>
#include <amqpcpp/login.h>

View File

@ -20,6 +20,7 @@ namespace AMQP {
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using EmptyCallback = std::function<void()>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;

View File

@ -391,9 +391,9 @@ public:
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
* void myCallback(const std::string& tag);
*
* For example: channel.cancel("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
* For example: channel.cancel("myqueue").onSuccess([](const std::string& tag) {
*
* std::cout << "Stopped consuming under tag " << tag << std::endl;
*
@ -401,6 +401,40 @@ public:
*/
DeferredCancel &cancel(const std::string &tag) { return _implementation.cancel(tag); }
/**
* Retrieve a single message from RabbitMQ
*
* When you call this method, you can get one single message from the queue (or none
* at all if the queue is empty). The deferred object that is returned, should be used
* to install a onEmpty() and onSuccess() callback function that will be called
* when the message is consumed and/or when the message could not be consumed.
*
* The following flags are supported:
*
* - noack if set, consumed messages do not have to be acked, this happens automatically
*
* @param queue name of the queue to consume from
* @param flags optional flags
*
* The object returns a deferred handler. Callbacks can be installed
* using onSuccess(), onEmpty(), onError() and onFinalize() methods.
*
* The onSuccess() callback has the following signature:
*
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
*
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
*
* std::cout << "Message fetched" << std::endl;
*
* }).onEmpty([]() {
*
* std::cout << "Queue is empty" << std::endl;
*
* });
*/
DeferredGet &get(const std::string &queue, int flags = 0) { return _implementation.get(queue, flags); }
/**
* Acknoldge a received message
*

View File

@ -381,9 +381,9 @@ public:
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
* void myCallback(const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
* For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
@ -391,6 +391,40 @@ public:
*/
DeferredCancel &cancel(const std::string &tag);
/**
* Retrieve a single message from RabbitMQ
*
* When you call this method, you can get one single message from the queue (or none
* at all if the queue is empty). The deferred object that is returned, should be used
* to install a onEmpty() and onSuccess() callback function that will be called
* when the message is consumed and/or when the message could not be consumed.
*
* The following flags are supported:
*
* - noack if set, consumed messages do not have to be acked, this happens automatically
*
* @param queue name of the queue to consume from
* @param flags optional flags
*
* The object returns a deferred handler. Callbacks can be installed
* using onSuccess(), onEmpty(), onError() and onFinalize() methods.
*
* The onSuccess() callback has the following signature:
*
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
*
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
*
* std::cout << "Message fetched" << std::endl;
*
* }).onEmpty([]() {
*
* std::cout << "Queue is empty" << std::endl;
*
* });
*/
DeferredGet &get(const std::string &queue, int flags = 0);
/**
* Acknowledge a message
* @param deliveryTag the delivery tag
@ -596,6 +630,7 @@ public:
* @return ConsumedMessage
*/
ConsumedMessage *message(const BasicDeliverFrame &frame);
ConsumedMessage *message(const BasicGetOKFrame &frame);
/**
* Retrieve the current incoming message

View File

@ -16,6 +16,7 @@ namespace AMQP {
*/
class Array;
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BasicReturnFrame;
class BodyFrame;

View File

@ -67,7 +67,7 @@ protected:
* Indicate success
* @return Deferred Next deferred result
*/
Deferred *reportSuccess() const
virtual Deferred *reportSuccess() const
{
// execute callbacks if registered
if (_successCallback) _successCallback();
@ -111,7 +111,7 @@ protected:
// this is the same as a regular success message
return reportSuccess();
}
/**
* Indicate failure
* @param error Description of the error that occured

130
include/deferredget.h Normal file
View File

@ -0,0 +1,130 @@
/**
* DeferredGet.h
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class DeferredGet : public Deferred
{
private:
/**
* Pointer to the channel
* @var ChannelImpl
*/
ChannelImpl *_channel;
/**
* Callback for incoming messages
* @var MessageCallback
*/
MessageCallback _messageCallback;
/**
* Callback in case the queue is empty
* @var EmptyCallback
*/
EmptyCallback _emptyCallback;
/**
* Report success when a message is indeed expected
* @param count number of messages in the queue
* @return Deferred
*/
virtual Deferred *reportSuccess(uint32_t messagecount) const override;
/**
* Report success when queue was empty
* @return Deferred
*/
virtual Deferred *reportSuccess() const override;
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class ConsumedMessage;
protected:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param channel the channel implementation
* @param failed are we already failed?
*/
DeferredGet(ChannelImpl *channel, bool failed = false) :
Deferred(failed), _channel(channel) {}
public:
/**
* Register a function to be called when a message arrives
* This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
* @param callback
*/
DeferredGet &onSuccess(const MessageCallback &callback)
{
// store the callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredGet &onReceived(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called when a message arrives
* This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it
* @param callback the callback to execute
*/
DeferredGet &onMessage(const MessageCallback &callback)
{
// store callback
_messageCallback = callback;
// allow chaining
return *this;
}
/**
* Register a function to be called if no message could be fetched
* @param callback the callback to execute
*/
DeferredGet &onEmpty(const EmptyCallback &callback)
{
// store callback
_emptyCallback = callback;
// allow chaining
return *this;
}
};
/**
* End of namespace
*/
}

View File

@ -69,6 +69,27 @@ public:
{
return 72;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
if (channel->reportSuccess()) channel->synchronized();
// done
return true;
}
};
/**

View File

@ -156,6 +156,33 @@ public:
{
return _redelivered.get(0);
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report (if this function returns false, it means that the channel
// object no longer is valid)
if (!channel->reportSuccess(_messageCount)) return true;
// construct the message
channel->message(*this);
// we're synchronized
channel->synchronized();
// done
return true;
}
};
/**

View File

@ -7,6 +7,7 @@
*/
#include "includes.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicreturnframe.h"
#include "messageimpl.h"
#include "consumedmessage.h"
@ -37,6 +38,8 @@
#include "basicnackframe.h"
#include "basicrecoverframe.h"
#include "basicrejectframe.h"
#include "basicgetframe.h"
/**
* Set up namespace
@ -516,9 +519,9 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string& tag);
* void myCallback(const std::string& tag);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
* For example: channel.declareQueue("myqueue").onSuccess([](const std::string& tag) {
*
* std::cout << "Started consuming under tag " << tag << std::endl;
*
@ -539,6 +542,53 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag)
return *deferred;
}
/**
* Retrieve a single message from RabbitMQ
*
* When you call this method, you can get one single message from the queue (or none
* at all if the queue is empty). The deferred object that is returned, should be used
* to install a onEmpty() and onSuccess() callback function that will be called
* when the message is consumed and/or when the message could not be consumed.
*
* The following flags are supported:
*
* - noack if set, consumed messages do not have to be acked, this happens automatically
*
* @param queue name of the queue to consume from
* @param flags optional flags
*
* The object returns a deferred handler. Callbacks can be installed
* using onSuccess(), onEmpty(), onError() and onFinalize() methods.
*
* The onSuccess() callback has the following signature:
*
* void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
*
* For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
*
* std::cout << "Message fetched" << std::endl;
*
* }).onEmpty([]() {
*
* std::cout << "Queue is empty" << std::endl;
*
* });
*/
DeferredGet &ChannelImpl::get(const std::string &queue, int flags)
{
// the get frame to send
BasicGetFrame frame(_id, queue, flags & noack);
// send the frame, and create deferred object
auto *deferred = new DeferredGet(this, send(frame));
// push to list
push(deferred);
// done
return *deferred;
}
/**
* Acknowledge a message
* @param deliveryTag the delivery tag
@ -674,7 +724,7 @@ void ChannelImpl::reportMessage()
}
/**
* Create an incoming message
* Create an incoming message from a consume call
* @param frame
* @return ConsumedMessage
*/
@ -687,6 +737,20 @@ ConsumedMessage *ChannelImpl::message(const BasicDeliverFrame &frame)
return _message = new ConsumedMessage(frame);
}
/**
* Create an incoming message from a get call
* @param frame
* @return ConsumedMessage
*/
ConsumedMessage *ChannelImpl::message(const BasicGetOKFrame &frame)
{
// destruct if message is already set
if (_message) delete _message;
// construct message
return _message = new ConsumedMessage(frame);
}
/**
* End of namespace
*/

View File

@ -44,6 +44,16 @@ public:
_consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered())
{}
/**
* Constructor
* @param frame
*/
ConsumedMessage(const BasicGetOKFrame &frame) :
MessageImpl(frame.exchange(), frame.routingKey()),
_deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered())
{}
/**
* Destructor
*/

64
src/deferredget.cpp Normal file
View File

@ -0,0 +1,64 @@
/**
* DeferredGet.cpp
*
* Implementation of the DeferredGet call
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 Copernica BV
*/
/**
* Dependencies
*/
#include "includes.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Report success, a get message succeeded and the message is expected soon
* @param messageCount Message count
* @return Deferred
*/
Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const
{
// make copies of the callbacks
auto messageCallback = _messageCallback;
auto finalizeCallback = _finalizeCallback;
// we now know the name, so we can install the message callback on the channel
_channel->install("", [messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) {
// call the callbacks
if (messageCallback) messageCallback(message, deliveryTag, redelivered);
// call the finalize callback
if (finalizeCallback) finalizeCallback();
});
// return next object
return _next;
}
/**
* Report success, although no message could be get
*/
Deferred *DeferredGet::reportSuccess() const
{
// check if a callback was set
if (_emptyCallback) _emptyCallback();
// call finalize callback
if (_finalizeCallback) _finalizeCallback();
// return next object
return _next;
}
/**
* End of namespace
*/
}