diff --git a/README.md b/README.md index 2ac160d..d6cbeab 100644 --- a/README.md +++ b/README.md @@ -994,27 +994,15 @@ in almost any form: /** * Publish a message to an exchange * - * You have to supply the name of an exchange and a routing key. RabbitMQ will - * then try to send the message to one or more queues. With the optional flags - * parameter you can specify what should happen if the message could not be routed - * to a queue. By default, unroutable message are silently discarded. + * You have to supply the name of an exchange and a routing key. RabbitMQ will then try + * to send the message to one or more queues. With the optional flags parameter you can + * specify what should happen if the message could not be routed to a queue. By default, + * unroutable message are silently discarded. * - * This method returns a reference to a DeferredPublisher object. You can use - * this returned object to install callbacks that are called when an undeliverable - * message is returned, or to set the callback that is called when the server - * confirms that the message was received. - * - * To enable handling returned messages, or to enable publisher-confirms, you must - * not only set the callback, but also pass in appropriate flags to enable this - * feature. If you do not pass in these flags, your callbacks will not be called. - * If you are not at all interested in returned messages or publish-confirms, you - * can ignore the flag and the returned object. - * - * Watch out: the channel returns _the same_ DeferredPublisher object for all - * calls to the publish() method. This means that the callbacks that you install - * for the first published message are also used for subsequent messages _and_ - * it means that if you install a different callback for a later publish - * operation, it overwrites your earlier callbacks + * If you set the 'mandatory' and/or 'immediate' flag, messages that could not be handled + * are returned to the application. Make sure that you have called the recall()-method and + * have set up all appropriate handlers to process these returned messages before you start + * publishing. * * The following flags can be supplied: * @@ -1028,16 +1016,17 @@ in almost any form: * @param size size of the message * @param flags optional flags */ -DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } -DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } -DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } -DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } +bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } +bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } +bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } +bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } ```` Published messages are normally not confirmed by the server, and the RabbitMQ will not send a report back to inform you whether the message was successfully published or not. But with the flags you can instruct RabbitMQ to send back -the message if it was undeliverable. +the message if it was undeliverable. In you use these flags you must also install +callbacks that will process these bounced messages. You can also use transactions to ensure that your messages get delivered. Let's say that you are publishing many messages in a row. If you get diff --git a/include/amqpcpp/confirmed.h b/include/amqpcpp/confirmed.h index 77296cc..47b02e6 100644 --- a/include/amqpcpp/confirmed.h +++ b/include/amqpcpp/confirmed.h @@ -16,7 +16,7 @@ /** * Includes */ -#include "deferredconfirmedpublish.h" +#include "deferredpublish.h" #include /** @@ -35,7 +35,7 @@ private: * removal will be cheaper for whole ranges. * @var size_t */ - std::map> _handlers; + std::map> _handlers; /** * Called when the deliverytag(s) are acked/nacked @@ -90,10 +90,10 @@ public: * @param flags optional flags * @return bool */ - DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); - DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } - DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } - DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } + DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } }; /** diff --git a/include/amqpcpp/deferredconfirmedpublish.h b/include/amqpcpp/deferredconfirmedpublish.h deleted file mode 100644 index 9d36c3f..0000000 --- a/include/amqpcpp/deferredconfirmedpublish.h +++ /dev/null @@ -1,146 +0,0 @@ -/** - * DeferredConfirmedPublish.h - * - * Deferred callback for RabbitMQ-specific publisher confirms mechanism per-message. - * - * @author Michael van der Werve - * @copyright 2020 Copernica BV - */ - -/** - * Include guard - */ -#pragma once - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * We extend from the default deferred and add extra functionality - */ -class DeferredConfirmedPublish : public Deferred -{ -private: - /** - * Callback to execute when server confirms that message is processed - * @var AckCallback - */ - PublishAckCallback _ackCallback; - - /** - * Callback to execute when server sends negative acknowledgement - * @var NackCallback - */ - PublishNackCallback _nackCallback; - - /** - * Callback to execute when message is lost (nack / error) - * @var LostCallback - */ - PublishLostCallback _lostCallback; - - /** - * Report an ack, calls the callback. - */ - void reportAck() - { - // check if the callback is set - if (_ackCallback) _ackCallback(); - } - - /** - * Report an nack, calls the callback if set. - */ - void reportNack() - { - // check if the callback is set - if (_nackCallback) _nackCallback(); - - // message is 'lost' - if (_lostCallback) _lostCallback(); - } - - /** - * Indicate failure - * @param error Description of the error that occured - */ - void reportError(const char *error) - { - // from this moment on the object should be listed as failed - _failed = true; - - // message is lost - if (_lostCallback) _lostCallback(); - - // execute callbacks if registered - if (_errorCallback) _errorCallback(error); - } - - /** - * The wrapped confirmed channel implementation may call our - * private members and construct us - */ - friend class Confirmed; - - -public: - /** - * Protected constructor that can only be called - * from within the channel implementation - * - * Note: this constructor _should_ be protected, but because make_shared - * will then not work, we have decided to make it public after all, - * because the work-around would result in not-so-easy-to-read code. - * - * @param boolean are we already failed? - */ - DeferredConfirmedPublish(bool failed = false) : Deferred(failed) {} - -public: - /** - * Callback that is called when the broker confirmed message publication - * @param callback the callback to execute - */ - DeferredConfirmedPublish &onAck(const PublishAckCallback &callback) - { - // store callback - _ackCallback = callback; - - // allow chaining - return *this; - } - - /** - * Callback that is called when the broker denied message publication - * @param callback the callback to execute - */ - DeferredConfirmedPublish &onNack(const PublishNackCallback &callback) - { - // store callback - _nackCallback = callback; - - // allow chaining - return *this; - } - - /** - * Callback that is called when a message is lost, either through RabbitMQ - * rejecting it or because of a channel error - * @param callback the callback to execute - */ - DeferredConfirmedPublish &onLost(const PublishLostCallback &callback) - { - // store callback - _lostCallback = callback; - - // allow chaining - return *this; - } -}; - -/** - * End namespace - */ -} diff --git a/src/confirmed.cpp b/src/confirmed.cpp index 3908cbe..062e36e 100644 --- a/src/confirmed.cpp +++ b/src/confirmed.cpp @@ -181,7 +181,7 @@ void Confirmed::reportError(const char *message) * @param size size of the message * @param flags optional flags */ -DeferredConfirmedPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +DeferredPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // copy the current identifier, this will be the ID that will come back auto current = _current; @@ -190,7 +190,7 @@ DeferredConfirmedPublish &Confirmed::publish(const std::string &exchange, const bool failed = !Throttle::publish(exchange, routingKey, envelope, flags); // create the open - auto handler = std::make_shared(failed); + auto handler = std::make_shared(failed); // add it to the open handlers _handlers[current] = handler;