diff --git a/include/channel.h b/include/channel.h index 1aecbda..bb8ecc9 100644 --- a/include/channel.h +++ b/include/channel.h @@ -341,6 +341,26 @@ public: */ bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } + /** + * Acknoldge a received message + * + * When a message is received in the ChannelHandler::onReceived() method, + * you must acknoledge it so that RabbitMQ removes it from the queue (unless + * you are consuming with the noack option). This method can be used for + * this acknoledging. + * + * The following flags are supported: + * + * - multiple acknoledge multiple messages: all messages that were earlier delivered are acknowledged too + * + * @param deliveryTag The delivery tag + * @param message The message + * @param flags + * @return bool + */ + bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } + bool ack(const Message &message, int flags=0) { return _implementation.ack(message.deliveryTag(), flags); } + /** * Close the current channel * @return bool diff --git a/include/channelhandler.h b/include/channelhandler.h index 97c4921..d95cffa 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -159,14 +159,7 @@ public: * @param tag the consumer tag */ virtual void onConsumerStarted(Channel *channel, const std::string &tag) {} - - /** - * Method that is called when a message has been consumed - * @param channel the channel on which the consumer was started - * @param message the consumed message - */ - virtual void onConsumed(Channel *channel, const Message &message) {} - + /** * Method that is called when a consumer was stopped * This is the result of a call to Channel::cancel() @@ -174,6 +167,15 @@ public: * @param tag the consumer tag */ virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} + + /** + * Method that is called when a message has been received on a channel + * @param channel the channel on which the consumer was started + * @param message the consumed message + */ + virtual void onReceived(Channel *channel, const Message &message) {} + + }; diff --git a/include/channelimpl.h b/include/channelimpl.h index dd1668d..1bdc169 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -256,8 +256,17 @@ public: * Cancel a running consumer * @param tag the consumer tag * @param flags optional flags + * @return bool */ bool cancel(const std::string &tag, int flags); + + /** + * Acknoledge a message + * @param deliveryTag the delivery tag + * @param flags optional flags + * @return bool + */ + bool ack(uint64_t deliveryTag, int flags); /** * Close the current channel @@ -437,9 +446,9 @@ public: } /** - * Report the consumed message + * Report that a message was received */ - void reportDelivery(); + void reportReceived(); /** * Create an incoming message diff --git a/include/message.h b/include/message.h index c55b42b..4349bc0 100644 --- a/include/message.h +++ b/include/message.h @@ -75,7 +75,7 @@ public: * The consumer tag over which it was delivered * @return string */ - std::string &consumerTag() + const std::string &consumerTag() const { return _consumerTag; } @@ -84,7 +84,7 @@ public: * Unique delivery tag to identify and ack the mesage * @return uint64_t */ - uint64_t deliveryTag() + uint64_t deliveryTag() const { return _deliveryTag; } @@ -93,7 +93,7 @@ public: * Is this a redelivered message / has it been delivered before? * @var bool */ - bool redelivered() + bool redelivered() const { return _redelivered; } @@ -102,7 +102,7 @@ public: * The exchange to which it was originally published * @var string */ - std::string &exchange() + const std::string &exchange() const { return _exchange; } @@ -111,7 +111,7 @@ public: * The routing key that was originally used * @var string */ - std::string &routingKey() + const std::string &routingKey() const { return _routingKey; } diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index a4edff6..cbb99da 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -87,7 +87,7 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index 3a26e87..cf7298a 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -85,7 +85,7 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); diff --git a/src/basicheaderframe.h b/src/basicheaderframe.h index 942f39d..29059ed 100644 --- a/src/basicheaderframe.h +++ b/src/basicheaderframe.h @@ -107,7 +107,7 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 3aaac49..5d8bca1 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -58,7 +58,7 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); diff --git a/src/bodyframe.h b/src/bodyframe.h index f8318c0..a75cdc0 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -97,7 +97,7 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) + virtual bool process(ConnectionImpl *connection) override { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); @@ -113,7 +113,7 @@ public: if (!message->append(_payload, _size)) return true; // the message is complete - channel->reportDelivery(); + channel->reportReceived(); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index a3fdebb..2e3279b 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -30,6 +30,7 @@ #include "basicqosframe.h" #include "basicconsumeframe.h" #include "basiccancelframe.h" +#include "basicackframe.h" /** * Set up namespace @@ -479,6 +480,21 @@ bool ChannelImpl::cancel(const std::string &tag, int flags) return true; } +/** + * Acknoledge a message + * @param deliveryTag the delivery tag + * @param flags optional flags + * @return bool + */ +bool ChannelImpl::ack(uint64_t deliveryTag, int flags) +{ + // send an ack frame + send(BasicAckFrame(_id, deliveryTag, flags & multiple)); + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send @@ -491,11 +507,11 @@ size_t ChannelImpl::send(const Frame &frame) } /** - * Report the consumed message + * Report the received message */ -void ChannelImpl::reportDelivery() +void ChannelImpl::reportReceived() { - if (_handler) _handler->onConsumed(_parent, *_message); + if (_handler) _handler->onReceived(_parent, *_message); } /** diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index a98cdfa..715f10f 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -421,10 +421,13 @@ void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string & * @param channel the channel on which the consumer was started * @param message the consumed message */ -void MyConnection::onConsumed(AMQP::Channel *channel, const AMQP::Message &message) +void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message) { // show std::cout << "AMQP consumed: " << message.message() << std::endl; + + // ack the message + channel->ack(message); } /** diff --git a/tests/myconnection.h b/tests/myconnection.h index f84a49b..a1f6b11 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -231,7 +231,7 @@ private: * @param channel the channel on which the consumer was started * @param message the consumed message */ - virtual void onConsumed(AMQP::Channel *channel, const AMQP::Message &message) override; + virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message) override; /** * Method that is called when a consumer was stopped