de-templified the deferred objects, to make them easier to understand for other programmers

This commit is contained in:
Emiel Bruijntjes 2014-04-15 10:43:33 +02:00
parent 7b20f46519
commit 921f24ae06
11 changed files with 623 additions and 312 deletions

View File

@ -53,9 +53,12 @@
// mid level includes
#include <amqpcpp/exchangetype.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/callbacks.h>
#include <amqpcpp/deferred.h>
#include <amqpcpp/deferredconsumer.h>
#include <amqpcpp/callbacks.h>
#include <amqpcpp/deferredqueue.h>
#include <amqpcpp/deferreddelete.h>
#include <amqpcpp/deferredcancel.h>
#include <amqpcpp/channelimpl.h>
#include <amqpcpp/channel.h>
#include <amqpcpp/login.h>

View File

@ -12,145 +12,18 @@
namespace AMQP {
/**
* Class for managing deferred callbacks
* All the callbacks that are supported
*
* When someone registers a callback function for certain events, it should
* match one of the following signatures.
*/
class Callbacks
{
private:
/**
* Different callback types supported
*/
std::tuple<
std::deque<Deferred<>>,
std::deque<Deferred<const std::string&, uint32_t, uint32_t>>,
std::deque<Deferred<uint32_t>>,
std::deque<Deferred<const std::string&>>
> _callbacks;
/**
* If all else fails, we have gotten the wrong
* type, which is not present in the arguments.
*
* This should result in a compile error.
*/
template <class T, std::size_t N, class... Arguments>
struct getIndex
{
// if this structure is used, we went past the last argument
// and this static_assert should trigger a compile failure.
static_assert(N < sizeof...(Arguments), "Type T not found in Arguments");
// we still have to provide this member though
static constexpr std::size_t value = N;
};
/**
* This structure has one static member that represents
* the index of T in Arguments. This variant is used where U
* does equal T, so a match is found, meaning the current
* index given is the right one.
*/
template <class T, std::size_t N, class... Arguments>
struct getIndex<T, N, T, Arguments...>
{
// element is same type as we are looking for
static constexpr std::size_t value = N;
};
/**
* This structure has one static member that represents
* the index of T in Arguments. This variant is used where U
* does not equal T, so we need to look at the next member.
*/
template <class T, std::size_t N, class U, class... Arguments>
struct getIndex<T, N, U, Arguments...>
{
// current N is not correct, unroll to next element
static constexpr std::size_t value = getIndex<T, N + 1, Arguments...>::value;
};
/**
* Retrieve the list of callbacks matching the type
*
* @param tuple tuple with callbacks
*/
template <class T, class... Arguments>
T& get(std::tuple<Arguments...>& tuple)
{
// retrieve the index at which the requested callbacks can be found
constexpr std::size_t index = getIndex<T, 0, Arguments...>::value;
// retrieve the callbacks
return std::get<index>(tuple);
}
public:
/**
* Add a deferred to the available callbacks
*
* @param deferred the deferred to add
* @return reference to the inserted deferred
*/
template <typename... Arguments>
Deferred<Arguments...>& push_back(Deferred<Arguments...>&& item)
{
// retrieve the container
auto &container = get<std::deque<Deferred<Arguments...>>>(_callbacks);
// add the element
container.push_back(std::move(item));
// return reference to the new item
return container.back();
}
/**
* Report success to the relevant callback
*
* @param mixed... additional parameters
*/
template <typename... Arguments>
void reportSuccess(Arguments ...parameters)
{
// retrieve the container and element
auto &container = get<std::deque<Deferred<Arguments...>>>(_callbacks);
auto &callback = container.front();
// execute the callback
callback.success(parameters...);
// remove the executed callback
container.pop_front();
}
/**
* Report a failure
*
* @param error a description of the error
*/
template <std::size_t N = 0>
typename std::enable_if<N == std::tuple_size<decltype(_callbacks)>::value>::type
reportError(const std::string& message)
{}
/**
* Report a failure
*
* @param error a description of the error
*/
template <std::size_t N = 0>
typename std::enable_if<N < std::tuple_size<decltype(_callbacks)>::value>::type
reportError(const std::string& message)
{
// retrieve the callbacks at current index
auto &callbacks = std::get<N>(_callbacks);
// report errors to all callbacks of the current type
for (auto &callback : callbacks) callback.error(message);
// execute the next type
reportError<N + 1>(message);
}
};
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
using ConsumeCallback = std::function<void(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)>;
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using CancelCallback = std::function<void(const std::string &consumer)>;
/**
* End namespace

View File

@ -42,7 +42,7 @@ public:
*
* @param callback the callback to execute
*/
void onReady(const std::function<void(Channel *channel)>& callback)
void onReady(const SuccessCallback &callback)
{
// store callback in implementation
_implementation._readyCallback = callback;
@ -56,7 +56,7 @@ public:
*
* @param callback the callback to execute
*/
void onError(const std::function<void(Channel *channel, const std::string& message)>& callback)
void onError(const ErrorCallback &callback)
{
// store callback in implementation
_implementation._errorCallback = callback;
@ -70,7 +70,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& pause()
Deferred &pause()
{
return _implementation.pause();
}
@ -83,7 +83,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& resume()
Deferred &resume()
{
return _implementation.resume();
}
@ -103,7 +103,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& startTransaction()
Deferred &startTransaction()
{
return _implementation.startTransaction();
}
@ -114,7 +114,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& commitTransaction()
Deferred &commitTransaction()
{
return _implementation.commitTransaction();
}
@ -125,7 +125,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& rollbackTransaction()
Deferred &rollbackTransaction()
{
return _implementation.rollbackTransaction();
}
@ -149,12 +149,12 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); }
Deferred<>& declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); }
Deferred<>& declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); }
Deferred<>& declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); }
Deferred<>& declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); }
Deferred<>& declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); }
Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); }
Deferred &declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); }
Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); }
Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); }
Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); }
/**
* Remove an exchange
@ -169,7 +169,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); }
Deferred &removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); }
/**
* Bind two exchanges to each other
@ -187,9 +187,9 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); }
Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); }
Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); }
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); }
/**
* Unbind two exchanges from one another
@ -207,9 +207,9 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); }
Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); }
Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); }
Deferred &unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); }
/**
* Declare a queue
@ -240,12 +240,12 @@ public:
*
* });
*/
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); }
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); }
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); }
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); }
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); }
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); }
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); }
DeferredQueue &declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); }
DeferredQueue &declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); }
DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); }
DeferredQueue &declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); }
DeferredQueue &declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); }
/**
* Bind a queue to an exchange
@ -263,9 +263,9 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); }
Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); }
Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); }
Deferred &bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
/**
* Unbind a queue from an exchange
@ -277,8 +277,8 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
Deferred &unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
/**
* Purge a queue
@ -303,7 +303,7 @@ public:
*
* });
*/
Deferred<uint32_t>& purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); }
DeferredDelete &purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); }
/**
* Remove a queue
@ -329,7 +329,7 @@ public:
*
* });
*/
Deferred<uint32_t>& removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); }
DeferredDelete &removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); }
/**
* Publish a message to an exchange
@ -361,7 +361,7 @@ public:
* @param prefetchCount maximum number of messages to prefetch
* @return bool whether the Qos frame is sent.
*/
Deferred<>& setQos(uint16_t prefetchCount)
Deferred &setQos(uint16_t prefetchCount)
{
return _implementation.setQos(prefetchCount);
}
@ -440,7 +440,7 @@ public:
*
* });
*/
Deferred<const std::string&>& cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
DeferredCancel &cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
/**
* Acknoldge a received message
@ -493,7 +493,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& recover(int flags = 0) { return _implementation.recover(flags); }
Deferred &recover(int flags = 0) { return _implementation.recover(flags); }
/**
* Close the current channel
@ -501,7 +501,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& close() { return _implementation.close(); }
Deferred &close() { return _implementation.close(); }
/**
* Get the channel we're working on

View File

@ -34,24 +34,38 @@ private:
/**
* Callback when the channel is ready
* @var SuccessCallback
*/
std::function<void(Channel *channel)> _readyCallback;
SuccessCallback _readyCallback;
/**
* Callback when the channel errors out
* @var ErrorCallback
*/
std::function<void(Channel *channel, const std::string& message)> _errorCallback;
ErrorCallback _errorCallback;
/**
* Callback to execute when a message arrives
*
* @todo do this different??
*/
std::unique_ptr<DeferredConsumer> _consumer;
/**
* The callbacks waiting to be called
* Pointer to the oldest deferred result (the first one that is going
* to be executed)
*
* @var Deferred
*/
Callbacks _callbacks;
Deferred *_oldestCallback = nullptr;
/**
* Pointer to the newest deferred result (the last one to be added).
*
* @var Deferred
*/
Deferred *_newestCallback = nullptr;
/**
* The channel number
* @var uint16_t
@ -86,6 +100,14 @@ private:
*/
ChannelImpl(Channel *parent, Connection *connection);
/**
* Push a deferred result
* @param result The deferred result
* @param error Error message in case the result is not ok
*/
void push(Deferred *deferred, const char *error);
public:
/**
* Destructor
@ -109,7 +131,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& pause();
Deferred &pause();
/**
* Resume a paused channel
@ -119,7 +141,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& resume();
Deferred &resume();
/**
* Is the channel connected?
@ -133,7 +155,7 @@ public:
/**
* Start a transaction
*/
Deferred<>& startTransaction();
Deferred &startTransaction();
/**
* Commit the current transaction
@ -141,7 +163,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& commitTransaction();
Deferred &commitTransaction();
/**
* Rollback the current transaction
@ -149,7 +171,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& rollbackTransaction();
Deferred &rollbackTransaction();
/**
* declare an exchange
@ -162,7 +184,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments);
Deferred &declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments);
/**
* bind two exchanges
@ -176,7 +198,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
Deferred &bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
/**
* unbind two exchanges
@ -190,7 +212,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
Deferred &unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
/**
* remove an exchange
@ -201,7 +223,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& removeExchange(const std::string &name, int flags);
Deferred &removeExchange(const std::string &name, int flags);
/**
* declare a queue
@ -212,7 +234,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments);
DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments);
/**
* Bind a queue to an exchange
@ -226,7 +248,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments);
Deferred &bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments);
/**
* Unbind a queue from an exchange
@ -239,7 +261,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
Deferred &unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
/**
* Purge a queue
@ -259,7 +281,7 @@ public:
*
* });
*/
Deferred<uint32_t>& purgeQueue(const std::string &name, int flags);
DeferredDelete &purgeQueue(const std::string &name, int flags);
/**
* Remove a queue
@ -279,7 +301,7 @@ public:
*
* });
*/
Deferred<uint32_t>& removeQueue(const std::string &name, int flags);
DeferredDelete &removeQueue(const std::string &name, int flags);
/**
* Publish a message to an exchange
@ -303,7 +325,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& setQos(uint16_t prefetchCount);
Deferred &setQos(uint16_t prefetchCount);
/**
* Tell the RabbitMQ server that we're ready to consume messages
@ -345,7 +367,7 @@ public:
*
* });
*/
Deferred<const std::string&>& cancel(const std::string &tag, int flags);
DeferredCancel &cancel(const std::string &tag, int flags);
/**
* Acknowledge a message
@ -370,7 +392,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& recover(int flags);
Deferred &recover(int flags);
/**
* Close the current channel
@ -378,7 +400,7 @@ public:
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& close();
Deferred &close();
/**
* Get the channel we're working on
@ -402,9 +424,9 @@ public:
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
* @return Deferred the deferred object
*/
template <typename... Arguments>
Deferred<Arguments...>& send(const Frame &frame, const char *message);
Deferred &send(const Frame &frame, const char *message);
/**
* Report to the handler that the channel is opened
@ -412,7 +434,7 @@ public:
void reportReady()
{
// inform handler
if (_readyCallback) _readyCallback(_parent);
if (_readyCallback) _readyCallback();
}
/**
@ -436,8 +458,13 @@ public:
template <typename... Arguments>
void reportSuccess(Arguments ...parameters)
{
// report success to the relevant callback
_callbacks.reportSuccess<Arguments...>(std::forward<Arguments>(parameters)...);
// skip if there is no oldest callback
if (!_oldestCallback) return;
// report to the oldest callback, and install a new oldest callback
_oldestCallback = _oldestCallback->reportSuccess(std::forward<Arguments>(parameters)...);
// @todo destruct oldest callback
}
/**
@ -449,11 +476,19 @@ public:
// change state
_state = state_closed;
// inform handler
if (_errorCallback) _errorCallback(_parent, message);
// @todo multiple callbacks are called, this could break
// @todo should this be a std::string parameter?
// report to all waiting callbacks too
_callbacks.reportError(message);
// inform handler
if (_errorCallback) _errorCallback(message.c_str());
// skip if there is no oldest callback
if (!_oldestCallback) return;
// report to the oldest callback, and install a new oldest callback
_oldestCallback = _oldestCallback->reportError(message);
// @todo destruct oldest callback
}
/**
@ -488,7 +523,8 @@ public:
if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found");
// otherwise, we now report the consumer as started
else _consumer->success(consumerTag);
// @todo look at this implementation
//else _consumer->success(consumerTag);
}
/**

View File

@ -20,47 +20,132 @@ class Callbacks;
/**
* Class definition
*/
template <typename... Arguments>
class Deferred
{
private:
protected:
/**
* Callback to execute on success
* @var SuccessCallback
*/
std::function<void(Arguments ...parameters)> _successCallback;
SuccessCallback _successCallback;
/**
* Callback to execute on failure
* @var ErrorCallback
*/
std::function<void(const std::string& error)> _errorCallback;
ErrorCallback _errorCallback;
/**
* Callback to execute either way
* @var FinalizeCallback
*/
std::function<void(const std::string& error)> _finalizeCallback;
FinalizeCallback _finalizeCallback;
/**
* Pointer to the next deferred object
* @var Deferred
*/
Deferred *_next = nullptr;
/**
* Do we already know we failed?
* @var bool
*/
bool _failed;
/**
* Indicate success
*
* @param parameters... the extra parameters relevant for this deferred handler
* @return Deferred Next deferred result
*/
void success(Arguments ...parameters) const
Deferred *reportSuccess() const
{
// execute callbacks if registered
if (_successCallback) _successCallback(parameters...);
if (_finalizeCallback) _finalizeCallback("");
if (_successCallback) _successCallback();
if (_finalizeCallback) _finalizeCallback();
// return the next deferred result
return _next;
}
/**
* Report success for queue declared messages
* @param name Name of the new queue
* @param messagecount Number of messages in the queue
* @param consumercount Number of consumers linked to the queue
* @return Deferred Next deferred result
*/
virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const
{
// this is the same as a regular success message
return reportSuccess();
}
/**
* Report success for frames that report delete operations
* @param messagecount Number of messages that were deleted
* @return Deferred
*/
virtual Deferred *reportSuccess(uint32_t messagecount) const
{
// this is the same as a regular success message
return reportSuccess();
}
/**
* Report success for frames that report cancel operations
* @param name Consumer tag that is cancelled
* @return Deferred
*/
virtual Deferred *reportSuccess(const std::string &name) const
{
// this is the same as a regular success message
return reportSuccess();
}
/**
* Indicate failure
* @param error Description of the error that occured
* @return Deferred Next deferred result
*/
Deferred *reportError(const std::string& error)
{
// from this moment on the object should be listed as failed
_failed = true;
// execute callbacks if registered
if (_errorCallback) _errorCallback(error.c_str());
if (_finalizeCallback) _finalizeCallback();
// return the next deferred result
return _next;
}
/**
* Indicate failure
*
* @param error description of the error that occured
*/
void error(const std::string& error) const
Deferred *reportError(const char *error)
{
// from this moment on the object should be listed as failed
_failed = true;
// execute callbacks if registered
if (_errorCallback) _errorCallback(error);
if (_finalizeCallback) _finalizeCallback(error);
if (_finalizeCallback) _finalizeCallback();
// return the next deferred result
return _next;
}
/**
* Add a pointer to the next deferred result
* @param deferred
*/
void add(Deferred *deferred)
{
// store pointer
_next = deferred;
}
/**
@ -71,12 +156,6 @@ private:
friend class Callbacks;
protected:
/**
* Do we already know we failed?
* @var bool
*/
bool _failed;
/**
* Protected constructor that can only be called
* from within the channel implementation
@ -87,18 +166,10 @@ protected:
public:
/**
* Deleted copy constructor
* Deleted copy and move constructors
*/
Deferred(const Deferred& that) = delete;
/**
* Move constructor
*/
Deferred(Deferred&& that) :
_successCallback(std::move(that._successCallback)),
_errorCallback(std::move(that._errorCallback)),
_finalizeCallback(std::move(that._finalizeCallback))
{}
Deferred(const Deferred &that) = delete;
Deferred(Deferred &&that) = delete;
/**
* Cast to a boolean
@ -119,10 +190,12 @@ public:
*
* @param callback the callback to execute
*/
Deferred& onSuccess(const std::function<void(Arguments ...parameters)>& callback)
Deferred &onSuccess(const SuccessCallback &callback)
{
// store callback
_successCallback = callback;
// allow chaining
return *this;
}
@ -136,10 +209,12 @@ public:
*
* @param callback the callback to execute
*/
Deferred& onError(const std::function<void(const std::string& error)>& callback)
Deferred &onError(const ErrorCallback &callback)
{
// store callback
_errorCallback = callback;
// allow chaining
return *this;
}
@ -158,10 +233,12 @@ public:
*
* @param callback the callback to execute
*/
Deferred& onFinalize(const std::function<void(const std::string& error)>& callback)
Deferred &onFinalize(const FinalizeCallback &callback)
{
// store callback
_finalizeCallback = callback;
// allow chaining
return *this;
}
};

