From 1ccd93cc5e15d3fd3582432c5ec65a2dc7cdb20a Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 22:27:27 +0100 Subject: [PATCH] final step (although untested) for handling returned messages --- include/amqpcpp/channel.h | 21 ++++++++++++++++----- include/amqpcpp/channelimpl.h | 9 ++++++++- include/amqpcpp/deferredpublisher.h | 12 ++++++++++++ src/basicreturnframe.h | 21 ++++++++++++++++++--- src/channelimpl.cpp | 5 +++-- src/deferredpublisher.cpp | 26 +++++++++++++++++++++++++- 6 files changed, 82 insertions(+), 12 deletions(-) diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 2be43cd..b42fc3a 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -342,6 +342,11 @@ public: /** * Publish a message to an exchange * + * You have to supply the name of an exchange and a routing key. RabbitMQ will then try + * to send the message to one or more queues. With the optional flags parameter you can + * specify what should happen if the message could not be routed to a queue. By default, + * unroutable message are silently discarded. + * * This method returns a reference to a DeferredPublisher object. You can use this returned * object to install callbacks that are called when an undeliverable message is returned, or * to set the callback that is called when the server confirms that the message was received. @@ -351,21 +356,27 @@ public: * pass in these flags, your callbacks will not be called. If you are not at all interested * in returned messages or publish-confirms, you can ignore the flag and the returned object. * - * Watch out: the channel returns the same DeferredPublisher object for all calls to the + * Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the * publish() method. This means that the callbacks that you install for the first published * message are also used for subsequent messages _and_ it means that if you install a different * callback for a later publish operation, it overwrites your earlier callbacks * + * The following flags can be supplied: + * + * - mandatory If set, server returns messages that are not sent to a queue + * - immediate If set, server returns messages that can not immediately be forwarded to a consumer. + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags optional flags */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 41d5742..40d0603 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -410,9 +410,10 @@ public: * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags optional flags * @return DeferredPublisher */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); + DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags); /** * Set the Quality of Service (QOS) of the entire connection @@ -707,6 +708,12 @@ public: * @return The handler responsible for the current message */ DeferredReceiver *receiver() const { return _receiver.get(); } + + /** + * Retrieve the deferred publisher that handles returned messages + * @return The deferred publisher object + */ + DeferredPublisher *publisher() const { return _publisher.get(); } /** * The channel class is its friend, thus can it instantiate this object diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredpublisher.h index 77c2465..3c9fb17 100644 --- a/include/amqpcpp/deferredpublisher.h +++ b/include/amqpcpp/deferredpublisher.h @@ -60,6 +60,13 @@ private: */ ReturnedCallback _completeCallback; + /** + * Process a return frame + * + * @param frame The frame to process + */ + void process(BasicReturnFrame &frame); + /** * Get reference to self to prevent that object falls out of scope * @return std::shared_ptr @@ -70,6 +77,11 @@ private: * Extended implementation of the complete method that is called when a message was fully received */ virtual void complete() override; + + /** + * Classes that can access private members + */ + friend class BasicReturnFrame; public: /** diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index e140e9c..f14ff65 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic return frame * - * @copyright 2014 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ /** @@ -155,8 +155,23 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // we no longer support returned messages - return false; + // we need the appropriate channel + auto channel = connection->channel(this->channel()); + + // channel does not exist + if (!channel) return false; + + // get the current publisher + auto publisher = channel->publisher(); + + // if there is no deferred publisher, we can just as well stop + if (publisher == nullptr) return false; + + // initialize the object, because we're about to receive a message + publisher->process(*this); + + // done + return true; } }; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index fff8873..053e479 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -451,9 +451,10 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message + * @param flags * @return DeferredPublisher */ -DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) +DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that @@ -465,7 +466,7 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: if (!_publisher) _publisher.reset(new DeferredPublisher(this)); // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher; + if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher; // channel still valid? if (!monitor.valid()) return *_publisher; diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index 72843f1..f25b167 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -7,12 +7,34 @@ * @copyright 2018 Copernica BV */ #include "includes.h" +#include "basicreturnframe.h" /** * Begin of namespace */ namespace AMQP { +/** + * Process a return frame + * + * @param frame The frame to process + */ +void DeferredPublisher::process(BasicReturnFrame &frame) +{ + // this object will handle all future frames with header and body data + _channel->install(shared_from_this()); + + // retrieve the delivery tag and whether we were redelivered + _code = frame.replyCode(); + _description = frame.replyText(); + + // notify user space of the begin of the returned message + if (_beginCallback) _beginCallback(_code, _description); + + // initialize the object for the next message + initialize(frame.exchange(), frame.routingKey()); +} + /** * Indicate that a message was done */ @@ -29,6 +51,9 @@ void DeferredPublisher::complete() // for the next iteration we want a new message _message.reset(); + + // the description can be thrown away too + _description.clear(); // do we still have a valid channel if (!monitor.valid()) return; @@ -42,4 +67,3 @@ void DeferredPublisher::complete() */ } -