Merge pull request #368 from CopernicaMarketingSoftware/throttle-close

Allow to close the wrapped Throttle / Confirmed class.
This commit is contained in:
Emiel Bruijntjes 2020-10-07 16:18:14 +02:00 committed by GitHub
commit d542ba8e44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 30 deletions

View File

@ -193,6 +193,7 @@ protected:
* private members and construct us
*/
friend class ChannelImpl;
friend class Throttle;
public:
/**

View File

@ -74,6 +74,12 @@ protected:
*/
std::set<size_t> _open;
/**
* Deferred to set up on the close
* @var std::shared_ptr<Deferred>
*/
std::shared_ptr<Deferred> _close;
/**
* Send method for a frame
* @param id
@ -158,6 +164,19 @@ public:
* @param size_t
*/
void throttle(size_t throttle) { _throttle = throttle; }
/**
* Flush the throttle. This flushes it _without_ taking the throttle into account, e.g. the messages
* are sent in a burst over the channel.
* @param max optional maximum, 0 is flush all
*/
size_t flush(size_t max = 0);
/**
* Close the throttle channel (closes the underlying channel when all messages have been sent)
* @return Deferred&
*/
Deferred &close();
};
/**

View File

@ -24,9 +24,6 @@ namespace AMQP {
*/
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
{
// call base handler, will advance on the throttle if needed
Throttle::onAck(deliveryTag, multiple);
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
@ -69,6 +66,12 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
// call base handler, will advance on the throttle if needed. we call this _after_ we're
// done processing the callbacks, since one of the callbacks might close the channel, or publish
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
Throttle::onAck(deliveryTag, multiple);
}
/**
@ -78,9 +81,6 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
*/
void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
{
// call base handler, will advance on the throttle if needed
Throttle::onNack(deliveryTag, multiple);
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
@ -123,6 +123,12 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
// call base handler, will advance on the throttle if needed. we call this _after_ we're
// done processing the callbacks, since one of the callbacks might close the channel, or publish
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
Throttle::onNack(deliveryTag, multiple);
}
/**

View File

@ -50,32 +50,16 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple)
// otherwise, we remove the single element
else _open.erase(deliveryTag);
// keep sending more messages while there is a queue
while (!_queue.empty())
{
// get the front element from the queue
// @todo move it to the channel
auto &front = _queue.front();
// if there is more room now, we can flush some items
if (_open.size() < _throttle) flush(_throttle - _open.size());
// if the front has a different tag, we might not be allowed to continue
if (front.first != _last)
{
// if there is no more room, we're done, stop
if (_open.size() >= _throttle) return;
// leap out if there are still messages or we shouldn't close yet
if (!_open.empty() || !_close) return;
// we now go to publish a new element
_last = front.first;
// insert it into the set as well
_open.insert(_last);
}
// send the buffer over the implementation
_implementation->send(std::move(front.second));
// and remove the message
_queue.pop();
}
// close the channel, and forward the callbacks to the installed handler
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
}
/**
@ -119,6 +103,9 @@ bool Throttle::send(uint64_t id, const Frame &frame)
bool Throttle::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{
// @todo do not copy the entire buffer to individual frames
// fail if we're closing the channel, no more publishes allowed
if (_close) return false;
// send the publish frame
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
@ -158,6 +145,73 @@ bool Throttle::publish(const std::string &exchange, const std::string &routingKe
return true;
}
/**
* Flush the throttle
* @param max
*/
size_t Throttle::flush(size_t max)
{
// how many have we published
size_t published = 0;
// keep sending more messages while there is a queue
while (!_queue.empty())
{
// get the front element from the queue
auto &front = _queue.front();
// if the front has a different tag, we might not be allowed to continue
if (front.first != _last)
{
// this is an extra publish, check if this puts us over the edge, in which case we
// did one less (unless max = 0, which means do a full flush)
if (max > 0 && published >= max) return published;
// we are going to publish an extra message
++published;
// we now go to publish a new element
_last = front.first;
// insert it into the set as well
_open.insert(_last);
}
// send the buffer over the implementation
_implementation->send(std::move(front.second));
// and remove the message
_queue.pop();
}
// return number of published messages.
return published;
}
/**
* Close the throttle channel (closes the underlying channel)
* @return Deferred&
*/
Deferred &Throttle::close()
{
// if this was already set to be closed, return that
if (_close) return *_close;
// create the deferred
_close = std::make_shared<Deferred>(_implementation->usable());
// if there are open messages or there is a queue, they will still get acked and we will then forward it
if (_open.size() > 0 || !_queue.empty()) return *_close;
// there are no open messages, we can close the channel directly.
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
// return the created deferred
return *_close;
}
/**
* End of namespaces
*/