Merge pull request #373 from CopernicaMarketingSoftware/reliable-disconnected-throttle
disconnected explicit reliance of reliable on throttle
This commit is contained in:
commit
c0ac8f78b2
|
|
@ -76,8 +76,9 @@
|
||||||
#include "amqpcpp/deferredrecall.h"
|
#include "amqpcpp/deferredrecall.h"
|
||||||
#include "amqpcpp/channelimpl.h"
|
#include "amqpcpp/channelimpl.h"
|
||||||
#include "amqpcpp/channel.h"
|
#include "amqpcpp/channel.h"
|
||||||
#include "amqpcpp/throttle.h"
|
|
||||||
#include "amqpcpp/confirmed.h"
|
#include "amqpcpp/confirmed.h"
|
||||||
|
#include "amqpcpp/throttle.h"
|
||||||
|
#include "amqpcpp/reliable.h"
|
||||||
#include "amqpcpp/login.h"
|
#include "amqpcpp/login.h"
|
||||||
#include "amqpcpp/address.h"
|
#include "amqpcpp/address.h"
|
||||||
#include "amqpcpp/connectionhandler.h"
|
#include "amqpcpp/connectionhandler.h"
|
||||||
|
|
|
||||||
|
|
@ -604,7 +604,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Some internal classes may touch our implementation
|
* Some internal classes may touch our implementation
|
||||||
*/
|
*/
|
||||||
friend class Throttle;
|
friend class Confirmed;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Confirmed.h
|
* Confirmed.h
|
||||||
*
|
*
|
||||||
* A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed
|
* Base class that enables publisher confirms and keeps track of the sent messages.
|
||||||
* on the publishes, to be called when they are confirmed by the message broker.
|
|
||||||
*
|
*
|
||||||
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||||
* @copyright 2020 Copernica BV
|
* @copyright 2020 Copernica BV
|
||||||
|
|
@ -27,37 +26,63 @@ namespace AMQP {
|
||||||
/**
|
/**
|
||||||
* Class definition
|
* Class definition
|
||||||
*/
|
*/
|
||||||
class Confirmed : public Throttle, private Watchable
|
class Confirmed : public Watchable
|
||||||
{
|
{
|
||||||
private:
|
protected:
|
||||||
/**
|
/**
|
||||||
* Set of open deliverytags. We want a normal set (not unordered_set) because
|
* The implementation for the channel
|
||||||
* removal will be cheaper for whole ranges.
|
* @var std::shared_ptr<ChannelImpl>
|
||||||
* @var size_t
|
|
||||||
*/
|
*/
|
||||||
std::map<size_t, std::shared_ptr<DeferredPublish>> _handlers;
|
std::shared_ptr<ChannelImpl> _implementation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the deliverytag(s) are acked/nacked
|
* Current id, always starts at 1.
|
||||||
|
* @var uint64_t
|
||||||
|
*/
|
||||||
|
uint64_t _current = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deferred to set up on the close
|
||||||
|
* @var std::shared_ptr<Deferred>
|
||||||
|
*/
|
||||||
|
std::shared_ptr<Deferred> _close;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback to call when an error occurred
|
||||||
|
* @var ErrorCallback
|
||||||
|
*/
|
||||||
|
ErrorCallback _errorCallback;
|
||||||
|
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* Send method for a frame
|
||||||
|
* @param id
|
||||||
|
* @param frame
|
||||||
|
*/
|
||||||
|
virtual bool send(uint64_t id, const Frame &frame);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called to report an error.
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
virtual void reportError(const char *message);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that gets called on ack/nack. If these methods are overridden, make sure
|
||||||
|
* to also call the base class methods.
|
||||||
* @param deliveryTag
|
* @param deliveryTag
|
||||||
* @param multiple
|
* @param multiple
|
||||||
*/
|
*/
|
||||||
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
|
virtual void onAck(uint64_t deliveryTag, bool multiple);
|
||||||
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
|
virtual void onNack(uint64_t deliveryTag, bool multiple);
|
||||||
|
|
||||||
/**
|
|
||||||
* Method that is called to report an error
|
|
||||||
* @param message
|
|
||||||
*/
|
|
||||||
virtual void reportError(const char *message) override;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param channel
|
* @param channel
|
||||||
* @param throttle
|
|
||||||
*/
|
*/
|
||||||
Confirmed(AMQP::Channel &channel, size_t throttle) : Throttle(channel, throttle) {}
|
Confirmed(AMQP::Channel &channel);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deleted copy constructor, deleted move constructor
|
* Deleted copy constructor, deleted move constructor
|
||||||
|
|
@ -78,6 +103,12 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual ~Confirmed() = default;
|
virtual ~Confirmed() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to check if there is anything still waiting
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
virtual bool waiting() const { return false; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||||
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||||
|
|
@ -88,12 +119,24 @@ public:
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
* @param flags optional flags
|
* @param flags optional flags
|
||||||
* @return bool
|
* @return uint64_t
|
||||||
*/
|
*/
|
||||||
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
|
uint64_t publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
|
||||||
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
uint64_t publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||||
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||||
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
uint64_t publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close underlying channel
|
||||||
|
* @return Deferred&
|
||||||
|
*/
|
||||||
|
Deferred &close();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Install an error callback
|
||||||
|
* @param callback
|
||||||
|
*/
|
||||||
|
void onError(const ErrorCallback &callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -193,7 +193,7 @@ protected:
|
||||||
* private members and construct us
|
* private members and construct us
|
||||||
*/
|
*/
|
||||||
friend class ChannelImpl;
|
friend class ChannelImpl;
|
||||||
friend class Throttle;
|
friend class Confirmed;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,8 @@ private:
|
||||||
* The wrapped confirmed channel implementation may call our
|
* The wrapped confirmed channel implementation may call our
|
||||||
* private members and construct us
|
* private members and construct us
|
||||||
*/
|
*/
|
||||||
friend class Confirmed;
|
template <class T>
|
||||||
|
friend class Reliable;
|
||||||
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,272 @@
|
||||||
|
/**
|
||||||
|
* Reliable.h
|
||||||
|
*
|
||||||
|
* A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed
|
||||||
|
* on the publishes, to be called when they are confirmed by the message broker.
|
||||||
|
*
|
||||||
|
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||||
|
* @copyright 2020 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Header guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Includes
|
||||||
|
*/
|
||||||
|
#include "deferredpublish.h"
|
||||||
|
#include "confirmed.h"
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespaces
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
template <typename BASE=Confirmed>
|
||||||
|
class Reliable : public BASE
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
// make sure it is a proper channel
|
||||||
|
static_assert(std::is_base_of<AMQP::Confirmed, BASE>::value, "base should be derived from a confirmed channel.");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set of open deliverytags. We want a normal set (not unordered_set) because
|
||||||
|
* removal will be cheaper for whole ranges.
|
||||||
|
* @var size_t
|
||||||
|
*/
|
||||||
|
std::map<size_t, std::shared_ptr<DeferredPublish>> _handlers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are acked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
void onAck(uint64_t deliveryTag, bool multiple) override
|
||||||
|
{
|
||||||
|
// monitor the object, watching for destruction since these ack/nack handlers
|
||||||
|
// could destruct the object
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// single element is simple
|
||||||
|
if (!multiple)
|
||||||
|
{
|
||||||
|
// find the element
|
||||||
|
auto iter = _handlers.find(deliveryTag);
|
||||||
|
|
||||||
|
// we did not find it (this should not be possible, unless somebody explicitly called)
|
||||||
|
// the base-class publish methods for some reason.
|
||||||
|
if (iter == _handlers.end()) return BASE::onAck(deliveryTag, multiple);
|
||||||
|
|
||||||
|
// call the ack handler
|
||||||
|
iter->second->reportAck();
|
||||||
|
|
||||||
|
// if the monitor is no longer valid, we stop (we're done)
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// erase it from the map
|
||||||
|
_handlers.erase(iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
// do multiple at once
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// find the last element, inclusive
|
||||||
|
auto upper = _handlers.upper_bound(deliveryTag);
|
||||||
|
|
||||||
|
// call the handlers
|
||||||
|
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
||||||
|
{
|
||||||
|
// call the handler
|
||||||
|
iter->second->reportAck();
|
||||||
|
|
||||||
|
// if we were destructed in the meantime, we leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// erase all acknowledged items
|
||||||
|
_handlers.erase(_handlers.begin(), upper);
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure the object is still valid
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// call base handler as well
|
||||||
|
BASE::onAck(deliveryTag, multiple);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are nacked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
void onNack(uint64_t deliveryTag, bool multiple) override
|
||||||
|
{
|
||||||
|
// monitor the object, watching for destruction since these ack/nack handlers
|
||||||
|
// could destruct the object
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// single element is simple
|
||||||
|
if (!multiple)
|
||||||
|
{
|
||||||
|
// find the element
|
||||||
|
auto iter = _handlers.find(deliveryTag);
|
||||||
|
|
||||||
|
// we did not find it (this should not be possible, unless somebody explicitly called)
|
||||||
|
// the base-class publish methods for some reason.
|
||||||
|
if (iter == _handlers.end()) return BASE::onNack(deliveryTag, multiple);
|
||||||
|
|
||||||
|
// call the ack handler
|
||||||
|
iter->second->reportNack();
|
||||||
|
|
||||||
|
// if the monitor is no longer valid, we stop (we're done)
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// erase it from the map
|
||||||
|
_handlers.erase(iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
// nack multiple elements
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// find the last element, inclusive
|
||||||
|
auto upper = _handlers.upper_bound(deliveryTag);
|
||||||
|
|
||||||
|
// call the handlers
|
||||||
|
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
||||||
|
{
|
||||||
|
// call the handler
|
||||||
|
iter->second->reportNack();
|
||||||
|
|
||||||
|
// if we were destructed in the meantime, we leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// erase all acknowledged items
|
||||||
|
_handlers.erase(_handlers.begin(), upper);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the object is no longer valid, return
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// call the base handler
|
||||||
|
BASE::onNack(deliveryTag, multiple);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called to report an error
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
void reportError(const char *message) override
|
||||||
|
{
|
||||||
|
// monitor the object, watching for destruction since these ack/nack handlers
|
||||||
|
// could destruct the object
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// move the handlers out
|
||||||
|
auto handlers = std::move(_handlers);
|
||||||
|
|
||||||
|
// iterate over all the messages
|
||||||
|
// call the handlers
|
||||||
|
for (const auto &iter : handlers)
|
||||||
|
{
|
||||||
|
// call the handler
|
||||||
|
iter.second->reportError(message);
|
||||||
|
|
||||||
|
// if we were destructed in the meantime, we leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the monitor is no longer valid, leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// call the base handler
|
||||||
|
BASE::reportError(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param channel
|
||||||
|
* @param throttle
|
||||||
|
*/
|
||||||
|
template <typename ...Args>
|
||||||
|
Reliable(Args &&...args) : BASE(std::forward<Args>(args)...) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deleted copy constructor, deleted move constructor
|
||||||
|
* @param other
|
||||||
|
*/
|
||||||
|
Reliable(const Reliable &other) = delete;
|
||||||
|
Reliable(Reliable &&other) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deleted copy assignment, deleted move assignment
|
||||||
|
* @param other
|
||||||
|
*/
|
||||||
|
Reliable &operator=(const Reliable &other) = delete;
|
||||||
|
Reliable &operator=(Reliable &&other) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Virtual destructor
|
||||||
|
*/
|
||||||
|
virtual ~Reliable() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that can be accessed to check if there are still buffered messages.
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
virtual bool waiting() const override { return _handlers.size() > 0 || BASE::waiting(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||||
|
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||||
|
*
|
||||||
|
* @param exchange the exchange to publish to
|
||||||
|
* @param routingkey the routing key
|
||||||
|
* @param envelope the full envelope to send
|
||||||
|
* @param message the message to send
|
||||||
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||||
|
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||||
|
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||||
|
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||||
|
*
|
||||||
|
* @param exchange the exchange to publish to
|
||||||
|
* @param routingkey the routing key
|
||||||
|
* @param envelope the full envelope to send
|
||||||
|
* @param message the message to send
|
||||||
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
|
*/
|
||||||
|
DeferredPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
||||||
|
{
|
||||||
|
// publish the entire thing, and remember if it failed at any point
|
||||||
|
uint64_t tag = BASE::publish(exchange, routingKey, envelope, flags);
|
||||||
|
|
||||||
|
// create the publish deferred object, if we got no tag we failed
|
||||||
|
auto handler = std::make_shared<DeferredPublish>(tag == 0);
|
||||||
|
|
||||||
|
// add it to the open handlers
|
||||||
|
_handlers[tag] = handler;
|
||||||
|
|
||||||
|
// return the dereferenced handler
|
||||||
|
return *handler;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespaces
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
@ -20,6 +20,7 @@
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include "copiedbuffer.h"
|
#include "copiedbuffer.h"
|
||||||
#include "channelimpl.h"
|
#include "channelimpl.h"
|
||||||
|
#include "confirmed.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Begin of namespaces
|
* Begin of namespaces
|
||||||
|
|
@ -34,21 +35,9 @@ class Channel;
|
||||||
/**
|
/**
|
||||||
* Class definition
|
* Class definition
|
||||||
*/
|
*/
|
||||||
class Throttle
|
class Throttle : public Confirmed
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
/**
|
|
||||||
* The implementation for the channel
|
|
||||||
* @var std::shared_ptr<ChannelImpl>
|
|
||||||
*/
|
|
||||||
std::shared_ptr<ChannelImpl> _implementation;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Current id, always starts at 1.
|
|
||||||
* @var uint64_t
|
|
||||||
*/
|
|
||||||
uint64_t _current = 1;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Last sent ID
|
* Last sent ID
|
||||||
* @var uint64_t
|
* @var uint64_t
|
||||||
|
|
@ -74,39 +63,28 @@ protected:
|
||||||
*/
|
*/
|
||||||
std::set<size_t> _open;
|
std::set<size_t> _open;
|
||||||
|
|
||||||
/**
|
|
||||||
* Deferred to set up on the close
|
|
||||||
* @var std::shared_ptr<Deferred>
|
|
||||||
*/
|
|
||||||
std::shared_ptr<Deferred> _close;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback to call when an error occurred
|
|
||||||
* @var ErrorCallback
|
|
||||||
*/
|
|
||||||
ErrorCallback _errorCallback;
|
|
||||||
|
|
||||||
|
protected:
|
||||||
/**
|
/**
|
||||||
* Send method for a frame
|
* Send method for a frame
|
||||||
* @param id
|
* @param id
|
||||||
* @param frame
|
* @param frame
|
||||||
*/
|
*/
|
||||||
bool send(uint64_t id, const Frame &frame);
|
virtual bool send(uint64_t id, const Frame &frame) override;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method that is called to report an error
|
* Method that is called to report an error
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
virtual void reportError(const char *message);
|
virtual void reportError(const char *message) override;
|
||||||
|
|
||||||
protected:
|
|
||||||
/**
|
/**
|
||||||
* Called when the deliverytag(s) are acked/nacked
|
* Method that is called to report an ack/nack
|
||||||
* @param deliveryTag
|
* @param deliveryTag
|
||||||
* @param multiple
|
* @param multiple
|
||||||
*/
|
*/
|
||||||
virtual void onAck(uint64_t deliveryTag, bool multiple);
|
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
|
||||||
virtual void onNack(uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }
|
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
|
@ -139,33 +117,10 @@ public:
|
||||||
virtual ~Throttle() = default;
|
virtual ~Throttle() = default;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
* Method that can be accessed to check if there are still buffered messages.
|
||||||
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
|
||||||
*
|
|
||||||
* @param exchange the exchange to publish to
|
|
||||||
* @param routingkey the routing key
|
|
||||||
* @param envelope the full envelope to send
|
|
||||||
* @param message the message to send
|
|
||||||
* @param size size of the message
|
|
||||||
* @param flags optional flags
|
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
|
virtual bool waiting() const override { return _queue.size() > 0; }
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
|
||||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of messages that are waiting to be published
|
|
||||||
* @return uint64_t
|
|
||||||
*/
|
|
||||||
size_t waiting() const { return _current - _last - 1; }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Number of messages already sent but unacknowledged by rabbit
|
|
||||||
* @return size_t
|
|
||||||
*/
|
|
||||||
size_t unacknowledged() const { return _open.size(); }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the throttle
|
* Get the throttle
|
||||||
|
|
@ -186,18 +141,6 @@ public:
|
||||||
* @param max optional maximum, 0 is flush all
|
* @param max optional maximum, 0 is flush all
|
||||||
*/
|
*/
|
||||||
size_t flush(size_t max = 0);
|
size_t flush(size_t max = 0);
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the throttle channel (closes the underlying channel when all messages have been sent)
|
|
||||||
* @return Deferred&
|
|
||||||
*/
|
|
||||||
Deferred &close();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Install an error callback
|
|
||||||
* @param callback
|
|
||||||
*/
|
|
||||||
void onError(const ErrorCallback &callback);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
CPP = g++
|
CPP = g++
|
||||||
RM = rm -f
|
RM = rm -f
|
||||||
CPPFLAGS = -Wall -c -I../include -std=c++11 -MD
|
CPPFLAGS = -Wall -c -I../include -std=c++11 -MD -Wno-class-conversion
|
||||||
LD = g++
|
LD = g++
|
||||||
LD_FLAGS = -Wall -shared
|
LD_FLAGS = -Wall -shared
|
||||||
SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION)
|
SHARED_LIB = lib$(LIBRARY_NAME).so.$(VERSION)
|
||||||
|
|
|
||||||
|
|
@ -11,71 +11,58 @@
|
||||||
* Includes
|
* Includes
|
||||||
*/
|
*/
|
||||||
#include "includes.h"
|
#include "includes.h"
|
||||||
|
#include "basicpublishframe.h"
|
||||||
|
#include "basicheaderframe.h"
|
||||||
|
#include "bodyframe.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Begin of namespaces
|
* Begin of namespaces
|
||||||
*/
|
*/
|
||||||
namespace AMQP {
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param channel
|
||||||
|
*/
|
||||||
|
Confirmed::Confirmed(Channel &channel) : _implementation(channel._implementation)
|
||||||
|
{
|
||||||
|
// activate confirm-select mode
|
||||||
|
auto &deferred = channel.confirmSelect()
|
||||||
|
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
|
||||||
|
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); });
|
||||||
|
|
||||||
|
// we might have failed, in which case we throw
|
||||||
|
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
|
||||||
|
|
||||||
|
// we wrap a handling error callback that calls our member function
|
||||||
|
_implementation->onError([this](const char *message) { reportError(message); });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send method for a frame
|
||||||
|
* @param id
|
||||||
|
* @param frame
|
||||||
|
*/
|
||||||
|
bool Confirmed::send(uint64_t id, const Frame &frame)
|
||||||
|
{
|
||||||
|
// we're simply going to send it over the channel directly
|
||||||
|
return _implementation->send(frame);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the deliverytag(s) are acked
|
* Called when the deliverytag(s) are acked
|
||||||
* @param deliveryTag
|
* @param deliveryTag
|
||||||
* @param multiple
|
* @param multiple
|
||||||
*/
|
*/
|
||||||
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
|
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
|
||||||
{
|
{
|
||||||
// monitor the object, watching for destruction since these ack/nack handlers
|
// leap out if there are still messages or we shouldn't close yet
|
||||||
// could destruct the object
|
if (!_close || waiting()) return;
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// single element is simple
|
// close the channel, and forward the callbacks to the installed handler
|
||||||
if (!multiple)
|
_implementation->close()
|
||||||
{
|
.onSuccess([this]() { _close->reportSuccess(); })
|
||||||
// find the element
|
.onError([this](const char *message) { _close->reportError(message); });
|
||||||
auto iter = _handlers.find(deliveryTag);
|
|
||||||
|
|
||||||
// we did not find it (this should not be possible, unless somebody explicitly called)
|
|
||||||
// the base-class publish methods for some reason.
|
|
||||||
if (iter == _handlers.end()) return Throttle::onAck(deliveryTag, multiple);
|
|
||||||
|
|
||||||
// call the ack handler
|
|
||||||
iter->second->reportAck();
|
|
||||||
|
|
||||||
// if the monitor is no longer valid, we stop (we're done)
|
|
||||||
if (!monitor) return;
|
|
||||||
|
|
||||||
// erase it from the map
|
|
||||||
_handlers.erase(iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
// do multiple at once
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// find the last element, inclusive
|
|
||||||
auto upper = _handlers.upper_bound(deliveryTag);
|
|
||||||
|
|
||||||
// call the handlers
|
|
||||||
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
|
||||||
{
|
|
||||||
// call the handler
|
|
||||||
iter->second->reportAck();
|
|
||||||
|
|
||||||
// if we were destructed in the meantime, we leap out
|
|
||||||
if (!monitor) return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// erase all acknowledged items
|
|
||||||
_handlers.erase(_handlers.begin(), upper);
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure the object is still valid
|
|
||||||
if (!monitor) return;
|
|
||||||
|
|
||||||
// call base handler, will advance on the throttle if needed. we call this _after_ we're
|
|
||||||
// done processing the callbacks, since one of the callbacks might close the channel, or publish
|
|
||||||
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
|
|
||||||
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
|
|
||||||
Throttle::onAck(deliveryTag, multiple);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -85,58 +72,13 @@ void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
|
||||||
*/
|
*/
|
||||||
void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
|
void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
|
||||||
{
|
{
|
||||||
// monitor the object, watching for destruction since these ack/nack handlers
|
// leap out if there are still messages or we shouldn't close yet
|
||||||
// could destruct the object
|
if (!_close || waiting()) return;
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// single element is simple
|
// close the channel, and forward the callbacks to the installed handler
|
||||||
if (!multiple)
|
_implementation->close()
|
||||||
{
|
.onSuccess([this]() { _close->reportSuccess(); })
|
||||||
// find the element
|
.onError([this](const char *message) { _close->reportError(message); });
|
||||||
auto iter = _handlers.find(deliveryTag);
|
|
||||||
|
|
||||||
// we did not find it (this should not be possible, unless somebody explicitly called)
|
|
||||||
// the base-class publish methods for some reason.
|
|
||||||
if (iter == _handlers.end()) return Throttle::onNack(deliveryTag, multiple);
|
|
||||||
|
|
||||||
// call the ack handler
|
|
||||||
iter->second->reportNack();
|
|
||||||
|
|
||||||
// if the monitor is no longer valid, we stop (we're done)
|
|
||||||
if (!monitor) return;
|
|
||||||
|
|
||||||
// erase it from the map
|
|
||||||
_handlers.erase(iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
// nack multiple elements
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// find the last element, inclusive
|
|
||||||
auto upper = _handlers.upper_bound(deliveryTag);
|
|
||||||
|
|
||||||
// call the handlers
|
|
||||||
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
|
||||||
{
|
|
||||||
// call the handler
|
|
||||||
iter->second->reportNack();
|
|
||||||
|
|
||||||
// if we were destructed in the meantime, we leap out
|
|
||||||
if (!monitor) return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// erase all acknowledged items
|
|
||||||
_handlers.erase(_handlers.begin(), upper);
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure the object is still valid
|
|
||||||
if (!monitor) return;
|
|
||||||
|
|
||||||
// call base handler, will advance on the throttle if needed. we call this _after_ we're
|
|
||||||
// done processing the callbacks, since one of the callbacks might close the channel, or publish
|
|
||||||
// more stuff. additionally, if it does destroy the channel, we are doing a lot of extra publishing
|
|
||||||
// for nothing. also, we call some extra handlers, and otherwise we might get onAcked after onClosed
|
|
||||||
Throttle::onNack(deliveryTag, multiple);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -145,29 +87,11 @@ void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
|
||||||
*/
|
*/
|
||||||
void Confirmed::reportError(const char *message)
|
void Confirmed::reportError(const char *message)
|
||||||
{
|
{
|
||||||
// monitor the object, watching for destruction since these ack/nack handlers
|
// reset tracking, since channel is fully broken
|
||||||
// could destruct the object
|
_current = 1;
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// move the handlers out
|
// if a callback is set, call the handler with the message
|
||||||
auto handlers = std::move(_handlers);
|
if (_errorCallback) _errorCallback(message);
|
||||||
|
|
||||||
// iterate over all the messages
|
|
||||||
// call the handlers
|
|
||||||
for (const auto &iter : handlers)
|
|
||||||
{
|
|
||||||
// call the handler
|
|
||||||
iter.second->reportError(message);
|
|
||||||
|
|
||||||
// if we were destructed in the meantime, we leap out
|
|
||||||
if (!monitor) return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the monitor is no longer valid, leap out
|
|
||||||
if (!monitor) return;
|
|
||||||
|
|
||||||
// call base class to let it handle the errors
|
|
||||||
Throttle::reportError(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -180,23 +104,91 @@ void Confirmed::reportError(const char *message)
|
||||||
* @param message the message to send
|
* @param message the message to send
|
||||||
* @param size size of the message
|
* @param size size of the message
|
||||||
* @param flags optional flags
|
* @param flags optional flags
|
||||||
|
* @return uint64_t
|
||||||
*/
|
*/
|
||||||
DeferredPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
uint64_t Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
||||||
{
|
{
|
||||||
// copy the current identifier, this will be the ID that will come back
|
// @todo do not copy the entire buffer to individual frames
|
||||||
auto current = _current;
|
|
||||||
|
|
||||||
// publish the entire thing, and remember if it failed at any point
|
// fail if we're closing the channel, no more publishes allowed
|
||||||
bool failed = !Throttle::publish(exchange, routingKey, envelope, flags);
|
if (_close) return false;
|
||||||
|
|
||||||
// create the open
|
// send the publish frame
|
||||||
auto handler = std::make_shared<DeferredPublish>(failed);
|
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
|
||||||
|
|
||||||
// add it to the open handlers
|
// send header
|
||||||
_handlers[current] = handler;
|
if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false;
|
||||||
|
|
||||||
// return the dereferenced handler
|
// connection and channel still usable?
|
||||||
return *handler;
|
if (!_implementation->usable()) return false;
|
||||||
|
|
||||||
|
// the max payload size is the max frame size minus the bytes for headers and trailer
|
||||||
|
uint32_t maxpayload = _implementation->maxPayload();
|
||||||
|
uint64_t bytessent = 0;
|
||||||
|
|
||||||
|
// the buffer
|
||||||
|
const char *data = envelope.body();
|
||||||
|
uint64_t bytesleft = envelope.bodySize();
|
||||||
|
|
||||||
|
// split up the body in multiple frames depending on the max frame size
|
||||||
|
while (bytesleft > 0)
|
||||||
|
{
|
||||||
|
// size of this chunk
|
||||||
|
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
||||||
|
|
||||||
|
// send out a body frame
|
||||||
|
if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false;
|
||||||
|
|
||||||
|
// update counters
|
||||||
|
bytessent += chunksize;
|
||||||
|
bytesleft -= chunksize;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we succeeded
|
||||||
|
return _current++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the throttle channel (closes the underlying channel)
|
||||||
|
* @return Deferred&
|
||||||
|
*/
|
||||||
|
Deferred &Confirmed::close()
|
||||||
|
{
|
||||||
|
// if this was already set to be closed, return that
|
||||||
|
if (_close) return *_close;
|
||||||
|
|
||||||
|
// create the deferred
|
||||||
|
_close = std::make_shared<Deferred>(_implementation->usable());
|
||||||
|
|
||||||
|
// if there are open messages or there is a queue, they will still get acked and we will then forward it
|
||||||
|
if (waiting()) return *_close;
|
||||||
|
|
||||||
|
// there are no open messages, we can close the channel directly.
|
||||||
|
_implementation->close()
|
||||||
|
.onSuccess([this]() { _close->reportSuccess(); })
|
||||||
|
.onError([this](const char *message) { _close->reportError(message); });
|
||||||
|
|
||||||
|
// return the created deferred
|
||||||
|
return *_close;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Install an error callback
|
||||||
|
* @param callback
|
||||||
|
*/
|
||||||
|
void Confirmed::onError(const ErrorCallback &callback)
|
||||||
|
{
|
||||||
|
// we store the callback
|
||||||
|
_errorCallback = callback;
|
||||||
|
|
||||||
|
// check the callback
|
||||||
|
if (!callback) return;
|
||||||
|
|
||||||
|
// if the channel is no longer usable, report that
|
||||||
|
if (!_implementation->usable()) return callback("Channel is no longer usable");
|
||||||
|
|
||||||
|
// specify that we're already closing
|
||||||
|
if (_close) callback("Wrapped channel is closing down");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -79,6 +79,7 @@
|
||||||
#include "amqpcpp/channel.h"
|
#include "amqpcpp/channel.h"
|
||||||
#include "amqpcpp/throttle.h"
|
#include "amqpcpp/throttle.h"
|
||||||
#include "amqpcpp/confirmed.h"
|
#include "amqpcpp/confirmed.h"
|
||||||
|
#include "amqpcpp/reliable.h"
|
||||||
#include "amqpcpp/login.h"
|
#include "amqpcpp/login.h"
|
||||||
#include "amqpcpp/address.h"
|
#include "amqpcpp/address.h"
|
||||||
#include "amqpcpp/connectionhandler.h"
|
#include "amqpcpp/connectionhandler.h"
|
||||||
|
|
|
||||||
155
src/throttle.cpp
155
src/throttle.cpp
|
|
@ -26,22 +26,10 @@ namespace AMQP {
|
||||||
* @param channel
|
* @param channel
|
||||||
* @param throttle
|
* @param throttle
|
||||||
*/
|
*/
|
||||||
Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel._implementation), _throttle(throttle)
|
Throttle::Throttle(Channel &channel, size_t throttle) : Confirmed(channel), _throttle(throttle) {}
|
||||||
{
|
|
||||||
// activate confirm-select mode
|
|
||||||
auto &deferred = channel.confirmSelect()
|
|
||||||
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
|
|
||||||
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); });
|
|
||||||
|
|
||||||
// we might have failed, in which case we throw
|
|
||||||
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
|
|
||||||
|
|
||||||
// we wrap a handling error callback that calls our member function
|
|
||||||
_implementation->onError([this](const char *message) { reportError(message); });
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the deliverytag(s) are acked/nacked
|
* Called when the deliverytag(s) are acked
|
||||||
* @param deliveryTag
|
* @param deliveryTag
|
||||||
* @param multiple
|
* @param multiple
|
||||||
*/
|
*/
|
||||||
|
|
@ -53,16 +41,31 @@ void Throttle::onAck(uint64_t deliveryTag, bool multiple)
|
||||||
// otherwise, we remove the single element
|
// otherwise, we remove the single element
|
||||||
else _open.erase(deliveryTag);
|
else _open.erase(deliveryTag);
|
||||||
|
|
||||||
// if there is more room now, we can flush some items
|
// if there is room, flush part of the queue
|
||||||
if (_open.size() < _throttle) flush(_throttle - _open.size());
|
if (_open.size() < _throttle) flush(_throttle - _open.size());
|
||||||
|
|
||||||
// leap out if there are still messages or we shouldn't close yet
|
// call base handler
|
||||||
if (!_open.empty() || !_close) return;
|
Confirmed::onAck(deliveryTag, multiple);
|
||||||
|
}
|
||||||
|
|
||||||
// close the channel, and forward the callbacks to the installed handler
|
/**
|
||||||
_implementation->close()
|
* Called when the deliverytag(s) are nacked
|
||||||
.onSuccess([this]() { _close->reportSuccess(); })
|
* @param deliveryTag
|
||||||
.onError([this](const char *message) { _close->reportError(message); });
|
* @param multiple
|
||||||
|
*/
|
||||||
|
void Throttle::onNack(uint64_t deliveryTag, bool multiple)
|
||||||
|
{
|
||||||
|
// number of messages exposed
|
||||||
|
if (multiple) _open.erase(_open.begin(), _open.upper_bound(deliveryTag));
|
||||||
|
|
||||||
|
// otherwise, we remove the single element
|
||||||
|
else _open.erase(deliveryTag);
|
||||||
|
|
||||||
|
// if there is room, flush part of the queue
|
||||||
|
if (_open.size() < _throttle) flush(_throttle - _open.size());
|
||||||
|
|
||||||
|
// call base handler
|
||||||
|
Confirmed::onNack(deliveryTag, multiple);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -88,8 +91,8 @@ bool Throttle::send(uint64_t id, const Frame &frame)
|
||||||
// we have now send this id
|
// we have now send this id
|
||||||
_open.insert(id);
|
_open.insert(id);
|
||||||
|
|
||||||
// and we're going to send it over the channel directly
|
// we can finally actually send it
|
||||||
return _implementation->send(frame);
|
return Confirmed::send(id, frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -104,68 +107,11 @@ void Throttle::reportError(const char *message)
|
||||||
// we can also forget all open messages, won't hear from them any more
|
// we can also forget all open messages, won't hear from them any more
|
||||||
_open.clear();
|
_open.clear();
|
||||||
|
|
||||||
// reset tracking, since channel is fully broken
|
// we have no last seen message any more
|
||||||
_last = 0;
|
_last = 0;
|
||||||
_current = 1;
|
|
||||||
|
|
||||||
// if a callback is set, call the handler with the message
|
// call base method
|
||||||
if (_errorCallback) _errorCallback(message);
|
Confirmed::reportError(message);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
|
||||||
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
|
||||||
*
|
|
||||||
* @param exchange the exchange to publish to
|
|
||||||
* @param routingkey the routing key
|
|
||||||
* @param envelope the full envelope to send
|
|
||||||
* @param message the message to send
|
|
||||||
* @param size size of the message
|
|
||||||
* @param flags optional flags
|
|
||||||
*/
|
|
||||||
bool Throttle::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
|
||||||
{
|
|
||||||
// @todo do not copy the entire buffer to individual frames
|
|
||||||
|
|
||||||
// fail if we're closing the channel, no more publishes allowed
|
|
||||||
if (_close) return false;
|
|
||||||
|
|
||||||
// send the publish frame
|
|
||||||
if (!send(_current, BasicPublishFrame(_implementation->id(), exchange, routingKey, (flags & mandatory) != 0, (flags & immediate) != 0))) return false;
|
|
||||||
|
|
||||||
// send header
|
|
||||||
if (!send(_current, BasicHeaderFrame(_implementation->id(), envelope))) return false;
|
|
||||||
|
|
||||||
// connection and channel still usable?
|
|
||||||
if (!_implementation->usable()) return false;
|
|
||||||
|
|
||||||
// the max payload size is the max frame size minus the bytes for headers and trailer
|
|
||||||
uint32_t maxpayload = _implementation->maxPayload();
|
|
||||||
uint64_t bytessent = 0;
|
|
||||||
|
|
||||||
// the buffer
|
|
||||||
const char *data = envelope.body();
|
|
||||||
uint64_t bytesleft = envelope.bodySize();
|
|
||||||
|
|
||||||
// split up the body in multiple frames depending on the max frame size
|
|
||||||
while (bytesleft > 0)
|
|
||||||
{
|
|
||||||
// size of this chunk
|
|
||||||
uint64_t chunksize = std::min(static_cast<uint64_t>(maxpayload), bytesleft);
|
|
||||||
|
|
||||||
// send out a body frame
|
|
||||||
if (!send(_current, BodyFrame(_implementation->id(), data + bytessent, (uint32_t)chunksize))) return false;
|
|
||||||
|
|
||||||
// update counters
|
|
||||||
bytessent += chunksize;
|
|
||||||
bytesleft -= chunksize;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we're done, we move to the next deliverytag
|
|
||||||
++_current;
|
|
||||||
|
|
||||||
// we succeeded
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -211,49 +157,6 @@ size_t Throttle::flush(size_t max)
|
||||||
return published;
|
return published;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the throttle channel (closes the underlying channel)
|
|
||||||
* @return Deferred&
|
|
||||||
*/
|
|
||||||
Deferred &Throttle::close()
|
|
||||||
{
|
|
||||||
// if this was already set to be closed, return that
|
|
||||||
if (_close) return *_close;
|
|
||||||
|
|
||||||
// create the deferred
|
|
||||||
_close = std::make_shared<Deferred>(_implementation->usable());
|
|
||||||
|
|
||||||
// if there are open messages or there is a queue, they will still get acked and we will then forward it
|
|
||||||
if (_open.size() > 0 || !_queue.empty()) return *_close;
|
|
||||||
|
|
||||||
// there are no open messages, we can close the channel directly.
|
|
||||||
_implementation->close()
|
|
||||||
.onSuccess([this]() { _close->reportSuccess(); })
|
|
||||||
.onError([this](const char *message) { _close->reportError(message); });
|
|
||||||
|
|
||||||
// return the created deferred
|
|
||||||
return *_close;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Install an error callback
|
|
||||||
* @param callback
|
|
||||||
*/
|
|
||||||
void Throttle::onError(const ErrorCallback &callback)
|
|
||||||
{
|
|
||||||
// we store the callback
|
|
||||||
_errorCallback = callback;
|
|
||||||
|
|
||||||
// check the callback
|
|
||||||
if (!callback) return;
|
|
||||||
|
|
||||||
// if the channel is no longer usable, report that
|
|
||||||
if (!_implementation->usable()) return callback("Channel is no longer usable");
|
|
||||||
|
|
||||||
// specify that we're already closing
|
|
||||||
if (_close) callback("Wrapped channel is closing down");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End of namespaces
|
* End of namespaces
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue