{auto} disconnected explicit requirement of reliable and throttle

This commit is contained in:
Michael van der Werve 2020-10-21 10:36:39 +02:00
parent b4805f97aa
commit 2262d92938
11 changed files with 514 additions and 358 deletions

View File

@ -76,8 +76,9 @@
#include "amqpcpp/deferredrecall.h"
#include "amqpcpp/channelimpl.h"
#include "amqpcpp/channel.h"
#include "amqpcpp/throttle.h"
#include "amqpcpp/confirmed.h"
#include "amqpcpp/throttle.h"
#include "amqpcpp/reliable.h"
#include "amqpcpp/login.h"
#include "amqpcpp/address.h"
#include "amqpcpp/connectionhandler.h"

View File

@ -604,7 +604,7 @@ public:
/**
* Some internal classes may touch our implementation
*/
friend class Throttle;
friend class Confirmed;
};
/**

View File

@ -1,8 +1,7 @@
/**
* Confirmed.h
*
* A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed
* on the publishes, to be called when they are confirmed by the message broker.
* Base class that enables publisher confirms and keeps track of the sent messages.
*
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
* @copyright 2020 Copernica BV
@ -27,37 +26,63 @@ namespace AMQP {
/**
* Class definition
*/
class Confirmed : public Throttle, private Watchable
class Confirmed : public Watchable
{
private:
protected:
/**
* Set of open deliverytags. We want a normal set (not unordered_set) because
* removal will be cheaper for whole ranges.
* @var size_t
* The implementation for the channel
* @var std::shared_ptr<ChannelImpl>
*/
std::map<size_t, std::shared_ptr<DeferredPublish>> _handlers;
std::shared_ptr<ChannelImpl> _implementation;
/**
* Called when the deliverytag(s) are acked/nacked
* Current id, always starts at 1.
* @var uint64_t
*/
uint64_t _current = 1;
/**
* Deferred to set up on the close
* @var std::shared_ptr<Deferred>
*/
std::shared_ptr<Deferred> _close;
/**
* Callback to call when an error occurred
* @var ErrorCallback
*/
ErrorCallback _errorCallback;
protected:
/**
* Send method for a frame
* @param id
* @param frame
*/
virtual bool send(uint64_t id, const Frame &frame);
/**
* Method that is called to report an error.
* @param message
*/
virtual void reportError(const char *message);
/**
* Method that gets called on ack/nack. If these methods are overridden, make sure
* to also call the base class methods.
* @param deliveryTag
* @param multiple
*/
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message) override;
virtual void onAck(uint64_t deliveryTag, bool multiple);
virtual void onNack(uint64_t deliveryTag, bool multiple);
public:
/**
* Constructor
* @param channel
* @param throttle
*/
Confirmed(AMQP::Channel &channel, size_t throttle) : Throttle(channel, throttle) {}
Confirmed(AMQP::Channel &channel);
/**
* Deleted copy constructor, deleted move constructor
@ -78,6 +103,12 @@ public:
*/
virtual ~Confirmed() = default;
/**
* Method to check if there is anything still waiting
* @return bool
*/
virtual bool waiting() const { return false; }
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
@ -88,12 +119,24 @@ public:
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* @return bool
* @return uint64_t
*/
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
uint64_t publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
uint64_t publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/**
* Close underlying channel
* @return Deferred&
*/
Deferred &close();
/**
* Install an error callback
* @param callback
*/
void onError(const ErrorCallback &callback);
};
/**

View File

@ -193,7 +193,7 @@ protected:
* private members and construct us
*/
friend class ChannelImpl;
friend class Throttle;
friend class Confirmed;
public:
/**

View File

@ -82,7 +82,8 @@ private:
* The wrapped confirmed channel implementation may call our
* private members and construct us
*/
friend class Confirmed;
template <class T>
friend class Reliable;
public:

272
include/amqpcpp/reliable.h Normal file
View File

