{auto} initial implementation of the confirmed wrapper
This commit is contained in:
parent
f10e861532
commit
0b9f4e4af5
|
|
@ -76,6 +76,7 @@
|
||||||
#include "amqpcpp/channelimpl.h"
|
#include "amqpcpp/channelimpl.h"
|
||||||
#include "amqpcpp/channel.h"
|
#include "amqpcpp/channel.h"
|
||||||
#include "amqpcpp/throttle.h"
|
#include "amqpcpp/throttle.h"
|
||||||
|
#include "amqpcpp/confirmed.h"
|
||||||
#include "amqpcpp/login.h"
|
#include "amqpcpp/login.h"
|
||||||
#include "amqpcpp/address.h"
|
#include "amqpcpp/address.h"
|
||||||
#include "amqpcpp/connectionhandler.h"
|
#include "amqpcpp/connectionhandler.h"
|
||||||
|
|
|
||||||
|
|
@ -85,6 +85,12 @@ using BounceCallback = std::function<void(const Message &message, int16
|
||||||
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
|
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
|
||||||
using NackCallback = std::function<void(uint64_t deliveryTag, bool multiple, bool requeue)>;
|
using NackCallback = std::function<void(uint64_t deliveryTag, bool multiple, bool requeue)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When using a confirm wrapped channel, these callbacks are called when a message is acknowledged/nacked.
|
||||||
|
*/
|
||||||
|
using PublishAckCallback = std::function<void()>;
|
||||||
|
using PublishNackCallback = std::function<void()>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End namespace
|
* End namespace
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,96 @@
|
||||||
|
/**
|
||||||
|
* Confirmed.h
|
||||||
|
*
|
||||||
|
* A channel wrapper based on AMQP::Throttle that allows message callbacks to be installed
|
||||||
|
* on the publishes, to be called when they are confirmed by the message broker.
|
||||||
|
*
|
||||||
|
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||||
|
* @copyright 2020 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Header guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Includes
|
||||||
|
*/
|
||||||
|
#include "deferredconfirmedpublish.h"
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespaces
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class Confirmed : public Throttle, private Watchable
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* Set of open deliverytags. We want a normal set (not unordered_set) because
|
||||||
|
* removal will be cheaper for whole ranges.
|
||||||
|
* @var size_t
|
||||||
|
*/
|
||||||
|
std::map<size_t, std::shared_ptr<DeferredConfirmedPublish>> _handlers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are acked/nacked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
virtual void onAck(uint64_t deliveryTag, bool multiple) override;
|
||||||
|
virtual void onNack(uint64_t deliveryTag, bool multiple) override;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param channel
|
||||||
|
* @param throttle
|
||||||
|
*/
|
||||||
|
Confirmed(AMQP::Channel &channel, size_t throttle) : Throttle(channel, throttle) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deleted copy constructor, deleted move constructor
|
||||||
|
* @param other
|
||||||
|
*/
|
||||||
|
Confirmed(const Confirmed &other) = delete;
|
||||||
|
Confirmed(Confirmed &&other) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deleted copy assignment, deleted move assignment
|
||||||
|
* @param other
|
||||||
|
*/
|
||||||
|
Confirmed &operator=(const Confirmed &other) = delete;
|
||||||
|
Confirmed &operator=(Confirmed &&other) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Virtual destructor
|
||||||
|
*/
|
||||||
|
virtual ~Confirmed() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||||
|
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||||
|
*
|
||||||
|
* @param exchange the exchange to publish to
|
||||||
|
* @param routingkey the routing key
|
||||||
|
* @param envelope the full envelope to send
|
||||||
|
* @param message the message to send
|
||||||
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0);
|
||||||
|
DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
|
||||||
|
DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
|
||||||
|
DeferredConfirmedPublish &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespaces
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
/**
|
||||||
|
* DeferredConfirmedPublish.h
|
||||||
|
*
|
||||||
|
* Deferred callback for RabbitMQ-specific publisher confirms mechanism per-message.
|
||||||
|
*
|
||||||
|
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||||
|
* @copyright 2020 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We extend from the default deferred and add extra functionality
|
||||||
|
*/
|
||||||
|
class DeferredConfirmedPublish : public Deferred
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* Callback to execute when server confirms that message is processed
|
||||||
|
* @var AckCallback
|
||||||
|
*/
|
||||||
|
PublishAckCallback _ackCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback to execute when server sends negative acknowledgement
|
||||||
|
* @var NackCallback
|
||||||
|
*/
|
||||||
|
PublishNackCallback _nackCallback;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Report an ack, calls the callback.
|
||||||
|
*/
|
||||||
|
void reportAck()
|
||||||
|
{
|
||||||
|
// check if the callback is set
|
||||||
|
if (_ackCallback) _ackCallback();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Report an nack, calls the callback if set.
|
||||||
|
*/
|
||||||
|
void reportNack()
|
||||||
|
{
|
||||||
|
// check if the callback is set
|
||||||
|
if (_nackCallback) _nackCallback();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The wrapped confirmed channel implementation may call our
|
||||||
|
* private members and construct us
|
||||||
|
*/
|
||||||
|
friend class Confirmed;
|
||||||
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Protected constructor that can only be called
|
||||||
|
* from within the channel implementation
|
||||||
|
*
|
||||||
|
* Note: this constructor _should_ be protected, but because make_shared
|
||||||
|
* will then not work, we have decided to make it public after all,
|
||||||
|
* because the work-around would result in not-so-easy-to-read code.
|
||||||
|
*
|
||||||
|
* @param boolean are we already failed?
|
||||||
|
*/
|
||||||
|
DeferredConfirmedPublish(bool failed = false) : Deferred(failed) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Callback that is called when the broker confirmed message publication
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredConfirmedPublish &onAck(const PublishAckCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_ackCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is called when the broker denied message publication
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredConfirmedPublish &onNack(const PublishNackCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_nackCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
@ -36,7 +36,7 @@ class Channel;
|
||||||
*/
|
*/
|
||||||
class Throttle
|
class Throttle
|
||||||
{
|
{
|
||||||
private:
|
protected:
|
||||||
/**
|
/**
|
||||||
* The implementation for the channel
|
* The implementation for the channel
|
||||||
* @var std::shared_ptr<ChannelImpl>
|
* @var std::shared_ptr<ChannelImpl>
|
||||||
|
|
@ -74,13 +74,6 @@ private:
|
||||||
*/
|
*/
|
||||||
std::set<size_t> _open;
|
std::set<size_t> _open;
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when the deliverytag(s) are acked/nacked
|
|
||||||
* @param deliveryTag
|
|
||||||
* @param multiple
|
|
||||||
*/
|
|
||||||
void onAck(uint64_t deliveryTag, bool multiple);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send method for a frame
|
* Send method for a frame
|
||||||
* @param id
|
* @param id
|
||||||
|
|
@ -88,6 +81,15 @@ private:
|
||||||
*/
|
*/
|
||||||
bool send(uint64_t id, const Frame &frame);
|
bool send(uint64_t id, const Frame &frame);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are acked/nacked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
virtual void onAck(uint64_t deliveryTag, bool multiple);
|
||||||
|
virtual void onNack(uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Confirmed.cpp
|
||||||
|
*
|
||||||
|
* Implementation for Confirmed class.
|
||||||
|
*
|
||||||
|
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
|
||||||
|
* @copyright 2020 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Includes
|
||||||
|
*/
|
||||||
|
#include "includes.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespaces
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are acked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
void Confirmed::onAck(uint64_t deliveryTag, bool multiple)
|
||||||
|
{
|
||||||
|
// call base handler, will advance on the throttle if needed
|
||||||
|
Throttle::onAck(deliveryTag, multiple);
|
||||||
|
|
||||||
|
// monitor the object, watching for destruction since these ack/nack handlers
|
||||||
|
// could destruct the object
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// single element is simple
|
||||||
|
if (!multiple)
|
||||||
|
{
|
||||||
|
// find the element
|
||||||
|
auto iter = _handlers.find(deliveryTag);
|
||||||
|
|
||||||
|
// we did not find it (this should not be possible, unless somebody explicitly called)
|
||||||
|
// the base-class publish methods for some reason.
|
||||||
|
if (iter == _handlers.end()) return;
|
||||||
|
|
||||||
|
// call the ack handler
|
||||||
|
iter->second->reportAck();
|
||||||
|
|
||||||
|
// if the monitor is no longer valid, we stop (we're done)
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// erase it from the map
|
||||||
|
_handlers.erase(iter);
|
||||||
|
|
||||||
|
// we are done
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the last element, inclusive
|
||||||
|
auto upper = _handlers.upper_bound(deliveryTag);
|
||||||
|
|
||||||
|
// call the handlers
|
||||||
|
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
||||||
|
{
|
||||||
|
// call the handler
|
||||||
|
iter->second->reportAck();
|
||||||
|
|
||||||
|
// if we were destructed in the meantime, we leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// erase all acknowledged items
|
||||||
|
_handlers.erase(_handlers.begin(), upper);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the deliverytag(s) are nacked
|
||||||
|
* @param deliveryTag
|
||||||
|
* @param multiple
|
||||||
|
*/
|
||||||
|
void Confirmed::onNack(uint64_t deliveryTag, bool multiple)
|
||||||
|
{
|
||||||
|
// call base handler, will advance on the throttle if needed
|
||||||
|
Throttle::onNack(deliveryTag, multiple);
|
||||||
|
|
||||||
|
// monitor the object, watching for destruction since these ack/nack handlers
|
||||||
|
// could destruct the object
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// single element is simple
|
||||||
|
if (!multiple)
|
||||||
|
{
|
||||||
|
// find the element
|
||||||
|
auto iter = _handlers.find(deliveryTag);
|
||||||
|
|
||||||
|
// we did not find it (this should not be possible, unless somebody explicitly called)
|
||||||
|
// the base-class publish methods for some reason.
|
||||||
|
if (iter == _handlers.end()) return;
|
||||||
|
|
||||||
|
// call the ack handler
|
||||||
|
iter->second->reportNack();
|
||||||
|
|
||||||
|
// if the monitor is no longer valid, we stop (we're done)
|
||||||
|
if (!monitor) return;
|
||||||
|
|
||||||
|
// erase it from the map
|
||||||
|
_handlers.erase(iter);
|
||||||
|
|
||||||
|
// we are done
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the last element, inclusive
|
||||||
|
auto upper = _handlers.upper_bound(deliveryTag);
|
||||||
|
|
||||||
|
// call the handlers
|
||||||
|
for (auto iter = _handlers.begin(); iter != upper; iter++)
|
||||||
|
{
|
||||||
|
// call the handler
|
||||||
|
iter->second->reportNack();
|
||||||
|
|
||||||
|
// if we were destructed in the meantime, we leap out
|
||||||
|
if (!monitor) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// erase all acknowledged items
|
||||||
|
_handlers.erase(_handlers.begin(), upper);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
|
||||||
|
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
|
||||||
|
*
|
||||||
|
* @param exchange the exchange to publish to
|
||||||
|
* @param routingkey the routing key
|
||||||
|
* @param envelope the full envelope to send
|
||||||
|
* @param message the message to send
|
||||||
|
* @param size size of the message
|
||||||
|
* @param flags optional flags
|
||||||
|
*/
|
||||||
|
DeferredConfirmedPublish &Confirmed::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags)
|
||||||
|
{
|
||||||
|
// copy the current identifier, this will be the ID that will come back
|
||||||
|
auto current = _current;
|
||||||
|
|
||||||
|
// publish the entire thing, and remember if it failed at any point
|
||||||
|
bool failed = !Throttle::publish(exchange, routingKey, envelope, flags);
|
||||||
|
|
||||||
|
// create the open
|
||||||
|
auto handler = std::make_shared<DeferredConfirmedPublish>(failed);
|
||||||
|
|
||||||
|
// add it to the open handlers
|
||||||
|
_handlers[current] = handler;
|
||||||
|
|
||||||
|
// return the dereferenced handler
|
||||||
|
return *handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespaces
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
@ -77,6 +77,7 @@
|
||||||
#include "amqpcpp/channelimpl.h"
|
#include "amqpcpp/channelimpl.h"
|
||||||
#include "amqpcpp/channel.h"
|
#include "amqpcpp/channel.h"
|
||||||
#include "amqpcpp/throttle.h"
|
#include "amqpcpp/throttle.h"
|
||||||
|
#include "amqpcpp/confirmed.h"
|
||||||
#include "amqpcpp/login.h"
|
#include "amqpcpp/login.h"
|
||||||
#include "amqpcpp/address.h"
|
#include "amqpcpp/address.h"
|
||||||
#include "amqpcpp/connectionhandler.h"
|
#include "amqpcpp/connectionhandler.h"
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ Throttle::Throttle(Channel &channel, size_t throttle) : _implementation(channel.
|
||||||
// activate confirm-select mode
|
// activate confirm-select mode
|
||||||
auto &deferred = channel.confirmSelect()
|
auto &deferred = channel.confirmSelect()
|
||||||
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
|
.onAck([this](uint64_t deliveryTag, bool multiple) { onAck(deliveryTag, multiple); })
|
||||||
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onAck(deliveryTag, multiple); });
|
.onNack([this](uint64_t deliveryTag, bool multiple, bool /* requeue*/) { onNack(deliveryTag, multiple); });
|
||||||
|
|
||||||
// we might have failed, in which case we throw
|
// we might have failed, in which case we throw
|
||||||
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
|
if (!deferred) throw std::runtime_error("could not enable publisher confirms");
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue