Add ACK and NACK callbacks to Channel

This commit is contained in:
Marcin Gibula 2017-06-08 22:50:45 +02:00
parent 30e652c669
commit f0712cfdc2
3 changed files with 80 additions and 0 deletions

View File

@ -48,6 +48,8 @@ using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using SizeCallback = std::function<void(uint32_t messagecount)>;
using ConsumeCallback = std::function<void(const std::string &consumer)>;
using CancelCallback = std::function<void(const std::string &consumer)>;
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
using NackCallback = std::function<void(uint64_t deliveryTag, bool multiple, bool requeue)>;
/**
* End namespace

View File

@ -145,6 +145,36 @@ public:
return _implementation->messageCounter();
}
/**
* Callback that is called when the broker confirmed message publication
*
* Only one callback can be registered. Calling this function multiple
* times will remove the old callback.
*
* For this callback to be called, the channel needs to be in confirm mode.
*
* @param callback the callback to execute
*/
void onAck(const AckCallback &callback)
{
_implementation->onAck(callback);
}
/**
* Callback that is called when the broker denied message publication
*
* Only one callback can be registered. Calling this function multiple
* times will remove the old callback.
*
* For this callback to be called, the channel needs to be in confirm mode.
*
* @param callback the callback to execute
*/
void onNack(const NackCallback &callback)
{
_implementation->onNack(callback);
}
/**
* Start a transaction
*

View File

@ -139,6 +139,18 @@ private:
*/
uint64_t _messageCounter = 0;
/**
* Callback when broker confirmed message publication
* @var SuccessCallback
*/
AckCallback _ackCallback;
/**
* Callback when broker denied message publication
* @var ErrorCallback
*/
NackCallback _nackCallback;
/**
* Attach the connection
* @param connection
@ -221,6 +233,26 @@ public:
*/
void onError(const ErrorCallback &callback);
/**
* Callback that is called when the broker confirmed message publication
* @param callback the callback to execute
*/
void onAck(const AckCallback &callback)
{
// store callback
_ackCallback = callback;
}
/**
* Callback that is called when the broker denied message publication
* @param callback the callback to execute
*/
void onNack(const NackCallback &callback)
{
// store callback
_nackCallback = callback;
}
/**
* Pause deliveries on a channel
*
@ -576,6 +608,22 @@ public:
*/
void onSynchronized();
/**
* Report to the handler that message has been published
*/
void reportAck(uint64_t deliveryTag, bool multiple)
{
if (_ackCallback) _ackCallback(deliveryTag, multiple);
}
/**
* Report to the handler that message has not been published
*/
void reportNack(uint64_t deliveryTag, bool multiple, bool requeue)
{
if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue);
}
/**
* Report to the handler that the channel is opened
*/