From bf1caa1eae45cbffcc75908690514adb8f18b1ce Mon Sep 17 00:00:00 2001 From: Marcin Gibula Date: Mon, 14 May 2018 21:05:49 +0200 Subject: [PATCH] Add DeferredConfirm class --- include/amqpcpp/deferredconfirm.h | 93 +++++++++++++++++++++++++++++++ src/deferredconfirm.cpp | 40 +++++++++++++ src/includes.h | 1 + 3 files changed, 134 insertions(+) create mode 100644 include/amqpcpp/deferredconfirm.h create mode 100644 src/deferredconfirm.cpp diff --git a/include/amqpcpp/deferredconfirm.h b/include/amqpcpp/deferredconfirm.h new file mode 100644 index 0000000..724e7f8 --- /dev/null +++ b/include/amqpcpp/deferredconfirm.h @@ -0,0 +1,93 @@ +/** + * DeferredDelete.h + * + * Deferred callback for RabbitMQ-specific publisher confirms mechanism. + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * We extend from the default deferred and add extra functionality + */ +class DeferredConfirm : public Deferred +{ +private: + /** + * Callback to execute when server confirms that message is processed + * @var AckCallback + */ + AckCallback _ackCallback; + + /** + * Callback to execute when server sends negative acknowledgement + * @var NackCallback + */ + NackCallback _nackCallback; + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicAckFrame &frame); + + + /** + * Process an ACK frame + * + * @param frame The frame to process + */ + void process(BasicNackFrame &frame); + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class BasicAckFrame; + friend class BasicNackFrame; + +public: + /** + * Protected constructor that can only be called + * from within the channel implementation + * + * Note: this constructor _should_ be protected, but because make_shared + * will then not work, we have decided to make it public after all, + * because the work-around would result in not-so-easy-to-read code. + * + * @param boolean are we already failed? + */ + DeferredConfirm(bool failed = false) : Deferred(failed) {} + +public: + /** + * Register the function that is called when channel is put in publisher + * confirmed mode + * @param callback + */ + DeferredConfirm &onSuccess(const SuccessCallback &callback) + { + // call base + Deferred::onSuccess(callback); + + // allow chaining + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/src/deferredconfirm.cpp b/src/deferredconfirm.cpp new file mode 100644 index 0000000..4a473f5 --- /dev/null +++ b/src/deferredconfirm.cpp @@ -0,0 +1,40 @@ +/** + * DeferredConfirm.cpp + * + * Implementation file for the DeferredConfirm class + * + * @author Marcin Gibula + * @copyright 2018 Copernica BV + */ +#include "includes.h" + +/** + * Namespace + */ +namespace AMQP { + +/** + * Process an ACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicAckFrame &frame) +{ + if (_ackCallback) _ackCallback(frame.deliveryTag(), frame.multiple()); +} + +/** + * Process a NACK frame + * + * @param frame The frame to process + */ +void DeferredConfirm::process(BasicNackFrame &frame) +{ + if (_nackCallback) _nackCallback(frame.deliveryTag(), frame.multiple(), frame.requeue()); +} + +/** + * End namespace + */ +} + diff --git a/src/includes.h b/src/includes.h index 795d501..2cc31bd 100644 --- a/src/includes.h +++ b/src/includes.h @@ -72,6 +72,7 @@ #include "amqpcpp/deferredqueue.h" #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" +#include "amqpcpp/deferredconfirm.h" #include "amqpcpp/deferredget.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h"