91
include/deferredcancel.h Normal file
View File

@ -0,0 +1,91 @@
/**
* DeferredCancel.h
*
* Deferred callback for instructions that cancel a running consumer. This
* deferred object allows one to register a callback that also gets the
* consumer tag as one of its parameters.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredCancel : public Deferred
{
private:
/**
* Callback to execute when the instruction is completed
* @var CancelCallback
*/
CancelCallback _cancelCallback;
/**
* Report success for frames that report cancel operations
* @param name Consumer tag that is cancelled
* @return Deferred
*/
virtual Deferred *reportSuccess(const std::string &name) const override
{
// skip if no special callback was installed
if (!_cancelCallback) return Deferred::reportSuccess();
// call the callback
_cancelCallback(name);
// call finalize callback
if (_finalizeCallback) _finalizeCallback();
// return next object
return _next;
}
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class ConsumedMessage;
protected:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param boolean are we already failed?
*/
DeferredCancel(bool failed = false) : Deferred(failed) {}
public:
/**
* Register a function to be called when the cancel operation succeeded
*
* Only one callback can be registered. Successive calls
* to this function will clear callbacks registered before.
*
* @param callback the callback to execute
*/
DeferredCancel &onSuccess(const CancelCallback &callback)
{
// store callback
_cancelCallback = callback;
// allow chaining
return *this;
}
/**
* All the onSuccess() functions defined in the base class are accessible too
*/
using Deferred::onSuccess;
};
/**
* End namespace
*/
}

