Added onSize() method to DeferredGet
This commit is contained in:
parent
c7b3f71f14
commit
ebcdd71848
|
|
@ -24,6 +24,7 @@ using EmptyCallback = std::function<void()>;
|
||||||
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
|
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
|
||||||
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
|
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
|
||||||
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
|
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
|
||||||
|
using SizeCallback = std::function<void(uint32_t messagecount)>;
|
||||||
using ConsumeCallback = std::function<void(const std::string &consumer)>;
|
using ConsumeCallback = std::function<void(const std::string &consumer)>;
|
||||||
using CancelCallback = std::function<void(const std::string &consumer)>;
|
using CancelCallback = std::function<void(const std::string &consumer)>;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,12 @@ private:
|
||||||
*/
|
*/
|
||||||
EmptyCallback _emptyCallback;
|
EmptyCallback _emptyCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback with the number of messages still in the queue
|
||||||
|
* @var SizeCallback
|
||||||
|
*/
|
||||||
|
SizeCallback _sizeCallback;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report success when a message is indeed expected
|
* Report success when a message is indeed expected
|
||||||
* @param count number of messages in the queue
|
* @param count number of messages in the queue
|
||||||
|
|
@ -121,6 +127,19 @@ public:
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a function to be called when size information is known
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredGet &onSize(const SizeCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_sizeCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,12 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const
|
||||||
auto finalizeCallback = _finalizeCallback;
|
auto finalizeCallback = _finalizeCallback;
|
||||||
auto *channel = _channel;
|
auto *channel = _channel;
|
||||||
|
|
||||||
|
// install a monitor because the channel could be destructed
|
||||||
|
Monitor monitor(channel);
|
||||||
|
|
||||||
|
// report the size (technically, the channel object could be destructed now, but we ignore that case)
|
||||||
|
if (_sizeCallback) _sizeCallback(messageCount);
|
||||||
|
|
||||||
// we now know the name, so we can install the message callback on the channel
|
// we now know the name, so we can install the message callback on the channel
|
||||||
_channel->install("", [channel, messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) {
|
_channel->install("", [channel, messageCallback, finalizeCallback](const Message &message, uint64_t deliveryTag, bool redelivered) {
|
||||||
|
|
||||||
|
|
@ -54,6 +60,9 @@ Deferred *DeferredGet::reportSuccess(uint32_t messageCount) const
|
||||||
*/
|
*/
|
||||||
Deferred *DeferredGet::reportSuccess() const
|
Deferred *DeferredGet::reportSuccess() const
|
||||||
{
|
{
|
||||||
|
// report the size
|
||||||
|
if (_sizeCallback) _sizeCallback(0);
|
||||||
|
|
||||||
// check if a callback was set
|
// check if a callback was set
|
||||||
if (_emptyCallback) _emptyCallback();
|
if (_emptyCallback) _emptyCallback();
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue