diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index 898bdf0..18f9f07 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -133,41 +133,11 @@ public: * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ - Deferred &confirmSelect() + DeferredConfirm &confirmSelect() { return _implementation->confirmSelect(); } - /** - * Callback that is called when the broker confirmed message publication - * - * Only one callback can be registered. Calling this function multiple - * times will remove the old callback. - * - * For this callback to be called, the channel needs to be in confirm mode. - * - * @param callback the callback to execute - */ - void onAck(const AckCallback &callback) - { - _implementation->onAck(callback); - } - - /** - * Callback that is called when the broker denied message publication - * - * Only one callback can be registered. Calling this function multiple - * times will remove the old callback. - * - * For this callback to be called, the channel needs to be in confirm mode. - * - * @param callback the callback to execute - */ - void onNack(const NackCallback &callback) - { - _implementation->onNack(callback); - } - /** * Start a transaction * diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index f1f4534..d3ef475 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -42,6 +42,7 @@ class ConsumedMessage; class ConnectionImpl; class DeferredDelete; class DeferredCancel; +class DeferredConfirm; class DeferredQueue; class DeferredGet; class DeferredPublisher; @@ -80,6 +81,12 @@ private: */ std::shared_ptr _publisher; + /** + * Handler that deals with publisher confirms frames + * @var std::shared_ptr + */ + std::shared_ptr _confirm; + /** * Handlers for all consumers that are active * @var std::map @@ -141,18 +148,6 @@ private: */ std::shared_ptr _receiver; - /** - * Callback when broker confirmed message publication - * @var SuccessCallback - */ - AckCallback _ackCallback; - - /** - * Callback when broker denied message publication - * @var ErrorCallback - */ - NackCallback _nackCallback; - /** * Attach the connection * @param connection @@ -235,26 +230,6 @@ public: */ void onError(const ErrorCallback &callback); - /** - * Callback that is called when the broker confirmed message publication - * @param callback the callback to execute - */ - void onAck(const AckCallback &callback) - { - // store callback - _ackCallback = callback; - } - - /** - * Callback that is called when the broker denied message publication - * @param callback the callback to execute - */ - void onNack(const NackCallback &callback) - { - // store callback - _nackCallback = callback; - } - /** * Pause deliveries on a channel * @@ -287,7 +262,7 @@ public: /** * Put channel in a confirm mode (RabbitMQ specific) */ - Deferred &confirmSelect(); + DeferredConfirm &confirmSelect(); /** * Start a transaction @@ -603,22 +578,6 @@ public: */ void onSynchronized(); - /** - * Report to the handler that message has been published - */ - void reportAck(uint64_t deliveryTag, bool multiple) - { - if (_ackCallback) _ackCallback(deliveryTag, multiple); - } - - /** - * Report to the handler that message has not been published - */ - void reportNack(uint64_t deliveryTag, bool multiple, bool requeue) - { - if (_nackCallback) _nackCallback(deliveryTag, multiple, requeue); - } - /** * Report to the handler that the channel is opened */ @@ -768,6 +727,12 @@ public: */ DeferredPublisher *publisher() const { return _publisher.get(); } + /** + * Retrieve the deferred confirm that handles publisher confirms + * @return The deferred confirm object + */ + DeferredConfirm *confirm() const { return _confirm.get(); } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/src/basicackframe.h b/src/basicackframe.h index 9d10e13..514d693 100644 --- a/src/basicackframe.h +++ b/src/basicackframe.h @@ -124,8 +124,14 @@ public: // channel does not exist if(!channel) return false; - // start message counter - channel->reportAck(deliveryTag(), multiple()); + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); // done return true; diff --git a/src/basicnackframe.h b/src/basicnackframe.h index f6e5fc1..589fb15 100644 --- a/src/basicnackframe.h +++ b/src/basicnackframe.h @@ -122,8 +122,14 @@ public: // channel does not exist if(!channel) return false; - // start message counter - channel->reportNack(deliveryTag(), multiple(), requeue()); + // get the current confirm + auto confirm = channel->confirm(); + + // if there is no deferred confirm, we can just as well stop + if (confirm == nullptr) return false; + + // process the frame + confirm->process(*this); // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index ecdef51..7e79149 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -187,10 +187,19 @@ Deferred &ChannelImpl::resume() * This function returns a deferred handler. Callbacks can be installed * using onSuccess(), onError() and onFinalize() methods. */ -Deferred &ChannelImpl::confirmSelect() +DeferredConfirm &ChannelImpl::confirmSelect() { - // send a transaction frame - return push(ConfirmSelectFrame(_id)); + // the frame to send + ConfirmSelectFrame frame(_id); + + // send the frame, and create deferred object + auto deferred = std::make_shared(!send(frame)); + + // push to list + push(deferred); + + // done + return *deferred; } /**