Replace old API with DeferredConfirm class

This commit is contained in:
Marcin Gibula 2018-05-14 21:12:34 +02:00
parent bf1caa1eae
commit 2aa55b83c7
5 changed files with 43 additions and 87 deletions

View File

@ -133,41 +133,11 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &confirmSelect()
DeferredConfirm &confirmSelect()
{
return _implementation->confirmSelect();
}
/**
* Callback that is called when the broker confirmed message publication
*
* Only one callback can be registered. Calling this function multiple
* times will remove the old callback.
*
* For this callback to be called, the channel needs to be in confirm mode.
*
* @param callback the callback to execute
*/
void onAck(const AckCallback &callback)
{
_implementation->onAck(callback);
}
/**
* Callback that is called when the broker denied message publication
*
* Only one callback can be registered. Calling this function multiple
* times will remove the old callback.
*
* For this callback to be called, the channel needs to be in confirm mode.
*
* @param callback the callback to execute
*/
void onNack(const NackCallback &callback)
{
_implementation->onNack(callback);
}
/**
* Start a transaction
*

View File

@ -42,6 +42,7 @@ class ConsumedMessage;
class ConnectionImpl;
class DeferredDelete;
class DeferredCancel;
class DeferredConfirm;
class DeferredQueue;
class DeferredGet;
class DeferredPublisher;
@ -80,6 +81,12 @@ private:
*/
std::shared_ptr<DeferredPublisher> _publisher;
/**
* Handler that deals with publisher confirms frames
* @var std::shared_ptr<DeferredConfirm>
*/
std::shared_ptr<DeferredConfirm> _confirm;
/**
* Handlers for all consumers that are active
* @var std::map<std::string,std::shared_ptr<DeferredConsumer>
@ -141,18 +148,6 @@ private:
*/
std::shared_ptr<DeferredReceiver> _receiver;
/**
* Callback when broker confirmed message publication
* @var SuccessCallback
*/
AckCallback _ackCallback;
/**
* Callback when broker denied message publication
* @var ErrorCallback
*/
NackCallback _nackCallback;
/**
* Attach the connection
* @param connection
@ -235,26 +230,6 @@ public:
*/
void onError(const ErrorCallback &callback);
/**
* Callback that is called when the broker confirmed message publication
* @param callback the callback to execute
*/
void onAck(const AckCallback &callback)
{
// store callback
_ackCallback = callback;
}
/**
* Callback that is called when the broker denied message publication
* @param callback the callback to execute
*/
void onNack(const NackCallback &callback)
{
// store callback
_nackCallback = callback;
}
/**
* Pause deliveries on a channel
*
@ -287,7 +262,7 @@ public:
/**
* Put channel in a confirm mode (RabbitMQ specific)
*/
Deferred &confirmSelect();
DeferredConfirm &confirmSelect();
/**
* Start a transaction
@ -603,22 +578,6 @@ public:
*/
void onSynchronized();
/**
* Report to the handler that message has been published
*/
void reportAck(uint64_t deliveryTag, bool multiple)
{
if (_ackCallback) _ackCallback(deliveryTag, multiple);
}
/**
* Report to the handler that message has not been published
*/
void reportNack(uint64_t deliveryTag, bool multiple, bool requeue)
{
if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue);
}
/**
* Report to the handler that the channel is opened
*/
@ -768,6 +727,12 @@ public:
*/
DeferredPublisher *publisher() const { return _publisher.get(); }
/**
* Retrieve the deferred confirm that handles publisher confirms
* @return The deferred confirm object
*/
DeferredConfirm *confirm() const { return _confirm.get(); }
/**
* The channel class is its friend, thus can it instantiate this object
*/

View File

@ -124,8 +124,14 @@ public:
// channel does not exist
if(!channel) return false;
// start message counter
channel->reportAck(deliveryTag(), multiple());
// get the current confirm
auto confirm = channel->confirm();
// if there is no deferred confirm, we can just as well stop
if (confirm == nullptr) return false;
// process the frame
confirm->process(*this);
// done
return true;

View File

@ -122,8 +122,14 @@ public:
// channel does not exist
if(!channel) return false;
// start message counter
channel->reportNack(deliveryTag(), multiple(), requeue());
// get the current confirm
auto confirm = channel->confirm();
// if there is no deferred confirm, we can just as well stop
if (confirm == nullptr) return false;
// process the frame
confirm->process(*this);
// done
return true;

View File

@ -187,10 +187,19 @@ Deferred &ChannelImpl::resume()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::confirmSelect()
DeferredConfirm &ChannelImpl::confirmSelect()
{
// send a transaction frame
return push(ConfirmSelectFrame(_id));
// the frame to send
ConfirmSelectFrame frame(_id);
// send the frame, and create deferred object
auto deferred = std::make_shared<DeferredConfirm>(!send(frame));
// push to list
push(deferred);
// done
return *deferred;
}
/**