View File

@ -14,13 +14,14 @@ namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredConsumer : public Deferred<const std::string&>
class DeferredConsumer : public Deferred
{
private:
/**
* Callback to execute when a message arrives
* @var ConsumeCallbacl
*/
std::function<void(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)> _messageCallback;
ConsumeCallback _consumeCallback;
/**
* Process a message
@ -33,7 +34,7 @@ private:
void message(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) const
{
// do we have a valid callback
if (_messageCallback) _messageCallback(message, deliveryTag, consumerTag, redelivered);
if (_consumeCallback) _consumeCallback(message, deliveryTag, consumerTag, redelivered);
}
/**
@ -48,7 +49,6 @@ protected:
* Protected constructor that can only be called
* from within the channel implementation
*
* @param channel the channel we operate under
* @param boolea are we already failed?
*/
DeferredConsumer(bool failed = false) : Deferred(failed) {}
@ -62,12 +62,17 @@ public:
*
* @param callback the callback to execute
*/
DeferredConsumer& onReceived(const std::function<void(const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)>& callback)
DeferredConsumer& onReceived(const ConsumeCallback &callback)
{
// store callback
_messageCallback = callback;
_consumeCallback = callback;
return *this;
}
/**
* All the onSuccess() functions defined in the base class are accessible too
*/
using Deferred::onSuccess;
};
/**

91
include/deferreddelete.h Normal file
View File

@ -0,0 +1,91 @@
/**
* DeferredDelete.h
*
* Deferred callback for instructions that delete or purge queues, and that
* want to report the number of deleted messages.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredDelete : public Deferred
{
private:
/**
* Callback to execute when the instruction is completed
* @var DeleteCallback
*/
DeleteCallback _deleteCallback;
/**
* Report success for queue delete and queue purge messages
* @param messagecount Number of messages that were deleted
* @return Deferred Next deferred result
*/
virtual Deferred *reportSuccess(uint32_t messagecount) const override
{
// skip if no special callback was installed
if (!_deleteCallback) return Deferred::reportSuccess();
// call the callback
_deleteCallback(messagecount);
// call finalize callback
if (_finalizeCallback) _finalizeCallback();
// return next object
return _next;
}
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class ConsumedMessage;
protected:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param boolean are we already failed?
*/
DeferredDelete(bool failed = false) : Deferred(failed) {}
public:
/**
* Register a function to be called when the queue is deleted or purged
*
* Only one callback can be registered. Successive calls
* to this function will clear callbacks registered before.
*
* @param callback the callback to execute
*/
DeferredDelete &onSuccess(const DeleteCallback &callback)
{
// store callback
_deleteCallback = callback;
// allow chaining
return *this;
}
/**
* All the onSuccess() functions defined in the base class are accessible too
*/
using Deferred::onSuccess;
};
/**
* End namespace
*/
}

91
include/deferredqueue.h Normal file
View File

@ -0,0 +1,91 @@
/**
* DeferredQueue.h
*
* Deferred callback for "declare-queue" instructions.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredQueue : public Deferred
{
private:
/**
* Callback to execute when the queue is declared
* @var QueueCallback
*/
QueueCallback _queueCallback;
/**
* Report success for queue declared messages
* @param name Name of the new queue
* @param messagecount Number of messages in the queue
* @param consumercount Number of consumers linked to the queue
* @return Deferred Next deferred result
*/
virtual Deferred *reportSuccess(const std::string &name, uint32_t messagecount, uint32_t consumercount) const override
{
// skip if no special callback was installed
if (!_queueCallback) return Deferred::reportSuccess();
// call the queue callback
_queueCallback(name, messagecount, consumercount);
// call finalize callback
if (_finalizeCallback) _finalizeCallback();
// return next object
return _next;
}
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class ConsumedMessage;
protected:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* @param boolea are we already failed?
*/
DeferredQueue(bool failed = false) : Deferred(failed) {}
public:
/**
* Register a function to be called when the queue is declared
*
* Only one callback can be registered. Successive calls
* to this function will clear callbacks registered before.
*
* @param callback the callback to execute
*/
DeferredQueue &onSuccess(const QueueCallback &callback)
{
// store callback
_queueCallback = callback;
// allow chaining
return *this;
}
/**
* All the onSuccess() functions defined in the base class are accessible too
*/
using Deferred::onSuccess;
};
/**
* End namespace
*/
}

