Add Channel::setConfirmMode() and Channel::messageCounter() methods

This commit is contained in:
Marcin Gibula 2017-06-08 22:19:55 +02:00
parent c7a12d22c1
commit 30e652c669
4 changed files with 69 additions and 0 deletions

View File

@ -123,6 +123,28 @@ public:
return _implementation->connected(); return _implementation->connected();
} }
/**
* Put channel in a confirm mode (RabbitMQ specific)
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &setConfirmMode()
{
return _implementation->setConfirmMode();
}
/**
* Return number of messages sent.
*
* This functions returns number of messages sent. It's incremented only
* when channel is in confirm mode.
*/
uint64_t messageCounter() const
{
return _implementation->messageCounter();
}
/** /**
* Start a transaction * Start a transaction
* *

View File

@ -133,6 +133,12 @@ private:
*/ */
std::shared_ptr<DeferredConsumerBase> _consumer; std::shared_ptr<DeferredConsumerBase> _consumer;
/**
* Number of messages sent. Used in confirm mode
* @var uint64_t
*/
uint64_t _messageCounter = 0;
/** /**
* Attach the connection * Attach the connection
* @param connection * @param connection
@ -244,6 +250,19 @@ public:
return _state == state_connected || _state == state_ready; return _state == state_connected || _state == state_ready;
} }
/**
* Put channel in a confirm mode (RabbitMQ specific)
*/
Deferred &setConfirmMode();
/**
* Return number of messages sent.
*/
uint64_t messageCounter() const
{
return _messageCounter;
}
/** /**
* Start a transaction * Start a transaction
*/ */
@ -696,6 +715,14 @@ public:
*/ */
void complete(); void complete();
/**
* Start message counter
*/
void startMessageCounter()
{
_messageCounter = 1;
}
/** /**
* The channel class is its friend, thus can it instantiate this object * The channel class is its friend, thus can it instantiate this object
*/ */

View File

@ -15,6 +15,7 @@
#include "channelflowframe.h" #include "channelflowframe.h"
#include "channelcloseokframe.h" #include "channelcloseokframe.h"
#include "channelcloseframe.h" #include "channelcloseframe.h"
#include "confirmselectframe.h"
#include "transactionselectframe.h" #include "transactionselectframe.h"
#include "transactioncommitframe.h" #include "transactioncommitframe.h"
#include "transactionrollbackframe.h" #include "transactionrollbackframe.h"
@ -182,6 +183,18 @@ Deferred &ChannelImpl::resume()
return push(ChannelFlowFrame(_id, true)); return push(ChannelFlowFrame(_id, true));
} }
/**
* Put channel in a confirm mode
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &ChannelImpl::setConfirmMode()
{
// send a transaction frame
return push(ConfirmSelectFrame(_id));
}
/** /**
* Start a transaction * Start a transaction
* *
@ -492,6 +505,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
bytesleft -= chunksize; bytesleft -= chunksize;
} }
// increment message counter if we're in confirm mode
if (_messageCounter)
_messageCounter++;
// done // done
return true; return true;
} }

View File

@ -74,6 +74,9 @@ public:
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// start message counter
channel->startMessageCounter();
// report that the channel is open // report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized(); if (channel->reportSuccess()) channel->onSynchronized();