diff --git a/README.md b/README.md index bd0219a..7fbca89 100644 --- a/README.md +++ b/README.md @@ -264,10 +264,10 @@ channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, argument WORK IN PROGRESS ================ -Almost all AMQP features have been implemented. We only need to add reject/nack support, -recover support, and support for returned messages. We also need to add more safety -checks so that strange data from RabbitMQ does not break the library (although -in reality such strange data does not exist). +Almost all AMQP features have been implemented. We only need to add recover support +and support for returned messages. We also need to add more safety checks so that +strange data from RabbitMQ does not break the library (although in reality RabbitMQ +only sends valid data). It would also be nice to have sample implementations for the ConnectionHandler class that can be directly plugged into libev, libevent and libuv event loops. diff --git a/include/channel.h b/include/channel.h index bb8ecc9..3ec5d9d 100644 --- a/include/channel.h +++ b/include/channel.h @@ -351,16 +351,36 @@ public: * * The following flags are supported: * - * - multiple acknoledge multiple messages: all messages that were earlier delivered are acknowledged too + * - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too * - * @param deliveryTag The delivery tag - * @param message The message - * @param flags + * @param deliveryTag the unique delivery tag of the message + * @param message the message + * @param flags optional flags * @return bool */ bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } bool ack(const Message &message, int flags=0) { return _implementation.ack(message.deliveryTag(), flags); } + /** + * Reject or nack a message + * + * When a message was received in the ChannelHandler::onReceived() method, + * and you don't want to acknoledge it, you can also choose to reject it by + * calling this reject method. + * + * The following flags are supported: + * + * - multiple reject multiple messages: all un-acked messages that were earlier delivered are unacked too + * - requeue if set, the message is put back in the queue, otherwise it is dead-lettered/removed + * + * @param deliveryTag the unique delivery tag of the message + * @param message the original message + * @param flags optional flags + * @return bool + */ + bool reject(uint64_t deliveryTag, int flags=0) { return _implementation.reject(deliveryTag, flags); } + bool reject(const Message &message, int flags=0) { return _implementation.reject(message.deliveryTag(), flags); } + /** * Close the current channel * @return bool diff --git a/include/channelimpl.h b/include/channelimpl.h index 1bdc169..80448f9 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -267,6 +267,14 @@ public: * @return bool */ bool ack(uint64_t deliveryTag, int flags); + + /** + * Reject a message + * @param deliveryTag the delivery tag + * @param flags optional flags + * @return bool + */ + bool reject(uint64_t deliveryTag, int flags); /** * Close the current channel diff --git a/src/Makefile b/src/Makefile index b7bfee2..25e88d7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,4 +1,4 @@ -CPP = g++ +CPP = g++-4.8 RM = rm -f CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g LD = g++ diff --git a/src/basicnackframe.h b/src/basicnackframe.h new file mode 100644 index 0000000..ca18e53 --- /dev/null +++ b/src/basicnackframe.h @@ -0,0 +1,117 @@ +/** + * Class describing a basic negative-acknowledgement frame + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class defintion + */ +class BasicNackFrame : public BasicFrame { +private: + /** + * server-assigned and channel specific delivery tag + * @var uint64_t + */ + uint64_t _deliveryTag; + + /** + * The additional bits + * @var BooleanSet + */ + BooleanSet _bits; + +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + * @return pointer to object to allow for chaining + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + BasicFrame::fill(buffer); + + // add the delivery tag + buffer.add(_deliveryTag); + + // add the booleans + _bits.fill(buffer); + } + +public: + /** + * Construct a basic negative-acknowledgement frame + * + * @param channel Channel identifier + * @param deliveryTag server-assigned and channel specific delivery tag + * @param multiple nack mutiple messages + * @param requeue requeue the message + */ + BasicNackFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false, bool requeue = false) : + BasicFrame(channel, 9), + _deliveryTag(deliveryTag), + _bits(multiple, requeue) {} + + /** + * Construct based on received frame + * @param frame + */ + BasicNackFrame(ReceivedFrame &frame) : + BasicFrame(frame), + _deliveryTag(frame.nextUint64()), + _bits(frame) {} + + /** + * Destructor + */ + virtual ~BasicNackFrame() {} + + /** + * Return the method ID + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 120; + } + + /** + * Return the server-assigned and channel specific delivery tag + * @return uint64_t + */ + uint64_t deliveryTag() const + { + return _deliveryTag; + } + + /** + * Return whether to acknowledgement multiple messages + * @return bool + */ + bool multiple() + { + return _bits.get(0); + } + + /** + * Should the message be put back in the queue? + * @return bool + */ + bool requeue() + { + return _bits.get(1); + } +}; + +/** + * end namespace + */ +} + diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 2e3279b..9ae9591 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -31,6 +31,7 @@ #include "basicconsumeframe.h" #include "basiccancelframe.h" #include "basicackframe.h" +#include "basicnackframe.h" /** * Set up namespace @@ -495,6 +496,21 @@ bool ChannelImpl::ack(uint64_t deliveryTag, int flags) return true; } +/** + * Reject a message + * @param deliveryTag the delivery tag + * @param flags optional flags + * @return bool + */ +bool ChannelImpl::reject(uint64_t deliveryTag, int flags) +{ + // send a nack frame + send(BasicNackFrame(_id, deliveryTag, flags & multiple, flags & requeue)); + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index b1053c7..cfe1c77 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -54,6 +54,7 @@ #include "basicgetokframe.h" #include "basicgetemptyframe.h" #include "basicackframe.h" +#include "basicnackframe.h" #include "basicrejectframe.h" #include "basicrecoverasyncframe.h" #include "basicrecoverframe.h" @@ -477,6 +478,7 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection) case 100: return BasicRecoverAsyncFrame(*this).process(connection); case 110: return BasicRecoverFrame(*this).process(connection); case 111: return BasicRecoverOKFrame(*this).process(connection); + case 120: return BasicNackFrame(*this).process(connection); } // this is a problem