@ -0,0 +1,272 @@
/**
* Reliable.h
*
* A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed
* on the publishes, to be called when they are confirmed by the message broker.
*
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
* @copyright 2020 Copernica BV
*/
/**
* Header guard
*/
#pragma once
/**
* Includes
*/
#include "deferredpublish.h"
#include "confirmed.h"
#include <memory>
/**
* Begin of namespaces
*/
namespace AMQP {
/**
* Class definition
*/
template <typename BASE=Confirmed>
class Reliable : public BASE
{
private:
// make sure it is a proper channel
static_assert(std::is_base_of<AMQP::Confirmed, BASE>::value, "base should be derived from a confirmed channel.");
/**
* Set of open deliverytags. We want a normal set (not unordered_set) because
* removal will be cheaper for whole ranges.
* @var size_t
*/
std::map<size_t, std::shared_ptr<DeferredPublish>> _handlers;
/**
* Called when the deliverytag(s) are acked
* @param deliveryTag
* @param multiple
*/
void onAck(uint64_t deliveryTag, bool multiple) override
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// single element is simple
if (!multiple)
{
// find the element
auto iter = _handlers.find(deliveryTag);
// we did not find it (this should not be possible, unless somebody explicitly called)
// the base-class publish methods for some reason.
if (iter == _handlers.end()) return BASE::onAck(deliveryTag, multiple);
// call the ack handler
iter->second->reportAck();
// if the monitor is no longer valid, we stop (we're done)
if (!monitor) return;
// erase it from the map
_handlers.erase(iter);
}
// do multiple at once
else
{
// find the last element, inclusive
auto upper = _handlers.upper_bound(deliveryTag);
// call the handlers
for (auto iter = _handlers.begin(); iter != upper; iter++)
{
// call the handler
iter->second->reportAck();
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
}
// make sure the object is still valid
if (!monitor) return;
// call base handler as well
BASE::onAck(deliveryTag, multiple);
}
/**
* Called when the deliverytag(s) are nacked
* @param deliveryTag
* @param multiple
*/
void onNack(uint64_t deliveryTag, bool multiple) override
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// single element is simple
if (!multiple)
{
// find the element
auto iter = _handlers.find(deliveryTag);
// we did not find it (this should not be possible, unless somebody explicitly called)
// the base-class publish methods for some reason.
if (iter == _handlers.end()) return BASE::onNack(deliveryTag, multiple);
// call the ack handler
iter->second->reportNack();
// if the monitor is no longer valid, we stop (we're done)
if (!monitor) return;
// erase it from the map
_handlers.erase(iter);
}
// nack multiple elements
else
{
// find the last element, inclusive
auto upper = _handlers.upper_bound(deliveryTag);
// call the handlers
for (auto iter = _handlers.begin(); iter != upper; iter++)
{
// call the handler
iter->second->reportNack();
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
}
// if the object is no longer valid, return
if (!monitor) return;
// call the base handler
BASE::onNack(deliveryTag, multiple);
}
/**
* Method that is called to report an error
* @param message
*/
void reportError(const char *message) override
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// move the handlers out
auto handlers = std::move(_handlers);
// iterate over all the messages
// call the handlers
for (const auto &iter : handlers)
{
// call the handler
iter.second->reportError(message);
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// if the monitor is no longer valid, leap out
if (!monitor) return;
// call the base handler
BASE::reportError(message);
}
public:
/**
* Constructor
* @param channel
* @param throttle
*/
template <typename ...Args>
Reliable(Args &&...args) : BASE(std::forward<Args>(args)...) {}
/**
* Deleted copy constructor, deleted move constructor
* @param other
*/
Reliable(const Reliable &other) = delete;
Reliable(Reliable &&other) = delete;
/**
* Deleted copy assignment, deleted move assignment
* @param other
*/
Reliable &operator=(const Reliable &other) = delete;
Reliable &operator=(Reliable &&other) = delete;
/**
* Virtual destructor
*/
virtual ~Reliable() = default;
/**
* Method that can be accessed to check if there are still buffered messages.
* @return bool
*/
virtual bool waiting() const override { return _handlers.size() > 0 || BASE::waiting(); }
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* @return bool
*/
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @param flags optional flags
*/
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{
// publish the entire thing, and remember if it failed at any point
uint64_t tag = BASE::publish(exchange, routingKey, envelope, flags);
// create the publish deferred object, if we got no tag we failed
auto handler = std::make_shared<DeferredPublish>(tag == 0);
// add it to the open handlers
_handlers[tag] = handler;
// return the dereferenced handler
return *handler;
}
};
/**
* End of namespaces
*/
}

View File

