From d1ab8b179ae944915d87e275d823b705928588a6 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 6 Jan 2014 06:49:31 -0800 Subject: [PATCH] implemented returned messages --- README.md | 19 ++++++++--- include/channel.h | 9 +++--- include/channelhandler.h | 20 +++++++++++- include/channelimpl.h | 5 +-- include/classes.h | 1 + include/message.h | 52 ++---------------------------- src/basicreturnframe.h | 20 ++++++++++++ src/bodyframe.h | 2 +- src/channelimpl.cpp | 37 ++++++++++++++++++++-- src/consumedmessage.h | 68 ++++++++++++++++++++++++++++++++++++++++ src/messageimpl.h | 26 +++++++++------ src/returnedmessage.h | 65 ++++++++++++++++++++++++++++++++++++++ tests/Makefile | 2 +- tests/myconnection.cpp | 30 ++++++++++++++---- tests/myconnection.h | 32 ++++++++++++++----- 15 files changed, 298 insertions(+), 90 deletions(-) create mode 100644 src/consumedmessage.h create mode 100644 src/returnedmessage.h diff --git a/README.md b/README.md index 1b06f52..989954a 100644 --- a/README.md +++ b/README.md @@ -264,12 +264,21 @@ channel.declareQueue("my-queue-name", AMQP::durable + AMQP::autodelete, argument WORK IN PROGRESS ================ -Almost all AMQP features have been implemented. We only need to add support for -returned messages. We also need to add more safety checks so that strange data -from RabbitMQ does not break the library (although in reality RabbitMQ only sends -valid data). +Almost all AMQP features have been implemented. But the following things might +need additional attention: -It would also be nice to have sample implementations for the ConnectionHandler + - ability to set up secure connections (or is this fully done on the IO level) + - login with other protocols than login/password + - publish confirms + - closing down the connection + +We also need to add more safety checks so that strange or invalid data from +RabbitMQ does not break the library (although in reality RabbitMQ only sends +valid data). Also, when we now receive an answer from RabbitMQ that does not +match the request that we earlier sent, we do not report an error (this is also +an issue that only occurs in theory). + +It would be nice to have sample implementations for the ConnectionHandler class that can be directly plugged into libev, libevent and libuv event loops. For performance reasons, we need to investigate if we can limit the number of times diff --git a/include/channel.h b/include/channel.h index 8ccc59f..0b36dba 100644 --- a/include/channel.h +++ b/include/channel.h @@ -256,7 +256,10 @@ public: * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - * @todo implement to onReturned() method + * If either of the two flags is set, and the message could not immediately + * be published, the message is returned by the server to the client. If you + * want to catch such returned messages, you need to implement the + * ChannelHandler::onReturned() method. * * @param exchange the exchange to publish to * @param routingkey the routing key @@ -354,12 +357,10 @@ public: * - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too * * @param deliveryTag the unique delivery tag of the message - * @param message the message * @param flags optional flags * @return bool */ bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } - bool ack(const Message &message, int flags=0) { return _implementation.ack(message.deliveryTag(), flags); } /** * Reject or nack a message @@ -374,12 +375,10 @@ public: * - requeue if set, the message is put back in the queue, otherwise it is dead-lettered/removed * * @param deliveryTag the unique delivery tag of the message - * @param message the original message * @param flags optional flags * @return bool */ bool reject(uint64_t deliveryTag, int flags=0) { return _implementation.reject(deliveryTag, flags); } - bool reject(const Message &message, int flags=0) { return _implementation.reject(message.deliveryTag(), flags); } /** * Recover all messages that were not yet acked diff --git a/include/channelhandler.h b/include/channelhandler.h index ef3f7f5..480041b 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -170,10 +170,28 @@ public: /** * Method that is called when a message has been received on a channel + * This message will be called for every message that is received after + * you started consuming. Make sure you acknowledge the messages when its + * safe to remove them from RabbitMQ (unless you set no-ack option when you + * started the consumer) * @param channel the channel on which the consumer was started * @param message the consumed message + * @param deliveryTag the delivery tag, you need this to acknowledge the message + * @param consumerTag the consumer identifier that was used to retrieve this message + * @param redelivered is this a redelivered message? */ - virtual void onReceived(Channel *channel, const Message &message) {} + virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {} + + /** + * Method that is called when a message you tried to publish was returned + * by the server. This only happens when the 'mandatory' or 'immediate' flag + * was set with the Channel::publish() call. + * @param channel the channel on which the message was returned + * @param message the returned message + * @param code the reply code + * @param text human readable reply reason + */ + virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {} /** * Method that is called when the server starts recovering messages diff --git a/include/channelimpl.h b/include/channelimpl.h index 8c34edd..13e7b47 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -16,7 +16,7 @@ namespace AMQP { /** * Class definition */ -class ChannelImpl +class ChannelImpl : public Watchable { private: /** @@ -463,7 +463,7 @@ public: /** * Report that a message was received */ - void reportReceived(); + void reportMessage(); /** * Report that the recover operation has started @@ -479,6 +479,7 @@ public: * @return MessageImpl */ MessageImpl *message(const BasicDeliverFrame &frame); + MessageImpl *message(const BasicReturnFrame &frame); /** * Retrieve the current incoming message diff --git a/include/classes.h b/include/classes.h index 57ef08d..af49200 100644 --- a/include/classes.h +++ b/include/classes.h @@ -16,6 +16,7 @@ namespace AMQP { class Array; class BasicDeliverFrame; class BasicHeaderFrame; +class BasicReturnFrame; class BodyFrame; class Channel; class Connection; diff --git a/include/message.h b/include/message.h index 4349bc0..a036420 100644 --- a/include/message.h +++ b/include/message.h @@ -21,24 +21,6 @@ namespace AMQP { class Message : public Envelope { protected: - /** - * The consumer tag over which it was delivered - * @var string - */ - std::string _consumerTag; - - /** - * Unique delivery tag to identify and ack the mesage - * @var uint64_t - */ - uint64_t _deliveryTag; - - /** - * Is this a redelivered message / has it been delivered before? - * @var bool - */ - bool _redelivered; - /** * The exchange to which it was originally published * @var string @@ -55,14 +37,11 @@ protected: /** * The constructor is protected to ensure that endusers can not * instantiate a message - * @param consumerTag - * @param deliveryTag - * @param redelivered * @param exchange * @param routingKey */ - Message(const std::string &consumerTag, uint64_t deliveryTag, bool redelivered, const std::string &exchange, const std::string &routingKey) : - Envelope(nullptr, 0), _consumerTag(consumerTag), _deliveryTag(deliveryTag), _redelivered(redelivered), _exchange(exchange), _routingKey(routingKey) + Message(const std::string &exchange, const std::string &routingKey) : + Envelope(nullptr, 0), _exchange(exchange), _routingKey(routingKey) {} public: @@ -71,33 +50,6 @@ public: */ virtual ~Message() {} - /** - * The consumer tag over which it was delivered - * @return string - */ - const std::string &consumerTag() const - { - return _consumerTag; - } - - /** - * Unique delivery tag to identify and ack the mesage - * @return uint64_t - */ - uint64_t deliveryTag() const - { - return _deliveryTag; - } - - /** - * Is this a redelivered message / has it been delivered before? - * @var bool - */ - bool redelivered() const - { - return _redelivered; - } - /** * The exchange to which it was originally published * @var string diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index 7f1d40f..1759240 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -136,6 +136,26 @@ public: { return _replyCode; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // construct the message + channel->message(*this); + + // done + return true; + } }; /** diff --git a/src/bodyframe.h b/src/bodyframe.h index a75cdc0..15500a6 100644 --- a/src/bodyframe.h +++ b/src/bodyframe.h @@ -113,7 +113,7 @@ public: if (!message->append(_payload, _size)) return true; // the message is complete - channel->reportReceived(); + channel->reportMessage(); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 819ed0e..3b499f1 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -7,7 +7,10 @@ */ #include "includes.h" #include "basicdeliverframe.h" +#include "basicreturnframe.h" #include "messageimpl.h" +#include "consumedmessage.h" +#include "returnedmessage.h" #include "channelopenframe.h" #include "channelflowframe.h" #include "channelcloseokframe.h" @@ -540,9 +543,23 @@ size_t ChannelImpl::send(const Frame &frame) /** * Report the received message */ -void ChannelImpl::reportReceived() +void ChannelImpl::reportMessage() { - if (_handler) _handler->onReceived(_parent, *_message); + // skip if there is no message + if (!_message) return; + + // after the report the channel may be destructed, monitor that + Monitor monitor(this); + + // do we have a handler? + if (_handler) _message->report(_parent, _handler); + + // skip if channel was destructed + if (!monitor.valid()) return; + + // no longer need the message + delete _message; + _message = nullptr; } /** @@ -556,7 +573,21 @@ MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame) if (_message) delete _message; // construct a message - return _message = new MessageImpl(frame); + return _message = new ConsumedMessage(frame); +} + +/** + * Create an incoming message + * @param frame + * @return MessageImpl + */ +MessageImpl *ChannelImpl::message(const BasicReturnFrame &frame) +{ + // it should not be possible that a message already exists, but lets check it anyhow + if (_message) delete _message; + + // construct a message + return _message = new ReturnedMessage(frame); } /** diff --git a/src/consumedmessage.h b/src/consumedmessage.h new file mode 100644 index 0000000..30ed208 --- /dev/null +++ b/src/consumedmessage.h @@ -0,0 +1,68 @@ +/** + * Base class for a message implementation + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class ConsumedMessage : public MessageImpl +{ +private: + /** + * The consumer tag + * @var string + */ + std::string _consumerTag; + + /** + * The delivery tag + * @var uint64_t + */ + uint64_t _deliveryTag; + + /** + * Is this a redelivered message? + * @var bool + */ + bool _redelivered; + + +public: + /** + * Constructor + * @param frame + */ + ConsumedMessage(const BasicDeliverFrame &frame) : + MessageImpl(frame.exchange(), frame.routingKey()), + _consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) + {} + + /** + * Destructor + */ + virtual ~ConsumedMessage() {} + + /** + * Report to the handler + * @param channel + * @param handler + */ + virtual void report(Channel *channel, ChannelHandler *handler) override + { + // report to the handler + handler->onReceived(channel, *this, _deliveryTag, _consumerTag, _redelivered); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/messageimpl.h b/src/messageimpl.h index e4f60c6..27d5b85 100644 --- a/src/messageimpl.h +++ b/src/messageimpl.h @@ -1,8 +1,8 @@ /** - * MessageImpl.h + * Base class for a message implementation * - * Implementation of the message object that is only accessible for the - * AMQP library internals + * This is the base class for either the returned message or the consumed + * message. * * @copyright 2014 Copernica BV */ @@ -30,17 +30,18 @@ private: */ bool _selfAllocated; - -public: +protected: /** * Constructor - * @param frame + * @param exchange + * @param routingKey */ - MessageImpl(const BasicDeliverFrame &frame) : - Message(frame.consumerTag(), frame.deliveryTag(), frame.redelivered(), frame.exchange(), frame.routingKey()), + MessageImpl(const std::string &exchange, const std::string &routingKey) : + Message(exchange, routingKey), _received(0), _selfAllocated(false) {} - + +public: /** * Destructor */ @@ -97,6 +98,13 @@ public: return _received >= _bodySize; } } + + /** + * Report to the handler + * @param channel + * @param handler + */ + virtual void report(Channel *channel, ChannelHandler *handler) = 0; }; /** diff --git a/src/returnedmessage.h b/src/returnedmessage.h new file mode 100644 index 0000000..61115d7 --- /dev/null +++ b/src/returnedmessage.h @@ -0,0 +1,65 @@ +/** + * ReturnedMessage.h + * + * Message that is received via a return call from the server, because it + * was published with the immediate or mandatory flag, and could not be + * delivered according to those rules. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class ReturnedMessage : public MessageImpl +{ +private: + /** + * The reply code + * @var int16_t + */ + int16_t _replyCode; + + /** + * The reply message + * @var string + */ + std::string _replyText; + + +public: + /** + * Constructor + * @param frame + */ + ReturnedMessage(const BasicReturnFrame &frame) : + MessageImpl(frame.exchange(), frame.routingKey()), + _replyCode(frame.replyCode()), _replyText(frame.replyText()) {} + + /** + * Destructor + */ + virtual ~ReturnedMessage() {} + + /** + * Report to the handler + * @param channel + * @param handler + */ + virtual void report(Channel *channel, ChannelHandler *handler) override + { + // report to the handler + handler->onReturned(channel, *this, _replyCode, _replyText); + } +}; + +/** + * End of namespace + */ +} + diff --git a/tests/Makefile b/tests/Makefile index 1c1dec2..5c06aee 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,4 +1,4 @@ -CPP = g++ +CPP = g++-4.8 CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g LD = g++ LDFLAGS = -llibamqp -lcopernica_event -lcopernica_network -lev diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 715f10f..c57fb56 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -356,11 +356,11 @@ void MyConnection::onQueueBound(AMQP::Channel *channel) std::cout << "AMQP Queue bound" << std::endl; // _connection->setQos(10); - _channel->setQos(1); +// _channel->setQos(1); - _channel->publish("my_exchange", "key", "this is the message"); - _channel->consume("my_queue"); + _channel->publish("my_exchange", "invalid-key", AMQP::mandatory, "this is the message"); +// _channel->consume("my_queue"); } /** @@ -417,17 +417,35 @@ void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string & } /** - * Method that is called when a message has been consumed + * Method that is called when a message has been received on a channel * @param channel the channel on which the consumer was started * @param message the consumed message + * @param deliveryTag the delivery tag, you need this to acknowledge the message + * @param consumerTag the consumer identifier that was used to retrieve this message + * @param redelivered is this a redelivered message? */ -void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message) +void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) { // show std::cout << "AMQP consumed: " << message.message() << std::endl; // ack the message - channel->ack(message); + channel->ack(deliveryTag); +} + +/** + * Method that is called when a message you tried to publish was returned + * by the server. This only happens when the 'mandatory' or 'immediate' flag + * was set with the Channel::publish() call. + * @param channel the channel on which the message was returned + * @param message the returned message + * @param code the reply code + * @param text human readable reply reason + */ +void MyConnection::onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) +{ + // show + std::cout << "AMQP message returned: " << text << std::endl; } /** diff --git a/tests/myconnection.h b/tests/myconnection.h index a1f6b11..61b0e1e 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -225,13 +225,6 @@ private: * @param tag the consumer tag */ virtual void onConsumerStarted(AMQP::Channel *channel, const std::string &tag) override; - - /** - * Method that is called when a message has been consumed - * @param channel the channel on which the consumer was started - * @param message the consumed message - */ - virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message) override; /** * Method that is called when a consumer was stopped @@ -240,6 +233,31 @@ private: * @param tag the consumer tag */ virtual void onConsumerStopped(AMQP::Channel *channel, const std::string &tag) override; + + /** + * Method that is called when a message has been received on a channel + * This message will be called for every message that is received after + * you started consuming. Make sure you acknowledge the messages when its + * safe to remove them from RabbitMQ (unless you set no-ack option when you + * started the consumer) + * @param channel the channel on which the consumer was started + * @param message the consumed message + * @param deliveryTag the delivery tag, you need this to acknowledge the message + * @param consumerTag the consumer identifier that was used to retrieve this message + * @param redelivered is this a redelivered message? + */ + virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) override; + + /** + * Method that is called when a message you tried to publish was returned + * by the server. This only happens when the 'mandatory' or 'immediate' flag + * was set with the Channel::publish() call. + * @param channel the channel on which the message was returned + * @param message the returned message + * @param code the reply code + * @param text human readable reply reason + */ + virtual void onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) override; public: