diff --git a/include/channel.h b/include/channel.h index af01f02..37baadb 100644 --- a/include/channel.h +++ b/include/channel.h @@ -123,6 +123,28 @@ public: return _implementation->connected(); } + /** + * Put channel in a confirm mode (RabbitMQ specific) + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ + Deferred &setConfirmMode() + { + return _implementation->setConfirmMode(); + } + + /** + * Return number of messages sent. + * + * This functions returns number of messages sent. It's incremented only + * when channel is in confirm mode. + */ + uint64_t messageCounter() const + { + return _implementation->messageCounter(); + } + /** * Start a transaction * diff --git a/include/channelimpl.h b/include/channelimpl.h index 7ddf1e3..c95b1f1 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -133,6 +133,12 @@ private: */ std::shared_ptr _consumer; + /** + * Number of messages sent. Used in confirm mode + * @var uint64_t + */ + uint64_t _messageCounter = 0; + /** * Attach the connection * @param connection @@ -244,6 +250,19 @@ public: return _state == state_connected || _state == state_ready; } + /** + * Put channel in a confirm mode (RabbitMQ specific) + */ + Deferred &setConfirmMode(); + + /** + * Return number of messages sent. + */ + uint64_t messageCounter() const + { + return _messageCounter; + } + /** * Start a transaction */ @@ -696,6 +715,14 @@ public: */ void complete(); + /** + * Start message counter + */ + void startMessageCounter() + { + _messageCounter = 1; + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 1ddf54f..dde08af 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -15,6 +15,7 @@ #include "channelflowframe.h" #include "channelcloseokframe.h" #include "channelcloseframe.h" +#include "confirmselectframe.h" #include "transactionselectframe.h" #include "transactioncommitframe.h" #include "transactionrollbackframe.h" @@ -182,6 +183,18 @@ Deferred &ChannelImpl::resume() return push(ChannelFlowFrame(_id, true)); } +/** + * Put channel in a confirm mode + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ +Deferred &ChannelImpl::setConfirmMode() +{ + // send a transaction frame + return push(ConfirmSelectFrame(_id)); +} + /** * Start a transaction * @@ -492,6 +505,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin bytesleft -= chunksize; } + // increment message counter if we're in confirm mode + if (_messageCounter) + _messageCounter++; + // done return true; } diff --git a/src/confirmselectokframe.h b/src/confirmselectokframe.h index 4a20992..83ee8b8 100644 --- a/src/confirmselectokframe.h +++ b/src/confirmselectokframe.h @@ -74,6 +74,9 @@ public: // channel does not exist if(!channel) return false; + // start message counter + channel->startMessageCounter(); + // report that the channel is open if (channel->reportSuccess()) channel->onSynchronized();