View File

@ -85,6 +85,28 @@ ChannelImpl::~ChannelImpl()
// close the channel now
close();
// @todo destruct deferred resutls
}
/**
* Push a deferred result
* @param result The deferred object to push
* @param error Error message in case of error
*/
void ChannelImpl::push(Deferred *deferred, const char *error)
{
// do we already have an oldest?
if (!_oldestCallback) _oldestCallback = deferred;
// do we already have a newest?
if (_newestCallback) _newestCallback->add(deferred);
// store newest callback
_newestCallback = deferred;
// @todo in case of error we have to report the error with a timeout
}
/**
@ -95,10 +117,10 @@ ChannelImpl::~ChannelImpl()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::pause()
Deferred &ChannelImpl::pause()
{
// send a channel flow frame
return send<>(ChannelFlowFrame(_id, false), "Cannot send channel flow frame");
return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame");
}
/**
@ -109,10 +131,10 @@ Deferred<>& ChannelImpl::pause()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::resume()
Deferred &ChannelImpl::resume()
{
// send a channel flow frame
return send<>(ChannelFlowFrame(_id, true), "Cannot send channel flow frame");
return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame");
}
/**
@ -121,10 +143,10 @@ Deferred<>& ChannelImpl::resume()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::startTransaction()
Deferred &ChannelImpl::startTransaction()
{
// send a transaction frame
return send<>(TransactionSelectFrame(_id), "Cannot send transaction start frame");
return send(TransactionSelectFrame(_id), "Cannot send transaction start frame");
}
/**
@ -133,10 +155,10 @@ Deferred<>& ChannelImpl::startTransaction()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::commitTransaction()
Deferred &ChannelImpl::commitTransaction()
{
// send a transaction frame
return send<>(TransactionCommitFrame(_id), "Cannot send transaction commit frame");
return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame");
}
/**
@ -145,10 +167,10 @@ Deferred<>& ChannelImpl::commitTransaction()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::rollbackTransaction()
Deferred &ChannelImpl::rollbackTransaction()
{
// send a transaction frame
return send<>(TransactionRollbackFrame(_id), "Cannot send transaction commit frame");
return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame");
}
/**
@ -157,13 +179,13 @@ Deferred<>& ChannelImpl::rollbackTransaction()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::close()
Deferred &ChannelImpl::close()
{
// channel could be dead after send operation, we need to monitor that
Monitor monitor(this);
// send a channel close frame
auto &handler = send<>(ChannelCloseFrame(_id), "Cannot send channel close frame");
auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame");
// was the frame sent and are we still alive?
if (handler && monitor.valid()) _state = state_closing;
@ -183,7 +205,7 @@ Deferred<>& ChannelImpl::close()
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
{
// convert exchange type
std::string exchangeType;
@ -193,7 +215,7 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t
if (type == ExchangeType::headers)exchangeType = "headers";
// send declare exchange frame
return send<>(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame");
return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame");
}
/**
@ -208,10 +230,10 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{
// send exchange bind frame
return send<>(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame");
return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame");
}
/**
@ -226,10 +248,10 @@ Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::stri
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{
// send exchange unbind frame
return send<>(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame");
return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame");
}
/**
@ -241,10 +263,10 @@ Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::st
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags)
Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
{
// send delete exchange frame
return send<>(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame");
return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame");
}
/**
@ -256,10 +278,19 @@ Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags)
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<const std::string&, uint32_t, uint32_t>& ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
{
// the frame to send
QueueDeclareFrame frame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments);
// send the queuedeclareframe
return send<const std::string&, uint32_t, uint32_t>(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame");
auto *result = new DeferredQueue(send(frame));
// add the deferred result
push(result, "Cannot send queue declare frame");
// done
return *result;
}
/**
@ -274,10 +305,10 @@ Deferred<const std::string&, uint32_t, uint32_t>& ChannelImpl::declareQueue(cons
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments)
Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments)
{
// send the bind queue frame
return send<>(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame");
return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame");
}
/**
@ -291,10 +322,10 @@ Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::s
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
{
// send the unbind queue frame
return send<>(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame");
return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame");
}
/**
@ -315,10 +346,19 @@ Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::str
*
* });
*/
Deferred<uint32_t>& ChannelImpl::purgeQueue(const std::string &name, int flags)
DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags)
{
// send the queue purge frame
return send<uint32_t>(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame");
// the frame to send
QueuePurgeFrame frame(_id, name, flags & nowait);
// send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame));
// push to list
push(deferred, "Cannot send queue purge frame");
// done
return *deferred;
}
/**
@ -339,10 +379,19 @@ Deferred<uint32_t>& ChannelImpl::purgeQueue(const std::string &name, int flags)
*
* });
*/
Deferred<uint32_t>& ChannelImpl::removeQueue(const std::string &name, int flags)
DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
{
// send the remove queue frame
return send<uint32_t>(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame");
// the frame to send
QueueDeleteFrame frame(_id, name, flags & ifunused, flags & ifempty, flags & nowait);
// send the frame, and create deferred object
auto *deferred = new DeferredDelete(send(frame));
// push to list
push(deferred, "Cannot send remove queue frame");
// done
return *deferred;
}
/**
@ -410,7 +459,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount)
Deferred &ChannelImpl::setQos(uint16_t prefetchCount)
{
// send a qos frame
return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame");
@ -473,10 +522,19 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
*
* });
*/
Deferred<const std::string&>& ChannelImpl::cancel(const std::string &tag, int flags)
DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags)
{
// send a cancel frame
return send<const std::string&>(BasicCancelFrame(_id, tag, flags & nowait), "Cannot send basic cancel frame");
// the cancel frame to send
BasicCancelFrame frame(_id, tag, flags & nowait);
// send the frame, and create deferred object
auto *deferred = new DeferredCancel(send(frame));
// push to list
push(deferred, "Cannot send basic cancel frame");
// done
return *deferred;
}
/**
@ -510,7 +568,7 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& ChannelImpl::recover(int flags)
Deferred &ChannelImpl::recover(int flags)
{
// send a nack frame
return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame");
@ -536,32 +594,16 @@ bool ChannelImpl::send(const Frame &frame)
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
*/
template <typename... Arguments>
Deferred<Arguments...>& ChannelImpl::send(const Frame &frame, const char *message)
Deferred &ChannelImpl::send(const Frame &frame, const char *message)
{
// create a new deferred handler and get a pointer to it
// note: cannot use auto here or the lambda below chokes
// when compiling under gcc 4.8
Deferred<Arguments...> *handler = &_callbacks.push_back(Deferred<Arguments...>());
// send the frame over the channel
if (!send(frame))
{
// we can immediately put the handler in failed state
handler->_failed = true;
// register an error on the deferred handler
// after a timeout, so it gets called only
// after a possible handler was installed.
_connection->_handler->setTimeout(_connection->_parent, 0, [handler, message]() {
// emit an error on the handler
handler->error(message);
});
}
// return the new handler
return *handler;
// send the frame, and create deferred object
auto *deferred = new Deferred(send(frame));
// push to list
push(deferred, message);
// done
return *deferred;
}
/**
@ -569,6 +611,8 @@ Deferred<Arguments...>& ChannelImpl::send(const Frame &frame, const char *messag
*/
void ChannelImpl::reportMessage()
{
// @todo what does this method do?
// skip if there is no message
if (!_message) return;

View File

@ -132,8 +132,8 @@ public:
// what if channel doesn't exist?
if (!channel) return false;
// report to the handler, we need to specify template arguments otherwise a string will lose const and reference
channel->reportSuccess<const std::string&, uint32_t, uint32_t>(this->name(), this->messageCount(), this->consumerCount());
// report success
channel->reportSuccess(name(), messageCount(), consumerCount());
// done
return true;