Merge pull request #370 from CopernicaMarketingSoftware/onerror-throttle

onError and onLost for Throttle and Confirmed.
This commit is contained in:
Emiel Bruijntjes 2020-10-12 13:23:41 +02:00 committed by GitHub
commit eb732aa162
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 131 additions and 1 deletions

View File

@ -90,6 +90,7 @@ using NackCallback = std::function<void(uint64_t deliveryTag, bool mu
*/
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
/**
* End namespace

View File

@ -45,6 +45,12 @@ private:
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message) override;
public:
/**
* Constructor

View File

@ -35,6 +35,12 @@ private:
*/
PublishNackCallback _nackCallback;
/**
* Callback to execute when message is lost (nack / error)
* @var LostCallback
*/
PublishLostCallback _lostCallback;
/**
* Report an ack, calls the callback.
*/
@ -51,6 +57,25 @@ private:
{
// check if the callback is set
if (_nackCallback) _nackCallback();
// message is 'lost'
if (_lostCallback) _lostCallback();
}
/**
* Indicate failure
* @param error Description of the error that occured
*/
void reportError(const char *error)
{
// from this moment on the object should be listed as failed
_failed = true;
// message is lost
if (_lostCallback) _lostCallback();
// execute callbacks if registered
if (_errorCallback) _errorCallback(error);
}
/**
@ -99,6 +124,20 @@ public:
// allow chaining
return *this;
}
/**
* Callback that is called when a message is lost, either through RabbitMQ
* rejecting it or because of a channel error
* @param callback the callback to execute
*/
DeferredConfirmedPublish &onLost(const PublishLostCallback &callback)
{
// store callback
_lostCallback = callback;
// allow chaining
return *this;
}
};
/**

View File

@ -80,6 +80,12 @@ protected:
*/
std::shared_ptr<Deferred> _close;
/**
* Callback to call when an error occurred
* @var ErrorCallback
*/
ErrorCallback _errorCallback;
/**
* Send method for a frame
* @param id
@ -87,6 +93,12 @@ protected:
*/
bool send(uint64_t id, const Frame &frame);
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message);
protected:
/**
* Called when the deliverytag(s) are acked/nacked
@ -98,7 +110,10 @@ protected:
public:
/**
* Constructor
* Constructor. Warning: this takes control of the channel, there should be no extra
* handlers set on the channel (onError) and no further publishes should be done on the
* raw channel either. Doing this will cause the throttle to work incorrectly, as the
* counters are not properly updated.
* @param channel
* @param throttle
*/
@ -177,6 +192,12 @@ public:
* @return Deferred&
*/
Deferred &close();
/**
* Install an error callback
* @param callback
*/
void onError(const ErrorCallback &callback);
};
/**

View File

@ -131,6 +131,37 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
Throttle::onNack(deliveryTag, multiple);
}
/**
* Method that is called to report an error
* @param message
*/
void Confirmed::reportError(const char *message)
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// move the handlers out
auto handlers = std::move(_handlers);
// iterate over all the messages
// call the handlers
for (const auto &iter : handlers)
{
// call the handler
iter.second->reportError(message);
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// if the monitor is no longer valid, leap out
if (!monitor) return;
// call base class to let it handle the errors
Throttle::reportError(message);
}
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.

View File

@ -35,6 +35,9 @@ Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel.
// we might have failed, in which case we throw
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
// we wrap a handling error callback that calls our member function
_implementation->onError([this](const char *message) { reportError(message); });
}
/**
@ -89,6 +92,16 @@ bool Throttle::send(uint64_t id, const Frame &frame)
return _implementation->send(frame);
}
/**
* Method that is called to report an error
* @param message
*/
void Throttle::reportError(const char *message)
{
// if a callback is set, call the handler with the message
if (_errorCallback) _errorCallback(message);
}
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
@ -212,6 +225,25 @@ Deferred &Throttle::close()
return *_close;
}
/**
* Install an error callback
* @param callback
*/
void Throttle::onError(const ErrorCallback &callback)
{
// we store the callback
_errorCallback = callback;
// check the callback
if (!callback) return;
// if the channel is no longer usable, report that
if (!_implementation->usable()) return callback("Channel is no longer usable");
// specify that we're already closing
if (_close) callback("Wrapped channel is closing down");
}
/**
* End of namespaces
*/