Breaking change: the channel.publish() method no longer returns a DeferredPublisher-object, because it was not logical that you had to reinstall the callbacks for bounce-methods after each individual publish() call, not was it logical that the callbacks were not always logically linked to the publish() operation that failed (they were always sent to the handler of the most recent publish() instead of to the handler of the failed publish()). At the same time we added a channel.recall() method that can be used to notify the library that user-space is prepared to recall/take-back/accept bounced messages

This commit is contained in:
Emiel Bruijntjes 2020-10-19 08:52:50 +02:00
parent a7f2991b35
commit 31f2d78778
10 changed files with 83 additions and 72 deletions

View File

@ -73,7 +73,7 @@
#include "amqpcpp/deferredcancel.h"
#include "amqpcpp/deferredconfirm.h"
#include "amqpcpp/deferredget.h"
#include "amqpcpp/deferredpublisher.h"
#include "amqpcpp/deferredrecall.h"
#include "amqpcpp/channelimpl.h"
#include "amqpcpp/channel.h"
#include "amqpcpp/throttle.h"

View File

@ -384,19 +384,10 @@ public:
* specify what should happen if the message could not be routed to a queue. By default,
* unroutable message are silently discarded.
*
* This method returns a reference to a DeferredPublisher object. You can use this returned
* object to install callbacks that are called when an undeliverable message is returned, or
* to set the callback that is called when the server confirms that the message was received.
*
* To enable handling returned messages, or to enable publisher-confirms, you must not only
* set the callback, but also pass in appropriate flags to enable this feature. If you do not
* pass in these flags, your callbacks will not be called. If you are not at all interested
* in returned messages or publish-confirms, you can ignore the flag and the returned object.
*
* Watch out: the channel returns _the same_ DeferredPublisher object for all calls to the
* publish() method. This means that the callbacks that you install for the first published
* message are also used for subsequent messages _and_ it means that if you install a different
* callback for a later publish operation, it overwrites your earlier callbacks
* If you set the 'mandatory' and/or 'immediate' flag, messages that could not be handled
* are returned to the application. Make sure that you have called the recall()-method and
* have set up all appropriate handlers to process these returned messages before you start
* publishing.
*
* The following flags can be supplied:
*
@ -410,10 +401,10 @@ public:
* @param size size of the message
* @param flags optional flags
*/
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/**
* Set the Quality of Service (QOS) for this channel
@ -472,6 +463,18 @@ public:
DeferredConsumer &consume(const std::string &queue, int flags = 0) { return _implementation->consume(queue, std::string(), flags, Table()); }
DeferredConsumer &consume(const std::string &queue, const Table &arguments) { return _implementation->consume(queue, std::string(), 0, arguments); }
/**
* Tell the messages that you are ready to recall/take back messages that messages thar are unroutable.
*
* When you use the publish() method in combination with the 'immediate' or 'mandatory' flag, rabbitmq
* sends back unroutable messages. With this recall() method you can install a sort of pseudo-consumer
* that defines how such returned-messages are processed.
*
* Watch out: when you call this method more than once, you always get access to the same object. You
* can thus not install multiple callbacks for the same event.
*/
DeferredRecall &recall() { return _implementation->recall(); }
/**
* Cancel a running consume call
*

View File

@ -45,7 +45,7 @@ class DeferredCancel;
class DeferredConfirm;
class DeferredQueue;
class DeferredGet;
class DeferredPublisher;
class DeferredRecall;
class Connection;
class Envelope;
class Table;
@ -77,13 +77,13 @@ private:
/**
* Handler that deals with incoming messages as a result of publish operations
* @var std::shared_ptr<DeferredPublisher>
* @var DeferredRecall
*/
std::shared_ptr<DeferredPublisher> _publisher;
std::shared_ptr<DeferredRecall> _recall;
/**
* Handler that deals with publisher confirms frames
* @var std::shared_ptr<DeferredConfirm>
* Handler that deals with publisher confirms frames
* @var std::shared_ptr<DeferredConfirm>
*/
std::shared_ptr<DeferredConfirm> _confirm;
@ -433,9 +433,9 @@ public:
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* @return DeferredPublisher
* @return bool
*/
DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags);
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags);
/**
* Set the Quality of Service (QOS) of the entire connection
@ -471,6 +471,17 @@ public:
*/
DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
/**
* Tell that you are prepared to recall/take back messages that could not be
* published. This is only meaningful if you pass the 'immediate' or 'mandatory'
* flag to publish() operations.
*
* THis function returns a deferred handler more or less similar to the object
* return by the consume() method and that can be used to install callbacks that
* handle the recalled messages.
*/
DeferredRecall &recall();
/**
* Cancel a running consumer
* @param tag the consumer tag
@ -750,10 +761,10 @@ public:
DeferredReceiver *receiver() const { return _receiver.get(); }
/**
* Retrieve the deferred publisher that handles returned messages
* @return The deferred publisher object
* Retrieve the recalls-object that handles bounces
* @return The deferred recall object
*/
DeferredPublisher *publisher() const { return _publisher.get(); }
DeferredRecall *recalls() const { return _recall.get(); }
/**
* Retrieve the deferred confirm that handles publisher confirms

View File

@ -4,11 +4,11 @@
* Extended receiver that _wants_ to receive message (because it is
* consuming or get'ting messages. This is the base class for both
* the DeferredConsumer as well as the DeferredGet classes, but not
* the base of the DeferredPublisher (which can also receive returned
* the base of the DeferredRecall (which can also receive returned
* messages, but not as a result of an explicit request)
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
* @copyright 2018 - 2020 Copernica BV
*/
/**

View File

@ -1,12 +1,11 @@
/**
* DeferredPublisher.h
* DeferredRecall.h
*
* Class that is returned when channel::publish() is called, and that
* can be used to install callback methods that define how returned
* messages should be handled.
* Class that an be used to install callback methods that define how
* returned messages should be handled.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
* @copyright 2018 - 2020 Copernica BV
*/
/**
@ -27,7 +26,7 @@ class ChannelImpl;
/**
* Class definition
*/
class DeferredPublisher : public DeferredReceiver, public std::enable_shared_from_this<DeferredPublisher>
class DeferredRecall : public DeferredReceiver, public std::enable_shared_from_this<DeferredRecall>
{
private:
/**
@ -94,7 +93,7 @@ public:
* @param channel the channel implementation
* @param failed are we already failed?
*/
DeferredPublisher(ChannelImpl *channel, bool failed = false) :
DeferredRecall(ChannelImpl *channel, bool failed = false) :
DeferredReceiver(failed, channel) {}
public:
@ -102,7 +101,7 @@ public:
* Register a function to be called when a full message is returned
* @param callback the callback to execute
*/
DeferredPublisher &onReceived(const BounceCallback &callback)
DeferredRecall &onReceived(const BounceCallback &callback)
{
// store callback
_bounceCallback = callback;
@ -115,7 +114,7 @@ public:
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredPublisher &onMessage(const BounceCallback &callback)
DeferredRecall &onMessage(const BounceCallback &callback)
{
// store callback
_bounceCallback = callback;
@ -128,7 +127,7 @@ public:
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredPublisher &onReturned(const BounceCallback &callback)
DeferredRecall &onReturned(const BounceCallback &callback)
{
// store callback
_bounceCallback = callback;
@ -141,7 +140,7 @@ public:
* Alias for onReceived() (see above)
* @param callback the callback to execute
*/
DeferredPublisher &onBounced(const BounceCallback &callback)
DeferredRecall &onBounced(const BounceCallback &callback)
{
// store callback
_bounceCallback = callback;
@ -157,7 +156,7 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredPublisher &onBegin(const ReturnCallback &callback)
DeferredRecall &onBegin(const ReturnCallback &callback)
{
// store callback
_beginCallback = callback;
@ -172,7 +171,7 @@ public:
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredPublisher &onSize(const SizeCallback &callback)
DeferredRecall &onSize(const SizeCallback &callback)
{
// store callback
_sizeCallback = callback;
@ -187,7 +186,7 @@ public:
* @param callback The callback to invoke for message headers
* @return Same object for chaining
*/
DeferredPublisher &onHeaders(const HeaderCallback &callback)
DeferredRecall &onHeaders(const HeaderCallback &callback)
{
// store callback
_headerCallback = callback;
@ -209,7 +208,7 @@ public:
* @param callback The callback to invoke for chunks of message data
* @return Same object for chaining
*/
DeferredPublisher &onData(const DataCallback &callback)
DeferredRecall &onData(const DataCallback &callback)
{
// store callback
_dataCallback = callback;
@ -224,7 +223,7 @@ public:
* @param callback The callback to invoke
* @return Same object for chaining
*/
DeferredPublisher &onComplete(const ReturnedCallback &callback)
DeferredRecall &onComplete(const ReturnedCallback &callback)
{
// store callback
_completeCallback = callback;

View File

@ -49,7 +49,7 @@ add_sources(
deferredconsumer.cpp
deferredreceiver.cpp
deferredextreceiver.cpp
deferredpublisher.cpp
deferredrecall.cpp
deferredget.cpp
exchangebindframe.h
exchangebindokframe.h

View File

@ -1,7 +1,7 @@
/**
* Class describing a basic return frame
*
* @copyright 2014 - 2018 Copernica BV
* @copyright 2014 - 2020 Copernica BV
*/
/**
@ -161,14 +161,14 @@ public:
// channel does not exist
if (!channel) return false;
// get the current publisher
auto publisher = channel->publisher();
// get the object that handles bounces
auto recalls = channel->recalls();
// if there is no deferred publisher, we can just as well stop
if (publisher == nullptr) return false;
if (recalls == nullptr) return false;
// initialize the object, because we're about to receive a message
publisher->process(*this);
recalls->process(*this);
// done
return true;

View File

@ -484,9 +484,9 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
* @param message the message to send
* @param size size of the message
* @param flags
* @return DeferredPublisher
* @return bool
*/
DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{
// we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that
@ -494,22 +494,20 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::
// @todo do not copy the entire buffer to individual frames
// @todo this seems utterly (conceptually) broken
// make sure we have a deferred object to return
if (!_publisher) _publisher.reset(new DeferredPublisher(this));
// send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return *_publisher;
if (!send(BasicPublishFrame(_id, exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
// channel still valid?
if (!monitor.valid()) return *_publisher;
if (!monitor.valid()) return false;
// send header
if (!send(BasicHeaderFrame(_id, envelope))) return *_publisher;
if (!send(BasicHeaderFrame(_id, envelope))) return false;
// if everything has been sent by now
if (envelope.bodySize() == 0) return true;
// channel and connection still valid?
if (!monitor.valid() || !_connection) return *_publisher;
if (!monitor.valid() || !_connection) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->maxPayload();
@ -526,10 +524,10 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
// send out a body frame
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return *_publisher;
if (!send(BodyFrame(_id, data + bytessent, (uint32_t)chunksize))) return false;
// channel still valid?
if (!monitor.valid()) return *_publisher;
if (!monitor.valid()) return false;
// update counters
bytessent += chunksize;
@ -537,7 +535,7 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::
}
// done
return *_publisher;
return true;
}
/**

View File

@ -1,10 +1,10 @@
/**
* DeferredPublisher.cpp
* DeferredRecall.cpp
*
* Implementation file for the DeferredPublisher class
* Implementation file for the DeferredRecall class
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2018 Copernica BV
* @copyright 2018 - 2020 Copernica BV
*/
#include "includes.h"
#include "basicreturnframe.h"
@ -19,7 +19,7 @@ namespace AMQP {
*
* @param frame The frame to process
*/
void DeferredPublisher::process(BasicReturnFrame &frame)
void DeferredRecall::process(BasicReturnFrame &frame)
{
// this object will handle all future frames with header and body data
_channel->install(shared_from_this());
@ -41,7 +41,7 @@ void DeferredPublisher::process(BasicReturnFrame &frame)
/**
* Indicate that a message was done
*/
void DeferredPublisher::complete()
void DeferredRecall::complete()
{
// also monitor the channel
Monitor monitor(_channel);

View File

@ -69,7 +69,7 @@
#include "amqpcpp/callbacks.h"
#include "amqpcpp/deferred.h"
#include "amqpcpp/deferredconsumer.h"
#include "amqpcpp/deferredpublisher.h"
#include "amqpcpp/deferredrecall.h"
#include "amqpcpp/deferredqueue.h"
#include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h"