From 94bf5fe74bd9c8645d743efd54deecc8c92c37f7 Mon Sep 17 00:00:00 2001 From: Michael van der Werve Date: Tue, 6 Oct 2020 15:14:42 +0200 Subject: [PATCH] initial implementation --- include/amqpcpp.h | 1 + include/amqpcpp/channel.h | 5 + include/amqpcpp/channelimpl.h | 13 +++ include/amqpcpp/throttledchannel.h | 164 +++++++++++++++++++++++++++++ src/channelimpl.cpp | 49 +++++++++ src/includes.h | 1 + src/throttledchannel.cpp | 164 +++++++++++++++++++++++++++++ 7 files changed, 397 insertions(+) create mode 100644 include/amqpcpp/throttledchannel.h create mode 100644 src/throttledchannel.cpp diff --git a/include/amqpcpp.h b/include/amqpcpp.h index ebb064e..774869b 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -75,6 +75,7 @@ #include "amqpcpp/deferredpublisher.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" +#include "amqpcpp/throttledchannel.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/include/amqpcpp/channel.h b/include/amqpcpp/channel.h index b1c197b..81842c9 100644 --- a/include/amqpcpp/channel.h +++ b/include/amqpcpp/channel.h @@ -597,6 +597,11 @@ public: { return _implementation->id(); } + + /** + * Some internal classes may touch our implementation + */ + friend class ThrottledChannel; }; /** diff --git a/include/amqpcpp/channelimpl.h b/include/amqpcpp/channelimpl.h index edc08e2..3cf95bd 100644 --- a/include/amqpcpp/channelimpl.h +++ b/include/amqpcpp/channelimpl.h @@ -566,6 +566,13 @@ public: return _id; } + /** + * Send a frame over the channel + * @param frame frame to send + * @return bool was frame succesfully sent? + */ + bool send(CopiedBuffer &&frame); + /** * Send a frame over the channel * @param frame frame to send @@ -582,6 +589,12 @@ public: return _synchronous || !_queue.empty(); } + /** + * The max payload size for frames + * @return uint32_t + */ + uint32_t maxPayload() const; + /** * Signal the channel that a synchronous operation was completed, and that any * queued frames can be sent out. diff --git a/include/amqpcpp/throttledchannel.h b/include/amqpcpp/throttledchannel.h new file mode 100644 index 0000000..2739d6b --- /dev/null +++ b/include/amqpcpp/throttledchannel.h @@ -0,0 +1,164 @@ +/** + * ThrottledChannel.h + * + * A channel wrapper that publishes more messages as soon as there is more capacity. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Header guard + */ +#pragma once + +/** + * Includes + */ +#include +#include +#include +#include "copiedbuffer.h" +#include "channelimpl.h" + +/** + * Begin of namespaces + */ +namespace AMQP { + +/** + * Forward declarations + */ +class Channel; + +/** + * Class definition + */ +class ThrottledChannel +{ +private: + /** + * The implementation for the channel + * @var std::shared_ptr + */ + std::shared_ptr _implementation; + + /** + * Current id, always starts at 1. + * @var uint64_t + */ + uint64_t _current = 1; + + /** + * Last sent ID + * @var uint64_t + */ + uint64_t _last = 0; + + /** + * Throttle + * @var size_t + */ + size_t _throttle; + + /** + * Messages that should still be sent out. + * @var queue + */ + std::queue> _queue; + + /** + * Set of open deliverytags. We want a normal set (not unordered_set) because + * removal will be cheaper for whole ranges. + * @var size_t + */ + std::set _open; + + /** + * Called when the deliverytag(s) are acked/nacked + * @param deliveryTag + * @param multiple + */ + void onAck(uint64_t deliveryTag, bool multiple); + + /** + * Send method for a frame + * @param id + * @param frame + */ + bool send(uint64_t id, const Frame &frame); + +public: + /** + * Constructor + * @param channel + * @param throttle + */ + ThrottledChannel(Channel &channel, size_t throttle); + + /** + * Deleted copy constructor, deleted move constructor + * @param other + */ + ThrottledChannel(const ThrottledChannel &other) = delete; + ThrottledChannel(ThrottledChannel &&other) = delete; + + /** + * Deleted copy assignment, deleted move assignment + * @param other + */ + ThrottledChannel &operator=(const ThrottledChannel &other) = delete; + ThrottledChannel &operator=(ThrottledChannel &&other) = delete; + + /** + * Virtual destructor + */ + virtual ~ThrottledChannel() = default; + + /** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + * @return bool + */ + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0); + bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); } + + /** + * Get the number of messages that are waiting to be published + * @return uint64_t + */ + size_t waiting() const { return _current - _last - 1; } + + /** + * Number of messages already sent but unacknowledged by rabbit + * @return size_t + */ + size_t unacknowledged() const { return _open.size(); } + + /** + * Get the throttle + * @return size_t + */ + size_t throttle() const { return _throttle; } + + /** + * Set a new throttle. Note that this will only gradually take effect when set down, and + * the update is picked up on the next acknowledgement. + * @param size_t + */ + void throttle(size_t throttle) { _throttle = throttle; } +}; + +/** + * End of namespaces + */ +} diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 6f128ea..b2de96d 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -493,6 +493,8 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std:: Monitor monitor(this); // @todo do not copy the entire buffer to individual frames + + // @todo this seems utterly (conceptually) broken // make sure we have a deferred object to return if (!_publisher) _publisher.reset(new DeferredPublisher(this)); @@ -714,6 +716,42 @@ Deferred &ChannelImpl::recover(int flags) return push(BasicRecoverFrame(_id, (flags & requeue) != 0)); } +/** + * Send a buffer over the channel + * @param frame frame to send + * @return bool was frame succesfully sent? + */ +bool ChannelImpl::send(CopiedBuffer &&frame) +{ + // skip if channel is not connected + if (_state == state_closed || !_connection) return false; + + // if we're busy closing, we failed as well + if (_state == state_closing) return false; + + // are we currently in synchronous mode or are there + // other frames waiting for their turn to be sent? + if (_synchronous || !_queue.empty()) + { + // we need to wait until the synchronous frame has + // been processed, so queue the frame until it was + _queue.emplace(false, std::move(frame)); + + // it was of course not actually sent but we pretend + // that it was, because no error occured + return true; + } + + // send to tcp connection + if (!_connection->send(std::move(frame))) return false; + + // frame was sent, a copied buffer cannot be synchronous + _synchronous = false; + + // done + return true; +} + /** * Send a frame over the channel * @param frame frame to send @@ -753,6 +791,17 @@ bool ChannelImpl::send(const Frame &frame) return true; } +/** + * The max payload size for body frames + * @return uint32_t + */ +uint32_t ChannelImpl::maxPayload() const +{ + // forward to the connection + // @todo what if _connection == nullptr? + return _connection->maxPayload(); +} + /** * Signal the channel that a synchronous operation was completed. After * this operation, waiting frames can be sent out. diff --git a/src/includes.h b/src/includes.h index 9451588..408db92 100644 --- a/src/includes.h +++ b/src/includes.h @@ -76,6 +76,7 @@ #include "amqpcpp/deferredget.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" +#include "amqpcpp/throttledchannel.h" #include "amqpcpp/login.h" #include "amqpcpp/address.h" #include "amqpcpp/connectionhandler.h" diff --git a/src/throttledchannel.cpp b/src/throttledchannel.cpp new file mode 100644 index 0000000..1dd8db9 --- /dev/null +++ b/src/throttledchannel.cpp @@ -0,0 +1,164 @@ +/** + * ThrottledChannel.cpp + * + * Implementation for ThrottledChannel class. + * + * @author Michael van der Werve + * @copyright 2020 Copernica BV + */ + +/** + * Includes + */ +#include "includes.h" +#include "basicpublishframe.h" +#include "basicheaderframe.h" +#include "bodyframe.h" +#include + +/** + * Begin of namespaces + */ +namespace AMQP { + +/** + * Constructor + * @param channel + * @param throttle + */ +ThrottledChannel::ThrottledChannel(Channel &channel, size_t throttle) : _implementation(channel._implementation), _throttle(throttle) +{ + // activate confirm-select mode + auto &deferred = channel.confirmSelect() + .onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }) + .onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onAck(deliveryTag, multiple); }); + + // we might have failed, in which case we throw + if (!deferred) throw std::runtime_error("could not enable publisher confirms"); +} + +/** + * Called when the deliverytag(s) are acked/nacked + * @param deliveryTag + * @param multiple + */ +void ThrottledChannel::onAck(uint64_t deliveryTag, bool multiple) +{ + // number of messages exposed + if (multiple) _open.erase(_open.begin(), _open.upper_bound(deliveryTag)); + + // otherwise, we remove the single element + else _open.erase(deliveryTag); + + // keep sending more messages while there is a queue + while (!_queue.empty()) + { + // get the front element from the queue + // @todo move it to the channel + auto &front = _queue.front(); + + // if the front has a different tag, we might not be allowed to continue + if (front.first != _last) + { + // if there is no more room, we're done, stop + if (_open.size() >= _throttle) return; + + // we now go to publish a new element + _last = front.first; + + // insert it into the set as well + _open.insert(_last); + } + + // send the buffer over the implementation + _implementation->send(std::move(front.second)); + + // and remove the message + _queue.pop(); + } +} + +/** + * Send method for a frame + * @param id + * @param frame + */ +bool ThrottledChannel::send(uint64_t id, const Frame &frame) +{ + // if there is already a queue, we always append it + if (!_queue.empty() || (_open.size() >= _throttle && _last != id)) + { + // add the element to the queue + _queue.emplace(id, frame); + + // we have successfully added the message + return true; + } + + // there is no queue and we have space, we send it directly + _last = id; + + // we have now send this id + _open.insert(id); + + // and we're going to send it over the channel directly + return _implementation->send(frame); +} + +/** + * Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. + * Delays actual publishing depending on the publisher confirms sent by RabbitMQ. + * + * @param exchange the exchange to publish to + * @param routingkey the routing key + * @param envelope the full envelope to send + * @param message the message to send + * @param size size of the message + * @param flags optional flags + */ +bool ThrottledChannel::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags) +{ + // @todo do not copy the entire buffer to individual frames + + // send the publish frame + if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false; + + // send header + if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false; + + // connection and channel still usable? + if (!_implementation->usable()) return false; + + // the max payload size is the max frame size minus the bytes for headers and trailer + uint32_t maxpayload = _implementation->maxPayload(); + uint64_t bytessent = 0; + + // the buffer + const char *data = envelope.body(); + uint64_t bytesleft = envelope.bodySize(); + + // split up the body in multiple frames depending on the max frame size + while (bytesleft > 0) + { + // size of this chunk + uint64_t chunksize = std::min(static_cast(maxpayload), bytesleft); + + // send out a body frame + if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false; + + // update counters + bytessent += chunksize; + bytesleft -= chunksize; + } + + // we're done, we move to the next deliverytag + ++_current; + + // we succeeded + return true; +} + +/** + * End of namespaces + */ +}