final step (although untested) for handling returned messages

This commit is contained in:
Emiel Bruijntjes 2018-03-01 22:27:27 +01:00
parent 1f3500cee8
commit 1ccd93cc5e
6 changed files with 82 additions and 12 deletions

View File

@ -342,6 +342,11 @@ public:
/** /**
* Publish a message to an exchange * Publish a message to an exchange
* *
* You have to supply the name of an exchange and a routing key. RabbitMQ will then try
* to send the message to one or more queues. With the optional flags parameter you can
* specify what should happen if the message could not be routed to a queue. By default,
* unroutable message are silently discarded.
*
* This method returns a reference to a DeferredPublisher object. You can use this returned * This method returns a reference to a DeferredPublisher object. You can use this returned
* object to install callbacks that are called when an undeliverable message is returned, or * object to install callbacks that are called when an undeliverable message is returned, or
* to set the callback that is called when the server confirms that the message was received. * to set the callback that is called when the server confirms that the message was received.
@ -351,21 +356,27 @@ public:
* pass in these flags, your callbacks will not be called. If you are not at all interested * pass in these flags, your callbacks will not be called. If you are not at all interested
* in returned messages or publish-confirms, you can ignore the flag and the returned object. * in returned messages or publish-confirms, you can ignore the flag and the returned object.
* *
* Watch out: the channel returns the same DeferredPublisher object for all calls to the * Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the
* publish() method. This means that the callbacks that you install for the first published * publish() method. This means that the callbacks that you install for the first published
* message are also used for subsequent messages _and_ it means that if you install a different * message are also used for subsequent messages _and_ it means that if you install a different
* callback for a later publish operation, it overwrites your earlier callbacks * callback for a later publish operation, it overwrites your earlier callbacks
* *
* The following flags can be supplied:
*
* - mandatory If set, server returns messages that are not sent to a queue
* - immediate If set, server returns messages that can not immediately be forwarded to a consumer.
*
* @param exchange the exchange to publish to * @param exchange the exchange to publish to
* @param routingkey the routing key * @param routingkey the routing key
* @param envelope the full envelope to send * @param envelope the full envelope to send
* @param message the message to send * @param message the message to send
* @param size size of the message * @param size size of the message
* @param flags optional flags
*/ */
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation->publish(exchange, routingKey, envelope); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size())); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation->publish(exchange, routingKey, Envelope(message, size)); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message))); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/** /**
* Set the Quality of Service (QOS) for this channel * Set the Quality of Service (QOS) for this channel

View File

@ -410,9 +410,10 @@ public:
* @param envelope the full envelope to send * @param envelope the full envelope to send
* @param message the message to send * @param message the message to send
* @param size size of the message * @param size size of the message
* @param flags optional flags
* @return DeferredPublisher * @return DeferredPublisher
*/ */
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags);
/** /**
* Set the Quality of Service (QOS) of the entire connection * Set the Quality of Service (QOS) of the entire connection
@ -707,6 +708,12 @@ public:
* @return The handler responsible for the current message * @return The handler responsible for the current message
*/ */
DeferredReceiver *receiver() const { return _receiver.get(); } DeferredReceiver *receiver() const { return _receiver.get(); }
/**
* Retrieve the deferred publisher that handles returned messages
* @return The deferred publisher object
*/
DeferredPublisher *publisher() const { return _publisher.get(); }
/** /**
* The channel class is its friend, thus can it instantiate this object * The channel class is its friend, thus can it instantiate this object

View File

@ -60,6 +60,13 @@ private:
*/ */
ReturnedCallback _completeCallback; ReturnedCallback _completeCallback;
/**
* Process a return frame
*
* @param frame The frame to process
*/
void process(BasicReturnFrame &frame);
/** /**
* Get reference to self to prevent that object falls out of scope * Get reference to self to prevent that object falls out of scope
* @return std::shared_ptr * @return std::shared_ptr
@ -70,6 +77,11 @@ private:
* Extended implementation of the complete method that is called when a message was fully received * Extended implementation of the complete method that is called when a message was fully received
*/ */
virtual void complete() override; virtual void complete() override;
/**
* Classes that can access private members
*/
friend class BasicReturnFrame;
public: public:
/** /**

View File

@ -1,7 +1,7 @@
/** /**
* Class describing a basic return frame * Class describing a basic return frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 - 2018 Copernica BV
*/ */
/** /**
@ -155,8 +155,23 @@ public:
*/ */
virtual bool process(ConnectionImpl *connection) override virtual bool process(ConnectionImpl *connection) override
{ {
// we no longer support returned messages // we need the appropriate channel
return false; auto channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// get the current publisher
auto publisher = channel->publisher();
// if there is no deferred publisher, we can just as well stop
if (publisher == nullptr) return false;
// initialize the object, because we're about to receive a message
publisher->process(*this);
// done
return true;
} }
}; };

View File

@ -451,9 +451,10 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
* @param envelope the full envelope to send * @param envelope the full envelope to send
* @param message the message to send * @param message the message to send
* @param size size of the message * @param size size of the message
* @param flags
* @return DeferredPublisher * @return DeferredPublisher
*/ */
DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{ {
// we are going to send out multiple frames, each one will trigger a call to the handler, // we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that // which in turn could destruct the channel object, we need to monitor that
@ -465,7 +466,7 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::
if (!_publisher) _publisher.reset(new DeferredPublisher(this)); if (!_publisher) _publisher.reset(new DeferredPublisher(this));
// send the publish frame // send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey))) return *_publisher; if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher;
// channel still valid? // channel still valid?
if (!monitor.valid()) return *_publisher; if (!monitor.valid()) return *_publisher;

View File

@ -7,12 +7,34 @@
* @copyright 2018 Copernica BV * @copyright 2018 Copernica BV
*/ */
#include "includes.h" #include "includes.h"
#include "basicreturnframe.h"
/** /**
* Begin of namespace * Begin of namespace
*/ */
namespace AMQP { namespace AMQP {
/**
* Process a return frame
*
* @param frame The frame to process
*/
void DeferredPublisher::process(BasicReturnFrame &frame)
{
// this object will handle all future frames with header and body data
_channel->install(shared_from_this());
// retrieve the delivery tag and whether we were redelivered
_code = frame.replyCode();
_description = frame.replyText();
// notify user space of the begin of the returned message
if (_beginCallback) _beginCallback(_code, _description);
// initialize the object for the next message
initialize(frame.exchange(), frame.routingKey());
}
/** /**
* Indicate that a message was done * Indicate that a message was done
*/ */
@ -29,6 +51,9 @@ void DeferredPublisher::complete()
// for the next iteration we want a new message // for the next iteration we want a new message
_message.reset(); _message.reset();
// the description can be thrown away too
_description.clear();
// do we still have a valid channel // do we still have a valid channel
if (!monitor.valid()) return; if (!monitor.valid()) return;
@ -42,4 +67,3 @@ void DeferredPublisher::complete()
*/ */
} }