onError handler can be set on throttle, and onError & onLost can also be set on messages sent via confirmed.

This commit is contained in:
Michael van der Werve 2020-10-12 12:02:04 +02:00
parent d542ba8e44
commit c58cb6748d
6 changed files with 131 additions and 1 deletions

View File

@ -90,6 +90,7 @@ using NackCallback = std::function<void(uint64_t deliveryTag, bool mu
*/ */
using PublishAckCallback = std::function<void()>; using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>; using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
/** /**
* End namespace * End namespace

View File

@ -45,6 +45,12 @@ private:
virtual void onAck(uint64_t deliveryTag, bool multiple) override; virtual void onAck(uint64_t deliveryTag, bool multiple) override;
virtual void onNack(uint64_t deliveryTag, bool multiple) override; virtual void onNack(uint64_t deliveryTag, bool multiple) override;
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message) override;
public: public:
/** /**
* Constructor * Constructor

View File

@ -35,6 +35,12 @@ private:
*/ */
PublishNackCallback _nackCallback; PublishNackCallback _nackCallback;
/**
* Callback to execute when message is lost (nack / error)
* @var LostCallback
*/
PublishLostCallback _lostCallback;
/** /**
* Report an ack, calls the callback. * Report an ack, calls the callback.
*/ */
@ -51,6 +57,25 @@ private:
{ {
// check if the callback is set // check if the callback is set
if (_nackCallback) _nackCallback(); if (_nackCallback) _nackCallback();
// message is 'lost'
if (_lostCallback) _lostCallback();
}
/**
* Indicate failure
* @param error Description of the error that occured
*/
void reportError(const char *error)
{
// from this moment on the object should be listed as failed
_failed = true;
// message is lost
if (_lostCallback) _lostCallback();
// execute callbacks if registered
if (_errorCallback) _errorCallback(error);
} }
/** /**
@ -99,6 +124,20 @@ public:
// allow chaining // allow chaining
return *this; return *this;
} }
/**
* Callback that is called when a message is lost, either through RabbitMQ
* rejecting it or because of a channel error
* @param callback the callback to execute
*/
DeferredConfirmedPublish &onLost(const PublishLostCallback &callback)
{
// store callback
_lostCallback = callback;
// allow chaining
return *this;
}
}; };
/** /**

View File

@ -80,6 +80,12 @@ protected:
*/ */
std::shared_ptr<Deferred> _close; std::shared_ptr<Deferred> _close;
/**
* Callback to call when an error occurred
* @var ErrorCallback
*/
ErrorCallback _errorCallback;
/** /**
* Send method for a frame * Send method for a frame
* @param id * @param id
@ -87,6 +93,12 @@ protected:
*/ */
bool send(uint64_t id, const Frame &frame); bool send(uint64_t id, const Frame &frame);
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message);
protected: protected:
/** /**
* Called when the deliverytag(s) are acked/nacked * Called when the deliverytag(s) are acked/nacked
@ -98,7 +110,10 @@ protected:
public: public:
/** /**
* Constructor * Constructor. Warning: this takes control of the channel, there should be no extra
* handlers set on the channel (onError) and no further publishes should be done on the
* raw channel either. Doing this will cause the throttle to work incorrectly, as the
* counters are not properly updated.
* @param channel * @param channel
* @param throttle * @param throttle
*/ */
@ -177,6 +192,12 @@ public:
* @return Deferred& * @return Deferred&
*/ */
Deferred &close(); Deferred &close();
/**
* Install an error callback
* @param callback
*/
void onError(const ErrorCallback &callback);
}; };
/** /**

View File

@ -131,6 +131,37 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
Throttle::onNack(deliveryTag, multiple); Throttle::onNack(deliveryTag, multiple);
} }
/**
* Method that is called to report an error
* @param message
*/
void Confirmed::reportError(const char *message)
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// move the handlers out
auto handlers = std::move(_handlers);
// iterate over all the messages
// call the handlers
for (const auto &iter : handlers)
{
// call the handler
iter.second->reportError(message);
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// if the monitor is no longer valid, leap out
if (!monitor) return;
// call base class to let it handle the errors
Throttle::reportError(message);
}
/** /**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ. * Delays actual publishing depending on the publisher confirms sent by RabbitMQ.

View File

@ -35,6 +35,9 @@ Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel.
// we might have failed, in which case we throw // we might have failed, in which case we throw
if (!deferred) throw std::runtime_error("could not enable publisher confirms"); if (!deferred) throw std::runtime_error("could not enable publisher confirms");
// we wrap a handling error callback that calls our member function
_implementation->onError([this](const char *message) { reportError(message); });
} }
/** /**
@ -89,6 +92,16 @@ bool Throttle::send(uint64_t id, const Frame &frame)
return _implementation->send(frame); return _implementation->send(frame);
} }
/**
* Method that is called to report an error
* @param message
*/
void Throttle::reportError(const char *message)
{
// if a callback is set, call the handler with the message
if (_errorCallback) _errorCallback(message);
}
/** /**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ. * Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
@ -212,6 +225,25 @@ Deferred &Throttle::close()
return *_close; return *_close;
} }
/**
* Install an error callback
* @param callback
*/
void Throttle::onError(const ErrorCallback &callback)
{
// we store the callback
_errorCallback = callback;
// check the callback
if (!callback) return;
// if the channel is no longer usable, report that
if (!_implementation->usable()) return callback("Channel is no longer usable");
// specify that we're already closing
if (_close) callback("Wrapped channel is closing down");
}
/** /**
* End of namespaces * End of namespaces
*/ */