From f0712cfdc25b76403fa712aa19162aced2499b16 Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Thu, 8 Jun 2017 22:50:45 +0200 Subject: [PATCH] Add ACK and NACK callbacks to Channel --- include/callbacks.h | 2 ++ include/channel.h | 30 +++++++++++++++++++++++++++ include/channelimpl.h | 48 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) diff --git a/include/callbacks.h b/include/callbacks.h index a7c85b9..8037a88 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -48,6 +48,8 @@ using DeleteCallback = std::function; using SizeCallback = std::function; using ConsumeCallback = std::function; using CancelCallback = std::function; +using AckCallback = std::function; +using NackCallback = std::function; /** * End namespace diff --git a/include/channel.h b/include/channel.h index 37baadb..bb51b8e 100644 --- a/include/channel.h +++ b/include/channel.h @@ -145,6 +145,36 @@ public: return _implementation->messageCounter(); } + /** + * Callback that is called when the broker confirmed message publication + * + * Only one callback can be registered. Calling this function multiple + * times will remove the old callback. + * + * For this callback to be called, the channel needs to be in confirm mode. + * + * @param callback the callback to execute + */ + void onAck(const AckCallback &callback) + { + _implementation->onAck(callback); + } + + /** + * Callback that is called when the broker denied message publication + * + * Only one callback can be registered. Calling this function multiple + * times will remove the old callback. + * + * For this callback to be called, the channel needs to be in confirm mode. + * + * @param callback the callback to execute + */ + void onNack(const NackCallback &callback) + { + _implementation->onNack(callback); + } + /** * Start a transaction * diff --git a/include/channelimpl.h b/include/channelimpl.h index c95b1f1..9267bc0 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -139,6 +139,18 @@ private: */ uint64_t _messageCounter = 0; + /** + * Callback when broker confirmed message publication + * @var SuccessCallback + */ + AckCallback _ackCallback; + + /** + * Callback when broker denied message publication + * @var ErrorCallback + */ + NackCallback _nackCallback; + /** * Attach the connection * @param connection @@ -221,6 +233,26 @@ public: */ void onError(const ErrorCallback &callback); + /** + * Callback that is called when the broker confirmed message publication + * @param callback the callback to execute + */ + void onAck(const AckCallback &callback) + { + // store callback + _ackCallback = callback; + } + + /** + * Callback that is called when the broker denied message publication + * @param callback the callback to execute + */ + void onNack(const NackCallback &callback) + { + // store callback + _nackCallback = callback; + } + /** * Pause deliveries on a channel * @@ -576,6 +608,22 @@ public: */ void onSynchronized(); + /** + * Report to the handler that message has been published + */ + void reportAck(uint64_t deliveryTag, bool multiple) + { + if (_ackCallback) _ackCallback(deliveryTag, multiple); + } + + /** + * Report to the handler that message has not been published + */ + void reportNack(uint64_t deliveryTag, bool multiple, bool requeue) + { + if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue); + } + /** * Report to the handler that the channel is opened */