implemented channel.ack() method

This commit is contained in:
Emiel Bruijntjes 2014-01-05 12:21:09 -08:00
parent cf1cbfa551
commit d1ae133f42
12 changed files with 76 additions and 26 deletions

View File

@ -341,6 +341,26 @@ public:
*/ */
bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
/**
* Acknoldge a received message
*
* When a message is received in the ChannelHandler::onReceived() method,
* you must acknoledge it so that RabbitMQ removes it from the queue (unless
* you are consuming with the noack option). This method can be used for
* this acknoledging.
*
* The following flags are supported:
*
* - multiple acknoledge multiple messages: all messages that were earlier delivered are acknowledged too
*
* @param deliveryTag The delivery tag
* @param message The message
* @param flags
* @return bool
*/
bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); }
bool ack(const Message &message, int flags=0) { return _implementation.ack(message.deliveryTag(), flags); }
/** /**
* Close the current channel * Close the current channel
* @return bool * @return bool

View File

@ -160,13 +160,6 @@ public:
*/ */
virtual void onConsumerStarted(Channel *channel, const std::string &tag) {} virtual void onConsumerStarted(Channel *channel, const std::string &tag) {}
/**
* Method that is called when a message has been consumed
* @param channel the channel on which the consumer was started
* @param message the consumed message
*/
virtual void onConsumed(Channel *channel, const Message &message) {}
/** /**
* Method that is called when a consumer was stopped * Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel() * This is the result of a call to Channel::cancel()
@ -175,6 +168,15 @@ public:
*/ */
virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} virtual void onConsumerStopped(Channel *channel, const std::string &tag) {}
/**
* Method that is called when a message has been received on a channel
* @param channel the channel on which the consumer was started
* @param message the consumed message
*/
virtual void onReceived(Channel *channel, const Message &message) {}
}; };
/** /**

View File

@ -256,9 +256,18 @@ public:
* Cancel a running consumer * Cancel a running consumer
* @param tag the consumer tag * @param tag the consumer tag
* @param flags optional flags * @param flags optional flags
* @return bool
*/ */
bool cancel(const std::string &tag, int flags); bool cancel(const std::string &tag, int flags);
/**
* Acknoledge a message
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
*/
bool ack(uint64_t deliveryTag, int flags);
/** /**
* Close the current channel * Close the current channel
* @return bool * @return bool
@ -437,9 +446,9 @@ public:
} }
/** /**
* Report the consumed message * Report that a message was received
*/ */
void reportDelivery(); void reportReceived();
/** /**
* Create an incoming message * Create an incoming message

View File

@ -75,7 +75,7 @@ public:
* The consumer tag over which it was delivered * The consumer tag over which it was delivered
* @return string * @return string
*/ */
std::string &consumerTag() const std::string &consumerTag() const
{ {
return _consumerTag; return _consumerTag;
} }
@ -84,7 +84,7 @@ public:
* Unique delivery tag to identify and ack the mesage * Unique delivery tag to identify and ack the mesage
* @return uint64_t * @return uint64_t
*/ */
uint64_t deliveryTag() uint64_t deliveryTag() const
{ {
return _deliveryTag; return _deliveryTag;
} }
@ -93,7 +93,7 @@ public:
* Is this a redelivered message / has it been delivered before? * Is this a redelivered message / has it been delivered before?
* @var bool * @var bool
*/ */
bool redelivered() bool redelivered() const
{ {
return _redelivered; return _redelivered;
} }
@ -102,7 +102,7 @@ public:
* The exchange to which it was originally published * The exchange to which it was originally published
* @var string * @var string
*/ */
std::string &exchange() const std::string &exchange() const
{ {
return _exchange; return _exchange;
} }
@ -111,7 +111,7 @@ public:
* The routing key that was originally used * The routing key that was originally used
* @var string * @var string
*/ */
std::string &routingKey() const std::string &routingKey() const
{ {
return _routingKey; return _routingKey;
} }

View File

@ -87,7 +87,7 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection) override
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());

View File

@ -85,7 +85,7 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection) override
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());

View File

@ -107,7 +107,7 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection) override
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());

View File

@ -58,7 +58,7 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection) override
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());

View File

@ -97,7 +97,7 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) virtual bool process(ConnectionImpl *connection) override
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
@ -113,7 +113,7 @@ public:
if (!message->append(_payload, _size)) return true; if (!message->append(_payload, _size)) return true;
// the message is complete // the message is complete
channel->reportDelivery(); channel->reportReceived();
// done // done
return true; return true;

View File

@ -30,6 +30,7 @@
#include "basicqosframe.h" #include "basicqosframe.h"
#include "basicconsumeframe.h" #include "basicconsumeframe.h"
#include "basiccancelframe.h" #include "basiccancelframe.h"
#include "basicackframe.h"
/** /**
* Set up namespace * Set up namespace
@ -479,6 +480,21 @@ bool ChannelImpl::cancel(const std::string &tag, int flags)
return true; return true;
} }
/**
* Acknoledge a message
* @param deliveryTag the delivery tag
* @param flags optional flags
* @return bool
*/
bool ChannelImpl::ack(uint64_t deliveryTag, int flags)
{
// send an ack frame
send(BasicAckFrame(_id, deliveryTag, flags & multiple));
// done
return true;
}
/** /**
* Send a frame over the channel * Send a frame over the channel
* @param frame frame to send * @param frame frame to send
@ -491,11 +507,11 @@ size_t ChannelImpl::send(const Frame &frame)
} }
/** /**
* Report the consumed message * Report the received message
*/ */
void ChannelImpl::reportDelivery() void ChannelImpl::reportReceived()
{ {
if (_handler) _handler->onConsumed(_parent, *_message); if (_handler) _handler->onReceived(_parent, *_message);
} }
/** /**

View File

@ -421,10 +421,13 @@ void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &
* @param channel the channel on which the consumer was started * @param channel the channel on which the consumer was started
* @param message the consumed message * @param message the consumed message
*/ */
void MyConnection::onConsumed(AMQP::Channel *channel, const AMQP::Message &message) void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message)
{ {
// show // show
std::cout << "AMQP consumed: " << message.message() << std::endl; std::cout << "AMQP consumed: " << message.message() << std::endl;
// ack the message
channel->ack(message);
} }
/** /**

View File

@ -231,7 +231,7 @@ private:
* @param channel the channel on which the consumer was started * @param channel the channel on which the consumer was started
* @param message the consumed message * @param message the consumed message
*/ */
virtual void onConsumed(AMQP::Channel *channel, const AMQP::Message &message) override; virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message) override;
/** /**
* Method that is called when a consumer was stopped * Method that is called when a consumer was stopped