@ -20,6 +20,7 @@
#include <queue>
#include "copiedbuffer.h"
#include "channelimpl.h"
#include "confirmed.h"
/**
* Begin of namespaces
@ -34,21 +35,9 @@ class Channel;
/**
* Class definition
*/
class Throttle
class Throttle : public Confirmed
{
protected:
/**
* The implementation for the channel
* @var std::shared_ptr<ChannelImpl>
*/
std::shared_ptr<ChannelImpl> _implementation;
/**
* Current id, always starts at 1.
* @var uint64_t
*/
uint64_t _current = 1;
/**
* Last sent ID
* @var uint64_t
@ -74,39 +63,28 @@ protected:
*/
std::set<size_t> _open;
/**
* Deferred to set up on the close
* @var std::shared_ptr<Deferred>
*/
std::shared_ptr<Deferred> _close;
/**
* Callback to call when an error occurred
* @var ErrorCallback
*/
ErrorCallback _errorCallback;
protected:
/**
* Send method for a frame
* @param id
* @param frame
*/
bool send(uint64_t id, const Frame &frame);
virtual bool send(uint64_t id, const Frame &frame) override;
/**
* Method that is called to report an error
* @param message
*/
virtual void reportError(const char *message);
virtual void reportError(const char *message) override;
protected:
/**
* Called when the deliverytag(s) are acked/nacked
* Method that is called to report an ack/nack
* @param deliveryTag
* @param multiple
*/
virtual void onAck(uint64_t deliveryTag, bool multiple);
virtual void onNack(uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
public:
/**
@ -139,33 +117,10 @@ public:
virtual ~Throttle() = default;
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* Method that can be accessed to check if there are still buffered messages.
* @return bool
*/
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/**
* Get the number of messages that are waiting to be published
* @return uint64_t
*/
size_t waiting() const { return _current - _last - 1; }
/**
* Number of messages already sent but unacknowledged by rabbit
* @return size_t
*/
size_t unacknowledged() const { return _open.size(); }
virtual bool waiting() const override { return _queue.size() > 0; }
/**
* Get the throttle
@ -186,18 +141,6 @@ public:
* @param max optional maximum, 0 is flush all
*/
size_t flush(size_t max = 0);
/**
* Close the throttle channel (closes the underlying channel when all messages have been sent)
* @return Deferred&
*/
Deferred &close();
/**
* Install an error callback
* @param callback
*/
void onError(const ErrorCallback &callback);
};
/**

View File

@ -1,6 +1,6 @@
CPP = g++
RM = rm -f
CPPFLAGS = -Wall -c -I../include -std=c++11 -MD
CPPFLAGS = -Wall -c -I../include -std=c++11 -MD -Wno-class-conversion
LD = g++
LD_FLAGS = -Wall -shared
SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION)

View File

@ -11,71 +11,58 @@
* Includes
*/
#include "includes.h"
#include "basicpublishframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
/**
* Begin of namespaces
*/
namespace AMQP {
/**
* Constructor
* @param channel
*/
Confirmed::Confirmed(Channel &channel) : _implementation(channel._implementation)
{
// activate confirm-select mode
auto &deferred = channel.confirmSelect()
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); });
// we might have failed, in which case we throw
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
// we wrap a handling error callback that calls our member function
_implementation->onError([this](const char *message) { reportError(message); });
}
/**
* Send method for a frame
* @param id
* @param frame
*/
bool Confirmed::send(uint64_t id, const Frame &frame)
{
// we're simply going to send it over the channel directly
return _implementation->send(frame);
}
/**
* Called when the deliverytag(s) are acked
* @param deliveryTag
* @param multiple
*/
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// leap out if there are still messages or we shouldn't close yet
if (!_close || waiting()) return;
// single element is simple
if (!multiple)
{
// find the element
auto iter = _handlers.find(deliveryTag);
// we did not find it (this should not be possible, unless somebody explicitly called)
// the base-class publish methods for some reason.
if (iter == _handlers.end()) return Throttle::onAck(deliveryTag, multiple);
// call the ack handler
iter->second->reportAck();
// if the monitor is no longer valid, we stop (we're done)
if (!monitor) return;
// erase it from the map
_handlers.erase(iter);
}
// do multiple at once
else
{
// find the last element, inclusive
auto upper = _handlers.upper_bound(deliveryTag);
// call the handlers
for (auto iter = _handlers.begin(); iter != upper; iter++)
{
// call the handler
iter->second->reportAck();
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
}
// make sure the object is still valid
if (!monitor) return;
// call base handler, will advance on the throttle if needed. we call this _after_ we're
// done processing the callbacks, since one of the callbacks might close the channel, or publish
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
Throttle::onAck(deliveryTag, multiple);
// close the channel, and forward the callbacks to the installed handler
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
}
/**
@ -85,58 +72,13 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
*/
void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// leap out if there are still messages or we shouldn't close yet
if (!_close || waiting()) return;
// single element is simple
if (!multiple)
{
// find the element
auto iter = _handlers.find(deliveryTag);
// we did not find it (this should not be possible, unless somebody explicitly called)
// the base-class publish methods for some reason.
if (iter == _handlers.end()) return Throttle::onNack(deliveryTag, multiple);
// call the ack handler
iter->second->reportNack();
// if the monitor is no longer valid, we stop (we're done)
if (!monitor) return;
// erase it from the map
_handlers.erase(iter);
}
// nack multiple elements
else
{
// find the last element, inclusive
auto upper = _handlers.upper_bound(deliveryTag);
// call the handlers
for (auto iter = _handlers.begin(); iter != upper; iter++)
{
// call the handler
iter->second->reportNack();
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// erase all acknowledged items
_handlers.erase(_handlers.begin(), upper);
}
// make sure the object is still valid
if (!monitor) return;
// call base handler, will advance on the throttle if needed. we call this _after_ we're
// done processing the callbacks, since one of the callbacks might close the channel, or publish
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
Throttle::onNack(deliveryTag, multiple);
// close the channel, and forward the callbacks to the installed handler
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
}
/**
@ -145,29 +87,11 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
*/
void Confirmed::reportError(const char *message)
{
// monitor the object, watching for destruction since these ack/nack handlers
// could destruct the object
Monitor monitor(this);
// reset tracking, since channel is fully broken
_current = 1;
// move the handlers out
auto handlers = std::move(_handlers);
// iterate over all the messages
// call the handlers
for (const auto &iter : handlers)
{
// call the handler
iter.second->reportError(message);
// if we were destructed in the meantime, we leap out
if (!monitor) return;
}
// if the monitor is no longer valid, leap out
if (!monitor) return;
// call base class to let it handle the errors
Throttle::reportError(message);
// if a callback is set, call the handler with the message
if (_errorCallback) _errorCallback(message);
}
/**
@ -180,23 +104,91 @@ void Confirmed::reportError(const char *message)
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* @return uint64_t
*/
DeferredPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
uint64_t Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{
// copy the current identifier, this will be the ID that will come back
auto current = _current;
// @todo do not copy the entire buffer to individual frames
// publish the entire thing, and remember if it failed at any point
bool failed = !Throttle::publish(exchange, routingKey, envelope, flags);
// fail if we're closing the channel, no more publishes allowed
if (_close) return false;
// create the open
auto handler = std::make_shared<DeferredPublish>(failed);
// send the publish frame
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
// add it to the open handlers
_handlers[current] = handler;
// send header
if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false;
// return the dereferenced handler
return *handler;
// connection and channel still usable?
if (!_implementation->usable()) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _implementation->maxPayload();
uint64_t bytessent = 0;
// the buffer
const char *data = envelope.body();
uint64_t bytesleft = envelope.bodySize();
// split up the body in multiple frames depending on the max frame size
while (bytesleft > 0)
{
// size of this chunk
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
// send out a body frame
if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false;
// update counters
bytessent += chunksize;
bytesleft -= chunksize;
}
// we succeeded
return _current++;
}
/**
* Close the throttle channel (closes the underlying channel)
* @return Deferred&
*/
Deferred &Confirmed::close()
{
// if this was already set to be closed, return that
if (_close) return *_close;
// create the deferred
_close = std::make_shared<Deferred>(_implementation->usable());
// if there are open messages or there is a queue, they will still get acked and we will then forward it
if (waiting()) return *_close;
// there are no open messages, we can close the channel directly.
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
// return the created deferred
return *_close;
}
/**
* Install an error callback
* @param callback
*/
void Confirmed::onError(const ErrorCallback &callback)
{
// we store the callback
_errorCallback = callback;
// check the callback
if (!callback) return;
// if the channel is no longer usable, report that
if (!_implementation->usable()) return callback("Channel is no longer usable");
// specify that we're already closing
if (_close) callback("Wrapped channel is closing down");
}
/**

View File

@ -79,6 +79,7 @@
#include "amqpcpp/channel.h"
#include "amqpcpp/throttle.h"
#include "amqpcpp/confirmed.h"
#include "amqpcpp/reliable.h"
#include "amqpcpp/login.h"
#include "amqpcpp/address.h"
#include "amqpcpp/connectionhandler.h"

View File

@ -26,22 +26,10 @@ namespace AMQP {
* @param channel
* @param throttle
*/
Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel._implementation), _throttle(throttle)
{
// activate confirm-select mode
auto &deferred = channel.confirmSelect()
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); });
// we might have failed, in which case we throw
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
// we wrap a handling error callback that calls our member function
_implementation->onError([this](const char *message) { reportError(message); });
}
Throttle::Throttle(Channel &channel, size_t throttle) : Confirmed(channel), _throttle(throttle) {}
/**
* Called when the deliverytag(s) are acked/nacked
* Called when the deliverytag(s) are acked
* @param deliveryTag
* @param multiple
*/
@ -53,16 +41,31 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple)
// otherwise, we remove the single element
else _open.erase(deliveryTag);
// if there is more room now, we can flush some items
// if there is room, flush part of the queue
if (_open.size() < _throttle) flush(_throttle - _open.size());
// leap out if there are still messages or we shouldn't close yet
if (!_open.empty() || !_close) return;
// call base handler
Confirmed::onAck(deliveryTag, multiple);
}
// close the channel, and forward the callbacks to the installed handler
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
/**
* Called when the deliverytag(s) are nacked
* @param deliveryTag
* @param multiple
*/
void Throttle::onNack(uint64_t deliveryTag, bool multiple)
{
// number of messages exposed
if (multiple) _open.erase(_open.begin(), _open.upper_bound(deliveryTag));
// otherwise, we remove the single element
else _open.erase(deliveryTag);
// if there is room, flush part of the queue
if (_open.size() < _throttle) flush(_throttle - _open.size());
// call base handler
Confirmed::onNack(deliveryTag, multiple);
}
/**
@ -88,8 +91,8 @@ bool Throttle::send(uint64_t id, const Frame &frame)
// we have now send this id
_open.insert(id);
// and we're going to send it over the channel directly
return _implementation->send(frame);
// we can finally actually send it
return Confirmed::send(id, frame);
}
/**
@ -104,68 +107,11 @@ void Throttle::reportError(const char *message)
// we can also forget all open messages, won't hear from them any more
_open.clear();
// reset tracking, since channel is fully broken
// we have no last seen message any more
_last = 0;
_current = 1;
// if a callback is set, call the handler with the message
if (_errorCallback) _errorCallback(message);
}
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @param flags optional flags
*/
bool Throttle::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
{
// @todo do not copy the entire buffer to individual frames
// fail if we're closing the channel, no more publishes allowed
if (_close) return false;
// send the publish frame
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
// send header
if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false;
// connection and channel still usable?
if (!_implementation->usable()) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _implementation->maxPayload();
uint64_t bytessent = 0;
// the buffer
const char *data = envelope.body();
uint64_t bytesleft = envelope.bodySize();
// split up the body in multiple frames depending on the max frame size
while (bytesleft > 0)
{
// size of this chunk
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
// send out a body frame
if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false;
// update counters
bytessent += chunksize;
bytesleft -= chunksize;
}
// we're done, we move to the next deliverytag
++_current;
// we succeeded
return true;
// call base method
Confirmed::reportError(message);
}
/**
@ -211,49 +157,6 @@ size_t Throttle::flush(size_t max)
return published;
}
/**
* Close the throttle channel (closes the underlying channel)
* @return Deferred&
*/
Deferred &Throttle::close()
{
// if this was already set to be closed, return that
if (_close) return *_close;
// create the deferred
_close = std::make_shared<Deferred>(_implementation->usable());
// if there are open messages or there is a queue, they will still get acked and we will then forward it
if (_open.size() > 0 || !_queue.empty()) return *_close;
// there are no open messages, we can close the channel directly.
_implementation->close()
.onSuccess([this]() { _close->reportSuccess(); })
.onError([this](const char *message) { _close->reportError(message); });
// return the created deferred
return *_close;
}
/**
* Install an error callback
* @param callback
*/
void Throttle::onError(const ErrorCallback &callback)
{
// we store the callback
_errorCallback = callback;
// check the callback
if (!callback) return;
// if the channel is no longer usable, report that
if (!_implementation->usable()) return callback("Channel is no longer usable");
// specify that we're already closing
if (_close) callback("Wrapped channel is closing down");
}
/**
* End of namespaces
*/