Merge pull request #365 from CopernicaMarketingSoftware/throttled-channel
Initial implementation of a throttled channel wrapper using publisher confirms.
This commit is contained in:
commit
e4e9358c10
|
|
@ -75,6 +75,7 @@
|
|||
#include "amqpcpp/deferredpublisher.h"
|
||||
#include "amqpcpp/channelimpl.h"
|
||||
#include "amqpcpp/channel.h"
|
||||
#include "amqpcpp/throttledchannel.h"
|
||||
#include "amqpcpp/login.h"
|
||||
#include "amqpcpp/address.h"
|
||||
#include "amqpcpp/connectionhandler.h"
|
||||
|
|
|
|||
|
|
@ -597,6 +597,11 @@ public:
|
|||
{
|
||||
return _implementation->id();
|
||||
}
|
||||
|
||||
/**
|
||||
* Some internal classes may touch our implementation
|
||||
*/
|
||||
friend class ThrottledChannel;
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -566,6 +566,13 @@ public:
|
|||
return _id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a frame over the channel
|
||||
* @param frame frame to send
|
||||
* @return bool was frame succesfully sent?
|
||||
*/
|
||||
bool send(CopiedBuffer &&frame);
|
||||
|
||||
/**
|
||||
* Send a frame over the channel
|
||||
* @param frame frame to send
|
||||
|
|
@ -582,6 +589,12 @@ public:
|
|||
return _synchronous || !_queue.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* The max payload size for frames
|
||||
* @return uint32_t
|
||||
*/
|
||||
uint32_t maxPayload() const;
|
||||
|
||||
/**
|
||||
* Signal the channel that a synchronous operation was completed, and that any
|
||||
* queued frames can be sent out.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* ThrottledChannel.h
|
||||
*
|
||||
* A channel wrapper that publishes more messages as soon as there is more capacity.
|
||||
*
|
||||
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||
* @copyright 2020 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Header guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Includes
|
||||
*/
|
||||
#include <cstdint>
|
||||
#include <set>
|
||||
#include <queue>
|
||||
#include "copiedbuffer.h"
|
||||
#include "channelimpl.h"
|
||||
|
||||
/**
|
||||
* Begin of namespaces
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Forward declarations
|
||||
*/
|
||||
class Channel;
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class ThrottledChannel
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The implementation for the channel
|
||||
* @var std::shared_ptr<ChannelImpl>
|
||||
*/
|
||||
std::shared_ptr<ChannelImpl> _implementation;
|
||||
|
||||
/**
|
||||
* Current id, always starts at 1.
|
||||
* @var uint64_t
|
||||
*/
|
||||
uint64_t _current = 1;
|
||||
|
||||
/**
|
||||
* Last sent ID
|
||||
* @var uint64_t
|
||||
*/
|
||||
uint64_t _last = 0;
|
||||
|
||||
/**
|
||||
* Throttle
|
||||
* @var size_t
|
||||
*/
|
||||
size_t _throttle;
|
||||
|
||||
/**
|
||||
* Messages that should still be sent out.
|
||||
* @var queue
|
||||
*/
|
||||
std::queue<std::pair<uint64_t, CopiedBuffer>> _queue;
|
||||
|
||||
/**
|
||||
* Set of open deliverytags. We want a normal set (not unordered_set) because
|
||||
* removal will be cheaper for whole ranges.
|
||||
* @var size_t
|
||||
*/
|
||||
std::set<size_t> _open;
|
||||
|
||||
/**
|
||||
* Called when the deliverytag(s) are acked/nacked
|
||||
* @param deliveryTag
|
||||
* @param multiple
|
||||
*/
|
||||
void onAck(uint64_t deliveryTag, bool multiple);
|
||||
|
||||
/**
|
||||
* Send method for a frame
|
||||
* @param id
|
||||
* @param frame
|
||||
*/
|
||||
bool send(uint64_t id, const Frame &frame);
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param channel
|
||||
* @param throttle
|
||||
*/
|
||||
ThrottledChannel(Channel &channel, size_t throttle);
|
||||
|
||||
/**
|
||||
* Deleted copy constructor, deleted move constructor
|
||||
* @param other
|
||||
*/
|
||||
ThrottledChannel(const ThrottledChannel &other) = delete;
|
||||
ThrottledChannel(ThrottledChannel &&other) = delete;
|
||||
|
||||
/**
|
||||
* Deleted copy assignment, deleted move assignment
|
||||
* @param other
|
||||
*/
|
||||
ThrottledChannel &operator=(const ThrottledChannel &other) = delete;
|
||||
ThrottledChannel &operator=(ThrottledChannel &&other) = delete;
|
||||
|
||||
/**
|
||||
* Virtual destructor
|
||||
*/
|
||||
virtual ~ThrottledChannel() = default;
|
||||
|
||||
/**
|
||||
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||
*
|
||||
* @param exchange the exchange to publish to
|
||||
* @param routingkey the routing key
|
||||
* @param envelope the full envelope to send
|
||||
* @param message the message to send
|
||||
* @param size size of the message
|
||||
* @param flags optional flags
|
||||
* @return bool
|
||||
*/
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||
|
||||
/**
|
||||
* Get the number of messages that are waiting to be published
|
||||
* @return uint64_t
|
||||
*/
|
||||
size_t waiting() const { return _current - _last - 1; }
|
||||
|
||||
/**
|
||||
* Number of messages already sent but unacknowledged by rabbit
|
||||
* @return size_t
|
||||
*/
|
||||
size_t unacknowledged() const { return _open.size(); }
|
||||
|
||||
/**
|
||||
* Get the throttle
|
||||
* @return size_t
|
||||
*/
|
||||
size_t throttle() const { return _throttle; }
|
||||
|
||||
/**
|
||||
* Set a new throttle. Note that this will only gradually take effect when set down, and
|
||||
* the update is picked up on the next acknowledgement.
|
||||
* @param size_t
|
||||
*/
|
||||
void throttle(size_t throttle) { _throttle = throttle; }
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespaces
|
||||
*/
|
||||
}
|
||||
|
|
@ -494,6 +494,8 @@ DeferredPublisher &ChannelImpl::publish(const std::string &exchange, const std::
|
|||
|
||||
// @todo do not copy the entire buffer to individual frames
|
||||
|
||||
// @todo this seems utterly (conceptually) broken
|
||||
|
||||
// make sure we have a deferred object to return
|
||||
if (!_publisher) _publisher.reset(new DeferredPublisher(this));
|
||||
|
||||
|
|
@ -714,6 +716,42 @@ Deferred &ChannelImpl::recover(int flags)
|
|||
return push(BasicRecoverFrame(_id, (flags & requeue) != 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a buffer over the channel
|
||||
* @param frame frame to send
|
||||
* @return bool was frame succesfully sent?
|
||||
*/
|
||||
bool ChannelImpl::send(CopiedBuffer &&frame)
|
||||
{
|
||||
// skip if channel is not connected
|
||||
if (_state == state_closed || !_connection) return false;
|
||||
|
||||
// if we're busy closing, we failed as well
|
||||
if (_state == state_closing) return false;
|
||||
|
||||
// are we currently in synchronous mode or are there
|
||||
// other frames waiting for their turn to be sent?
|
||||
if (_synchronous || !_queue.empty())
|
||||
{
|
||||
// we need to wait until the synchronous frame has
|
||||
// been processed, so queue the frame until it was
|
||||
_queue.emplace(false, std::move(frame));
|
||||
|
||||
// it was of course not actually sent but we pretend
|
||||
// that it was, because no error occured
|
||||
return true;
|
||||
}
|
||||
|
||||
// send to tcp connection
|
||||
if (!_connection->send(std::move(frame))) return false;
|
||||
|
||||
// frame was sent, a copied buffer cannot be synchronous
|
||||
_synchronous = false;
|
||||
|
||||
// done
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a frame over the channel
|
||||
* @param frame frame to send
|
||||
|
|
@ -753,6 +791,17 @@ bool ChannelImpl::send(const Frame &frame)
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The max payload size for body frames
|
||||
* @return uint32_t
|
||||
*/
|
||||
uint32_t ChannelImpl::maxPayload() const
|
||||
{
|
||||
// forward to the connection
|
||||
// @todo what if _connection == nullptr?
|
||||
return _connection->maxPayload();
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal the channel that a synchronous operation was completed. After
|
||||
* this operation, waiting frames can be sent out.
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@
|
|||
#include "amqpcpp/deferredget.h"
|
||||
#include "amqpcpp/channelimpl.h"
|
||||
#include "amqpcpp/channel.h"
|
||||
#include "amqpcpp/throttledchannel.h"
|
||||
#include "amqpcpp/login.h"
|
||||
#include "amqpcpp/address.h"
|
||||
#include "amqpcpp/connectionhandler.h"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* ThrottledChannel.cpp
|
||||
*
|
||||
* Implementation for ThrottledChannel class.
|
||||
*
|
||||
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||
* @copyright 2020 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Includes
|
||||
*/
|
||||
#include "includes.h"
|
||||
#include "basicpublishframe.h"
|
||||
#include "basicheaderframe.h"
|
||||
#include "bodyframe.h"
|
||||
#include <iostream>
|
||||
|
||||
/**
|
||||
* Begin of namespaces
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param channel
|
||||
* @param throttle
|
||||
*/
|
||||
ThrottledChannel::ThrottledChannel(Channel &channel, size_t throttle) : _implementation(channel._implementation), _throttle(throttle)
|
||||
{
|
||||
// activate confirm-select mode
|
||||
auto &deferred = channel.confirmSelect()
|
||||
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
|
||||
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onAck(deliveryTag, multiple); });
|
||||
|
||||
// we might have failed, in which case we throw
|
||||
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the deliverytag(s) are acked/nacked
|
||||
* @param deliveryTag
|
||||
* @param multiple
|
||||
*/
|
||||
void ThrottledChannel::onAck(uint64_t deliveryTag, bool multiple)
|
||||
{
|
||||
// number of messages exposed
|
||||
if (multiple) _open.erase(_open.begin(), _open.upper_bound(deliveryTag));
|
||||
|
||||
// otherwise, we remove the single element
|
||||
else _open.erase(deliveryTag);
|
||||
|
||||
// keep sending more messages while there is a queue
|
||||
while (!_queue.empty())
|
||||
{
|
||||
// get the front element from the queue
|
||||
// @todo move it to the channel
|
||||
auto &front = _queue.front();
|
||||
|
||||
// if the front has a different tag, we might not be allowed to continue
|
||||
if (front.first != _last)
|
||||
{
|
||||
// if there is no more room, we're done, stop
|
||||
if (_open.size() >= _throttle) return;
|
||||
|
||||
// we now go to publish a new element
|
||||
_last = front.first;
|
||||
|
||||
// insert it into the set as well
|
||||
_open.insert(_last);
|
||||
}
|
||||
|
||||
// send the buffer over the implementation
|
||||
_implementation->send(std::move(front.second));
|
||||
|
||||
// and remove the message
|
||||
_queue.pop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send method for a frame
|
||||
* @param id
|
||||
* @param frame
|
||||
*/
|
||||
bool ThrottledChannel::send(uint64_t id, const Frame &frame)
|
||||
{
|
||||
// if there is already a queue, we always append it
|
||||
if (!_queue.empty() || (_open.size() >= _throttle && _last != id))
|
||||
{
|
||||
// add the element to the queue
|
||||
_queue.emplace(id, frame);
|
||||
|
||||
// we have successfully added the message
|
||||
return true;
|
||||
}
|
||||
|
||||
// there is no queue and we have space, we send it directly
|
||||
_last = id;
|
||||
|
||||
// we have now send this id
|
||||
_open.insert(id);
|
||||
|
||||
// and we're going to send it over the channel directly
|
||||
return _implementation->send(frame);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||
*
|
||||
* @param exchange the exchange to publish to
|
||||
* @param routingkey the routing key
|
||||
* @param envelope the full envelope to send
|
||||
* @param message the message to send
|
||||
* @param size size of the message
|
||||
* @param flags optional flags
|
||||
*/
|
||||
bool ThrottledChannel::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
||||
{
|
||||
// @todo do not copy the entire buffer to individual frames
|
||||
|
||||
// send the publish frame
|
||||
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
|
||||
|
||||
// send header
|
||||
if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false;
|
||||
|
||||
// connection and channel still usable?
|
||||
if (!_implementation->usable()) return false;
|
||||
|
||||
// the max payload size is the max frame size minus the bytes for headers and trailer
|
||||
uint32_t maxpayload = _implementation->maxPayload();
|
||||
uint64_t bytessent = 0;
|
||||
|
||||
// the buffer
|
||||
const char *data = envelope.body();
|
||||
uint64_t bytesleft = envelope.bodySize();
|
||||
|
||||
// split up the body in multiple frames depending on the max frame size
|
||||
while (bytesleft > 0)
|
||||
{
|
||||
// size of this chunk
|
||||
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
||||
|
||||
// send out a body frame
|
||||
if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false;
|
||||
|
||||
// update counters
|
||||
bytessent += chunksize;
|
||||
bytesleft -= chunksize;
|
||||
}
|
||||
|
||||
// we're done, we move to the next deliverytag
|
||||
++_current;
|
||||
|
||||
// we succeeded
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* End of namespaces
|
||||
*/
|
||||
}
|
||||
Loading…
Reference in New Issue