diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 179350c..a3ae8b6 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -73,7 +73,7 @@ #include "amqpcpp/deferredcancel.h" #include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" -#include "amqpcpp/deferredpublisher.h" +#include "amqpcpp/deferredrecall.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/throttle.h" diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 0b9f107..ebc0313 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -384,19 +384,10 @@ public: * specify what should happen if the message could not be routed to a queue. By default, * unroutable message are silently discarded. * - * This method returns a reference to a DeferredPublisher object. You can use this returned - * object to install callbacks that are called when an undeliverable message is returned, or - * to set the callback that is called when the server confirms that the message was received. - * - * To enable handling returned messages, or to enable publisher-confirms, you must not only - * set the callback, but also pass in appropriate flags to enable this feature. If you do not - * pass in these flags, your callbacks will not be called. If you are not at all interested - * in returned messages or publish-confirms, you can ignore the flag and the returned object. - * - * Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the - * publish() method. This means that the callbacks that you install for the first published - * message are also used for subsequent messages _and_ it means that if you install a different - * callback for a later publish operation, it overwrites your earlier callbacks + * If you set the 'mandatory' and/or 'immediate' flag, messages that could not be handled + * are returned to the application. Make sure that you have called the recall()-method and + * have set up all appropriate handlers to process these returned messages before you start + * publishing. * * The following flags can be supplied: * @@ -410,10 +401,10 @@ public: * @param size size of the message * @param flags optional flags */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } /** * Set the Quality of Service (QOS) for this channel @@ -472,6 +463,18 @@ public: DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation->consume(queue, std::string(), flags, Table()); } DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation->consume(queue, std::string(), 0, arguments); } + /** + * Tell the messages that you are ready to recall/take back messages that messages thar are unroutable. + * + * When you use the publish() method in combination with the 'immediate' or 'mandatory' flag, rabbitmq + * sends back unroutable messages. With this recall() method you can install a sort of pseudo-consumer + * that defines how such returned-messages are processed. + * + * Watch out: when you call this method more than once, you always get access to the same object. You + * can thus not install multiple callbacks for the same event. + */ + DeferredRecall &recall() { return _implementation->recall(); } + /** * Cancel a running consume call * diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index 3cf95bd..5c53bbf 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -45,7 +45,7 @@ class DeferredCancel; class DeferredConfirm; class DeferredQueue; class DeferredGet; -class DeferredPublisher; +class DeferredRecall; class Connection; class Envelope; class Table; @@ -77,13 +77,13 @@ private: /** * Handler that deals with incoming messages as a result of publish operations - * @var std::shared_ptr + * @var DeferredRecall */ - std::shared_ptr _publisher; + std::shared_ptr _recall; /** - * Handler that deals with publisher confirms frames - * @var std::shared_ptr + * Handler that deals with publisher confirms frames + * @var std::shared_ptr */ std::shared_ptr _confirm; @@ -433,9 +433,9 @@ public: * @param message the message to send * @param size size of the message * @param flags optional flags - * @return DeferredPublisher + * @return bool */ - DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags); + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags); /** * Set the Quality of Service (QOS) of the entire connection @@ -471,6 +471,17 @@ public: */ DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); + /** + * Tell that you are prepared to recall/take back messages that could not be + * published. This is only meaningful if you pass the 'immediate' or 'mandatory' + * flag to publish() operations. + * + * THis function returns a deferred handler more or less similar to the object + * return by the consume() method and that can be used to install callbacks that + * handle the recalled messages. + */ + DeferredRecall &recall(); + /** * Cancel a running consumer * @param tag the consumer tag @@ -750,10 +761,10 @@ public: DeferredReceiver *receiver() const { return _receiver.get(); } /** - * Retrieve the deferred publisher that handles returned messages - * @return The deferred publisher object + * Retrieve the recalls-object that handles bounces + * @return The deferred recall object */ - DeferredPublisher *publisher() const { return _publisher.get(); } + DeferredRecall *recalls() const { return _recall.get(); } /** * Retrieve the deferred confirm that handles publisher confirms diff --git a/include/amqpcpp/deferredextreceiver.h b/include/amqpcpp/deferredextreceiver.h index 3d427e3..9500a35 100644 --- a/include/amqpcpp/deferredextreceiver.h +++ b/include/amqpcpp/deferredextreceiver.h @@ -4,11 +4,11 @@ * Extended receiver that _wants_ to receive message (because it is * consuming or get'ting messages. This is the base class for both * the DeferredConsumer as well as the DeferredGet classes, but not - * the base of the DeferredPublisher (which can also receive returned + * the base of the DeferredRecall (which can also receive returned * messages, but not as a result of an explicit request) * * @author Emiel Bruijntjes - * @copyright 2018 Copernica BV + * @copyright 2018 - 2020 Copernica BV */ /** diff --git a/include/amqpcpp/deferredpublisher.h b/include/amqpcpp/deferredrecall.h similarity index 83% rename from include/amqpcpp/deferredpublisher.h rename to include/amqpcpp/deferredrecall.h index 3c9fb17..5451110 100644 --- a/include/amqpcpp/deferredpublisher.h +++ b/include/amqpcpp/deferredrecall.h @@ -1,12 +1,11 @@ /** - * DeferredPublisher.h + * DeferredRecall.h * - * Class that is returned when channel::publish() is called, and that - * can be used to install callback methods that define how returned - * messages should be handled. + * Class that an be used to install callback methods that define how + * returned messages should be handled. * * @author Emiel Bruijntjes - * @copyright 2018 Copernica BV + * @copyright 2018 - 2020 Copernica BV */ /** @@ -27,7 +26,7 @@ class ChannelImpl; /** * Class definition */ -class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this +class DeferredRecall : public DeferredReceiver, public std::enable_shared_from_this { private: /** @@ -94,7 +93,7 @@ public: * @param channel the channel implementation * @param failed are we already failed? */ - DeferredPublisher(ChannelImpl *channel, bool failed = false) : + DeferredRecall(ChannelImpl *channel, bool failed = false) : DeferredReceiver(failed, channel) {} public: @@ -102,7 +101,7 @@ public: * Register a function to be called when a full message is returned * @param callback the callback to execute */ - DeferredPublisher &onReceived(const BounceCallback &callback) + DeferredRecall &onReceived(const BounceCallback &callback) { // store callback _bounceCallback = callback; @@ -115,7 +114,7 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredPublisher &onMessage(const BounceCallback &callback) + DeferredRecall &onMessage(const BounceCallback &callback) { // store callback _bounceCallback = callback; @@ -128,7 +127,7 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredPublisher &onReturned(const BounceCallback &callback) + DeferredRecall &onReturned(const BounceCallback &callback) { // store callback _bounceCallback = callback; @@ -141,7 +140,7 @@ public: * Alias for onReceived() (see above) * @param callback the callback to execute */ - DeferredPublisher &onBounced(const BounceCallback &callback) + DeferredRecall &onBounced(const BounceCallback &callback) { // store callback _bounceCallback = callback; @@ -157,7 +156,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredPublisher &onBegin(const ReturnCallback &callback) + DeferredRecall &onBegin(const ReturnCallback &callback) { // store callback _beginCallback = callback; @@ -172,7 +171,7 @@ public: * @param callback The callback to invoke for message headers * @return Same object for chaining */ - DeferredPublisher &onSize(const SizeCallback &callback) + DeferredRecall &onSize(const SizeCallback &callback) { // store callback _sizeCallback = callback; @@ -187,7 +186,7 @@ public: * @param callback The callback to invoke for message headers * @return Same object for chaining */ - DeferredPublisher &onHeaders(const HeaderCallback &callback) + DeferredRecall &onHeaders(const HeaderCallback &callback) { // store callback _headerCallback = callback; @@ -209,7 +208,7 @@ public: * @param callback The callback to invoke for chunks of message data * @return Same object for chaining */ - DeferredPublisher &onData(const DataCallback &callback) + DeferredRecall &onData(const DataCallback &callback) { // store callback _dataCallback = callback; @@ -224,7 +223,7 @@ public: * @param callback The callback to invoke * @return Same object for chaining */ - DeferredPublisher &onComplete(const ReturnedCallback &callback) + DeferredRecall &onComplete(const ReturnedCallback &callback) { // store callback _completeCallback = callback; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 403993b..6601c98 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,7 +49,7 @@ add_sources( deferredconsumer.cpp deferredreceiver.cpp deferredextreceiver.cpp - deferredpublisher.cpp + deferredrecall.cpp deferredget.cpp exchangebindframe.h exchangebindokframe.h diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index f14ff65..96ddbec 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -1,7 +1,7 @@ /** * Class describing a basic return frame * - * @copyright 2014 - 2018 Copernica BV + * @copyright 2014 - 2020 Copernica BV */ /** @@ -161,14 +161,14 @@ public: // channel does not exist if (!channel) return false; - // get the current publisher - auto publisher = channel->publisher(); + // get the object that handles bounces + auto recalls = channel->recalls(); // if there is no deferred publisher, we can just as well stop - if (publisher == nullptr) return false; + if (recalls == nullptr) return false; // initialize the object, because we're about to receive a message - publisher->process(*this); + recalls->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index b2de96d..8aa8bbe 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -484,9 +484,9 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) * @param message the message to send * @param size size of the message * @param flags - * @return DeferredPublisher + * @return bool */ -DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that @@ -494,22 +494,20 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: // @todo do not copy the entire buffer to individual frames - // @todo this seems utterly (conceptually) broken - - // make sure we have a deferred object to return - if (!_publisher) _publisher.reset(new DeferredPublisher(this)); - // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher; + if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false; // channel still valid? - if (!monitor.valid()) return *_publisher; + if (!monitor.valid()) return false; // send header - if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher; + if (!send(BasicHeaderFrame(_id, envelope))) return false; + + // if everything has been sent by now + if (envelope.bodySize() == 0) return true; // channel and connection still valid? - if (!monitor.valid() || !_connection) return *_publisher; + if (!monitor.valid() || !_connection) return false; // the max payload size is the max frame size minus the bytes for headers and trailer uint32_t maxpayload = _connection->maxPayload(); @@ -526,10 +524,10 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); // send out a body frame - if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher; + if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false; // channel still valid? - if (!monitor.valid()) return *_publisher; + if (!monitor.valid()) return false; // update counters bytessent += chunksize; @@ -537,7 +535,7 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: } // done - return *_publisher; + return true; } /** diff --git a/src/deferredpublisher.cpp b/src/deferredrecall.cpp similarity index 88% rename from src/deferredpublisher.cpp rename to src/deferredrecall.cpp index b75e6df..9841251 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredrecall.cpp @@ -1,10 +1,10 @@ /** - * DeferredPublisher.cpp + * DeferredRecall.cpp * - * Implementation file for the DeferredPublisher class + * Implementation file for the DeferredRecall class * * @author Emiel Bruijntjes - * @copyright 2018 Copernica BV + * @copyright 2018 - 2020 Copernica BV */ #include "includes.h" #include "basicreturnframe.h" @@ -19,7 +19,7 @@ namespace AMQP { * * @param frame The frame to process */ -void DeferredPublisher::process(BasicReturnFrame &frame) +void DeferredRecall::process(BasicReturnFrame &frame) { // this object will handle all future frames with header and body data _channel->install(shared_from_this()); @@ -41,7 +41,7 @@ void DeferredPublisher::process(BasicReturnFrame &frame) /** * Indicate that a message was done */ -void DeferredPublisher::complete() +void DeferredRecall::complete() { // also monitor the channel Monitor monitor(_channel); diff --git a/src/includes.h b/src/includes.h index 7dafe4b..f3251db 100644 --- a/src/includes.h +++ b/src/includes.h @@ -69,7 +69,7 @@ #include "amqpcpp/callbacks.h" #include "amqpcpp/deferred.h" #include "amqpcpp/deferredconsumer.h" -#include "amqpcpp/deferredpublisher.h" +#include "amqpcpp/deferredrecall.h" #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h"