diff --git a/include/amqpcpp/deferred.h b/include/amqpcpp/deferred.h index e9c88c5..05eb530 100644 --- a/include/amqpcpp/deferred.h +++ b/include/amqpcpp/deferred.h @@ -193,6 +193,7 @@ protected: * private members and construct us */ friend class ChannelImpl; + friend class Throttle; public: /** diff --git a/include/amqpcpp/throttle.h b/include/amqpcpp/throttle.h index ceda5b1..c0f972e 100644 --- a/include/amqpcpp/throttle.h +++ b/include/amqpcpp/throttle.h @@ -74,6 +74,12 @@ protected: */ std::set _open; + /** + * Deferred to set up on the close + * @var std::shared_ptr + */ + std::shared_ptr _close; + /** * Send method for a frame * @param id @@ -158,6 +164,19 @@ public: * @param size_t */ void throttle(size_t throttle) { _throttle = throttle; } + + /** + * Flush the throttle. This flushes it _without_ taking the throttle into account, e.g. the messages + * are sent in a burst over the channel. + * @param max optional maximum, 0 is flush all + */ + size_t flush(size_t max = 0); + + /** + * Close the throttle channel (closes the underlying channel when all messages have been sent) + * @return Deferred& + */ + Deferred &close(); }; /** diff --git a/src/confirmed.cpp b/src/confirmed.cpp index c893722..70e3ae9 100644 --- a/src/confirmed.cpp +++ b/src/confirmed.cpp @@ -24,9 +24,6 @@ namespace AMQP { */ void Confirmed::onAck(uint64_t deliveryTag, bool multiple) { - // call base handler, will advance on the throttle if needed - Throttle::onAck(deliveryTag, multiple); - // monitor the object, watching for destruction since these ack/nack handlers // could destruct the object Monitor monitor(this); @@ -69,6 +66,12 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple) // erase all acknowledged items _handlers.erase(_handlers.begin(), upper); + + // call base handler, will advance on the throttle if needed. we call this _after_ we're + // done processing the callbacks, since one of the callbacks might close the channel, or publish + // more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing + // for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed + Throttle::onAck(deliveryTag, multiple); } /** @@ -78,9 +81,6 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple) */ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) { - // call base handler, will advance on the throttle if needed - Throttle::onNack(deliveryTag, multiple); - // monitor the object, watching for destruction since these ack/nack handlers // could destruct the object Monitor monitor(this); @@ -123,6 +123,12 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple) // erase all acknowledged items _handlers.erase(_handlers.begin(), upper); + + // call base handler, will advance on the throttle if needed. we call this _after_ we're + // done processing the callbacks, since one of the callbacks might close the channel, or publish + // more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing + // for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed + Throttle::onNack(deliveryTag, multiple); } /** diff --git a/src/throttle.cpp b/src/throttle.cpp index c6083fb..d8d58a5 100644 --- a/src/throttle.cpp +++ b/src/throttle.cpp @@ -50,32 +50,16 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple) // otherwise, we remove the single element else _open.erase(deliveryTag); - // keep sending more messages while there is a queue - while (!_queue.empty()) - { - // get the front element from the queue - // @todo move it to the channel - auto &front = _queue.front(); + // if there is more room now, we can flush some items + if (_open.size() < _throttle) flush(_throttle - _open.size()); - // if the front has a different tag, we might not be allowed to continue - if (front.first != _last) - { - // if there is no more room, we're done, stop - if (_open.size() >= _throttle) return; + // leap out if there are still messages or we shouldn't close yet + if (!_open.empty() || !_close) return; - // we now go to publish a new element - _last = front.first; - - // insert it into the set as well - _open.insert(_last); - } - - // send the buffer over the implementation - _implementation->send(std::move(front.second)); - - // and remove the message - _queue.pop(); - } + // close the channel, and forward the callbacks to the installed handler + _implementation->close() + .onSuccess([this]() { _close->reportSuccess(); }) + .onError([this](const char *message) { _close->reportError(message); }); } /** @@ -119,6 +103,9 @@ bool Throttle::send(uint64_t id, const Frame &frame) bool Throttle::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) { // @todo do not copy the entire buffer to individual frames + + // fail if we're closing the channel, no more publishes allowed + if (_close) return false; // send the publish frame if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false; @@ -158,6 +145,73 @@ bool Throttle::publish(const std::string &exchange, const std::string &routingKe return true; } +/** + * Flush the throttle + * @param max + */ +size_t Throttle::flush(size_t max) +{ + // how many have we published + size_t published = 0; + + // keep sending more messages while there is a queue + while (!_queue.empty()) + { + // get the front element from the queue + auto &front = _queue.front(); + + // if the front has a different tag, we might not be allowed to continue + if (front.first != _last) + { + // this is an extra publish, check if this puts us over the edge, in which case we + // did one less (unless max = 0, which means do a full flush) + if (max > 0 && published >= max) return published; + + // we are going to publish an extra message + ++published; + + // we now go to publish a new element + _last = front.first; + + // insert it into the set as well + _open.insert(_last); + } + + // send the buffer over the implementation + _implementation->send(std::move(front.second)); + + // and remove the message + _queue.pop(); + } + + // return number of published messages. + return published; +} + +/** + * Close the throttle channel (closes the underlying channel) + * @return Deferred& + */ +Deferred &Throttle::close() +{ + // if this was already set to be closed, return that + if (_close) return *_close; + + // create the deferred + _close = std::make_shared(_implementation->usable()); + + // if there are open messages or there is a queue, they will still get acked and we will then forward it + if (_open.size() > 0 || !_queue.empty()) return *_close; + + // there are no open messages, we can close the channel directly. + _implementation->close() + .onSuccess([this]() { _close->reportSuccess(); }) + .onError([this](const char *message) { _close->reportError(message); }); + + // return the created deferred + return *_close; +} + /** * End of namespaces */