Work in progress to convert channel handler to callback system

This commit is contained in:
Martijn Otto 2014-04-08 14:42:07 +02:00
parent 400c619b01
commit 2939272bc8
22 changed files with 1137 additions and 628 deletions

View File

@ -49,6 +49,8 @@
// mid level includes // mid level includes
#include <amqpcpp/exchangetype.h> #include <amqpcpp/exchangetype.h>
#include <amqpcpp/flags.h> #include <amqpcpp/flags.h>
#include <amqpcpp/deferred.h>
#include <amqpcpp/callbacks.h>
#include <amqpcpp/channelhandler.h> #include <amqpcpp/channelhandler.h>
#include <amqpcpp/channelimpl.h> #include <amqpcpp/channelimpl.h>
#include <amqpcpp/channel.h> #include <amqpcpp/channel.h>

157
include/callbacks.h Normal file
View File

@ -0,0 +1,157 @@
/**
* Callbacks.h
*
* Class storing deferred callbacks of different type.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class for managing deferred callbacks
*/
class Callbacks
{
private:
/**
* Different callback types supported
*/
std::tuple<
std::deque<Deferred<>>,
std::deque<Deferred<const std::string&, uint32_t, uint32_t>>,
std::deque<Deferred<uint32_t>>
> _callbacks;
/**
* If all else fails, we have gotten the wrong
* type, which is not present in the arguments.
*
* This should result in a compile error.
*/
template <class T, std::size_t N, class... Arguments>
struct getIndex
{
// if this structure is used, we went past the last argument
// and this static_assert should trigger a compile failure.
static_assert(N < sizeof...(Arguments), "Type T not found in Arguments");
// we still have to provide this member though
static constexpr std::size_t value = N;
};
/**
* This structure has one static member that represents
* the index of T in Arguments. This variant is used where U
* does equal T, so a match is found, meaning the current
* index given is the right one.
*/
template <class T, std::size_t N, class... Arguments>
struct getIndex<T, N, T, Arguments...>
{
// element is same type as we are looking for
static constexpr std::size_t value = N;
};
/**
* This structure has one static member that represents
* the index of T in Arguments. This variant is used where U
* does not equal T, so we need to look at the next member.
*/
template <class T, std::size_t N, class U, class... Arguments>
struct getIndex<T, N, U, Arguments...>
{
// current N is not correct, unroll to next element
static constexpr std::size_t value = getIndex<T, N + 1, Arguments...>::value;
};
/**
* Retrieve the list of callbacks matching the type
*
* @param tuple tuple with callbacks
*/
template <class T, class... Arguments>
T& get(std::tuple<Arguments...>& tuple)
{
// retrieve the index at which the requested callbacks can be found
constexpr std::size_t index = getIndex<T, 0, Arguments...>::value;
// retrieve the callbacks
return std::get<index>(tuple);
}
public:
/**
* Add a deferred to the available callbacks
*
* @param deferred the deferred to add
* @return reference to the inserted deferred
*/
template <typename... Arguments>
Deferred<Arguments...>& push_back(const Deferred<Arguments...>& item)
{
// retrieve the container
auto &container = get<std::deque<Deferred<Arguments...>>>(_callbacks);
// add the element
container.push_back(item);
// return reference to the new item
return container.back();
}
/**
* Report success to the relevant callback
*
* @param mixed... additional parameters
*/
template <typename... Arguments>
void reportSuccess(Arguments ...parameters)
{
// retrieve the container and element
auto &container = get<std::deque<Deferred<Arguments...>>>(_callbacks);
auto &callback = container.front();
// execute the callback
callback.success(parameters...);
// remove the executed callback
container.pop_front();
}
/**
* Report a failure
*
* @param error a description of the error
*/
template <std::size_t N = 0>
typename std::enable_if<N == std::tuple_size<decltype(_callbacks)>::value>::type
reportFailure(const std::string& message)
{}
/**
* Report a failure
*
* @param error a description of the error
*/
template <std::size_t N = 0>
typename std::enable_if<N < std::tuple_size<decltype(_callbacks)>::value>::type
reportFailure(const std::string& message)
{
// retrieve the callbacks at current index
auto &callbacks = std::get<N>(_callbacks);
// report errors to all callbacks of the current type
for (auto &callback : callbacks) callback.error(message);
// execute the next type
reportFailure<N + 1>(message);
}
};
/**
* End namespace
*/
}

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
/** /**
* Class describing a (mid-level) AMQP channel implementation * Class describing a (mid-level) AMQP channel implementation
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -36,27 +36,55 @@ public:
virtual ~Channel() {} virtual ~Channel() {}
/** /**
* Pause deliveries on a channel * Callback that is called when the channel was succesfully created.
* *
* This will stop all incoming messages * Only one callback can be registered. Calling this function multiple
* * times will remove the old callback.
* This method returns true if the request to pause has been sent to the *
* broker. This does not necessarily mean that the channel is already * @param callback the callback to execute
* paused.
*
* @return bool
*/ */
bool pause() void onReady(const std::function<void(Channel *channel)>& callback)
{
// store callback in implementation
_implementation._readyCallback = callback;
}
/**
* Callback that is called when an error occurs.
*
* Only one error callback can be registered. Calling this function
* multiple times will remove the old callback.
*
* @param callback the callback to execute
*/
void onError(const std::function<void(Channel *channel, const std::string& message)>& callback)
{
// store callback in implementation
_implementation._errorCallback = callback;
}
/**
* Pause deliveries on a channel
*
* This will stop all incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
Deferred<>& pause()
{ {
return _implementation.pause(); return _implementation.pause();
} }
/** /**
* Resume a paused channel * Resume a paused channel
* *
* @return bool * This will resume incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool resume() Deferred<>& resume()
{ {
return _implementation.resume(); return _implementation.resume();
} }
@ -69,199 +97,254 @@ public:
{ {
return _implementation.connected(); return _implementation.connected();
} }
/** /**
* Start a transaction * Start a transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool startTransaction() Deferred<>& startTransaction()
{ {
return _implementation.startTransaction(); return _implementation.startTransaction();
} }
/** /**
* Commit the current transaction * Commit the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool commitTransaction() Deferred<>& commitTransaction()
{ {
return _implementation.commitTransaction(); return _implementation.commitTransaction();
} }
/** /**
* Rollback the current transaction * Rollback the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool rollbackTransaction() Deferred<>& rollbackTransaction()
{ {
return _implementation.rollbackTransaction(); return _implementation.rollbackTransaction();
} }
/** /**
* Declare an exchange * Declare an exchange
* *
* If an empty name is supplied, a name will be assigned by the server. * If an empty name is supplied, a name will be assigned by the server.
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - durable exchange survives a broker restart * - durable exchange survives a broker restart
* - autodelete exchange is automatically removed when all connected queues are removed * - autodelete exchange is automatically removed when all connected queues are removed
* - passive only check if the exchange exist * - passive only check if the exchange exist
* *
* @param name name of the exchange * @param name name of the exchange
* @param type exchange type * @param type exchange type
* @param flags exchange flags * @param flags exchange flags
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); }
bool declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } Deferred<>& declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); }
bool declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } Deferred<>& declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); }
bool declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } Deferred<>& declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); }
bool declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } Deferred<>& declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); }
bool declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } Deferred<>& declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); }
/** /**
* Remove an exchange * Remove an exchange
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - ifunused only delete if no queues are connected * - ifunused only delete if no queues are connected
* @param name name of the exchange to remove * @param name name of the exchange to remove
* @param flags optional flags * @param flags optional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } Deferred<>& removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); }
/** /**
* Bind two exchanges to each other * Bind two exchanges to each other
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - nowait do not wait on response * - nowait do not wait on response
* *
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags * @param flags optional flags
* @param arguments additional bind arguments * @param arguments additional bind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); }
bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); }
bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); }
/** /**
* Unbind two exchanges from one another * Unbind two exchanges from one another
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - nowait do not wait on response * - nowait do not wait on response
* *
* @param target the target exchange * @param target the target exchange
* @param source the source exchange * @param source the source exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags * @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); }
bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); }
bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); }
/** /**
* Declare a queue * Declare a queue
* *
* If you do not supply a name, a name will be assigned by the server. * If you do not supply a name, a name will be assigned by the server.
* *
* The flags can be a combination of the following values: * The flags can be a combination of the following values:
* *
* - durable queue survives a broker restart * - durable queue survives a broker restart
* - autodelete queue is automatically removed when all connected consumers are gone * - autodelete queue is automatically removed when all connected consumers are gone
* - passive only check if the queue exist * - passive only check if the queue exist
* - exclusive the queue only exists for this connection, and is automatically removed when connection is gone * - exclusive the queue only exists for this connection, and is automatically removed when connection is gone
* *
* @param name name of the queue * @param name name of the queue
* @param flags combination of flags * @param flags combination of flags
* @param arguments optional arguments * @param arguments optional arguments
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) {
*
* std::cout << "Queue '" << name << "' has been declared with " << messageCount << " messages and " << consumerCount << " consumers" << std::endl;
*
* });
*/ */
bool declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); }
bool declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); }
bool declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); }
bool declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); }
bool declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); }
bool declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); }
/** /**
* Bind a queue to an exchange * Bind a queue to an exchange
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - nowait do not wait on response * - nowait do not wait on response
* *
* @param exchange the source exchange * @param exchange the source exchange
* @param queue the target queue * @param queue the target queue
* @param routingkey the routing key * @param routingkey the routing key
* @param flags additional flags * @param flags additional flags
* @param arguments additional bind arguments * @param arguments additional bind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); }
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); }
bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); }
/** /**
* Unbind a queue from an exchange * Unbind a queue from an exchange
* @param exchange the source exchange * @param exchange the source exchange
* @param queue the target queue * @param queue the target queue
* @param routingkey the routing key * @param routingkey the routing key
* @param arguments additional bind arguments * @param arguments additional bind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); }
bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); }
/** /**
* Purge a queue * Purge a queue
* *
* The following flags can be used for the exchange * The following flags can be used for the exchange
* *
* - nowait do not wait on response * - nowait do not wait on response
* *
* @param name name of the queue * @param name name of the queue
* @param flags additional flags * @param flags additional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
*
* });
*/ */
bool purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } Deferred<uint32_t>& purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); }
/** /**
* Remove a queue * Remove a queue
* *
* The following flags can be used for the exchange: * The following flags can be used for the exchange:
* *
* - ifunused only delete if no consumers are connected * - ifunused only delete if no consumers are connected
* - ifempty only delete if the queue is empty * - ifempty only delete if the queue is empty
* *
* @param name name of the queue to remove * @param name name of the queue to remove
* @param flags optional flags * @param flags optional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
*
* });
*/ */
bool removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } Deferred<uint32_t>& removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); }
/** /**
* Publish a message to an exchange * Publish a message to an exchange
* *
* The following flags can be used * The following flags can be used
* *
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
* *
* If either of the two flags is set, and the message could not immediately * If either of the two flags is set, and the message could not immediately
* be published, the message is returned by the server to the client. If you * be published, the message is returned by the server to the client. If you
* want to catch such returned messages, you need to implement the * want to catch such returned messages, you need to implement the
* ChannelHandler::onReturned() method. * ChannelHandler::onReturned() method.
* *
* @param exchange the exchange to publish to * @param exchange the exchange to publish to
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags (see above) * @param flags optional flags (see above)
@ -275,44 +358,44 @@ public:
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); } bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); }
/** /**
* Set the Quality of Service (QOS) for this channel * Set the Quality of Service (QOS) for this channel
* *
* When you consume messages, every single messages needs to be ack'ed to inform * When you consume messages, every single message needs to be ack'ed to inform
* the RabbitMQ server that is has been received. The Qos setting specifies the * the RabbitMQ server that is has been received. The Qos setting specifies the
* number of unacked messages that may exist in the client application. The server * number of unacked messages that may exist in the client application. The server
* stops delivering more messages if the number of unack'ed messages has reached * stops delivering more messages if the number of unack'ed messages has reached
* the prefetchCount * the prefetchCount
* *
* @param prefetchCount maximum number of messages to prefetch * @param prefetchCount maximum number of messages to prefetch
* @return bool whether the Qos frame is sent. * @return bool whether the Qos frame is sent.
*/ */
bool setQos(uint16_t prefetchCount) Deferred<>& setQos(uint16_t prefetchCount)
{ {
return _implementation.setQos(prefetchCount); return _implementation.setQos(prefetchCount);
} }
/** /**
* Tell the RabbitMQ server that we're ready to consume messages * Tell the RabbitMQ server that we're ready to consume messages
* *
* After this method is called, RabbitMQ starts delivering messages to the client * After this method is called, RabbitMQ starts delivering messages to the client
* application. The consume tag is a string identifier that will be passed to * application. The consume tag is a string identifier that will be passed to
* each received message, so that you can associate incoming messages with a * each received message, so that you can associate incoming messages with a
* consumer. If you do not specify a consumer tag, the server will assign one * consumer. If you do not specify a consumer tag, the server will assign one
* for you. * for you.
* *
* The following flags are supported: * The following flags are supported:
* *
* - nolocal if set, messages published on this channel are not also consumed * - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically * - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue * - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active * - nowait the server does not have to send a response back that consuming is active
* *
* The method ChannelHandler::onConsumerStarted() will be called when the * The method ChannelHandler::onConsumerStarted() will be called when the
* consumer has started (unless the nowait option was set, in which case * consumer has started (unless the nowait option was set, in which case
* no confirmation method is called) * no confirmation method is called)
* *
* @param queue the queue from which you want to consume * @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation * @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags * @param flags additional flags
@ -325,56 +408,56 @@ public:
bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); }
bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); }
bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); }
/** /**
* Cancel a running consume call * Cancel a running consume call
* *
* If you want to stop a running consumer, you can use this method with the consumer tag * If you want to stop a running consumer, you can use this method with the consumer tag
* *
* The following flags are supported: * The following flags are supported:
* *
* - nowait the server does not have to send a response back that the consumer has been cancelled * - nowait the server does not have to send a response back that the consumer has been cancelled
* *
* The method ChannelHandler::onConsumerStopped() will be called when the consumer * The method ChannelHandler::onConsumerStopped() will be called when the consumer
* was succesfully stopped (unless the nowait option was used, in which case no * was succesfully stopped (unless the nowait option was used, in which case no
* confirmation method is called) * confirmation method is called)
* *
* @param tag the consumer tag * @param tag the consumer tag
* @param flags optional additional flags * @param flags optional additional flags
* @return bool * @return bool
*/ */
bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
/** /**
* Acknoldge a received message * Acknoldge a received message
* *
* When a message is received in the ChannelHandler::onReceived() method, * When a message is received in the ChannelHandler::onReceived() method,
* you must acknoledge it so that RabbitMQ removes it from the queue (unless * you must acknoledge it so that RabbitMQ removes it from the queue (unless
* you are consuming with the noack option). This method can be used for * you are consuming with the noack option). This method can be used for
* this acknoledging. * this acknoledging.
* *
* The following flags are supported: * The following flags are supported:
* *
* - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too * - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too
* *
* @param deliveryTag the unique delivery tag of the message * @param deliveryTag the unique delivery tag of the message
* @param flags optional flags * @param flags optional flags
* @return bool * @return bool
*/ */
bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); }
/** /**
* Reject or nack a message * Reject or nack a message
* *
* When a message was received in the ChannelHandler::onReceived() method, * When a message was received in the ChannelHandler::onReceived() method,
* and you don't want to acknoledge it, you can also choose to reject it by * and you don't want to acknoledge it, you can also choose to reject it by
* calling this reject method. * calling this reject method.
* *
* The following flags are supported: * The following flags are supported:
* *
* - multiple reject multiple messages: all un-acked messages that were earlier delivered are unacked too * - multiple reject multiple messages: all un-acked messages that were earlier delivered are unacked too
* - requeue if set, the message is put back in the queue, otherwise it is dead-lettered/removed * - requeue if set, the message is put back in the queue, otherwise it is dead-lettered/removed
* *
* @param deliveryTag the unique delivery tag of the message * @param deliveryTag the unique delivery tag of the message
* @param flags optional flags * @param flags optional flags
* @return bool * @return bool
@ -383,27 +466,28 @@ public:
/** /**
* Recover all messages that were not yet acked * Recover all messages that were not yet acked
* *
* This method asks the server to redeliver all unacknowledged messages on a specified * This method asks the server to redeliver all unacknowledged messages on a specified
* channel. Zero or more messages may be redelivered. * channel. Zero or more messages may be redelivered.
* *
* The following flags are supported: * The following flags are supported:
* *
* - requeue if set, the server will requeue the messages, so the could also end up with at different consumer * - requeue if set, the server will requeue the messages, so the could also end up with at different consumer
* *
* @param flags * @param flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool recover(int flags = 0) { return _implementation.recover(flags); } Deferred<>& recover(int flags = 0) { return _implementation.recover(flags); }
/** /**
* Close the current channel * Close the current channel
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool close() Deferred<>& close() { return _implementation.close(); }
{
return _implementation.close();
}
/** /**
* Get the channel we're working on * Get the channel we're working on

View File

@ -22,12 +22,6 @@ namespace AMQP {
class ChannelHandler class ChannelHandler
{ {
public: public:
/**
* Method that is called when the channel was succesfully created.
* @param channel the channel that is ready
*/
virtual void onReady(Channel *channel) {}
/** /**
* An error has occured on the channel * An error has occured on the channel
* The channel is no longer usable after an error has occured on it. * The channel is no longer usable after an error has occured on it.
@ -36,108 +30,6 @@ public:
*/ */
virtual void onError(Channel *channel, const std::string &message) {} virtual void onError(Channel *channel, const std::string &message) {}
/**
* Method that is called when the channel was paused
* This is the result of a call to Channel::pause()
* @param channel the channel that is now paused
*/
virtual void onPaused(Channel *channel) {}
/**
* Method that is called when the channel was resumed
* This is the result of a call to Channel::resume()
* @param channel the channel that is no longer paused
*/
virtual void onResumed(Channel *channel) {}
/**
* Method that is called when a channel is closed
* This is the result of a call to Channel::close()
* @param channel the channel that is closed
*/
virtual void onClosed(Channel *channel) {}
/**
* Method that is called when a transaction was started
* This is the result of a call to Channel::startTransaction()
* @param channel the channel on which the transaction was started
*/
virtual void onTransactionStarted(Channel *channel) {}
/**
* Method that is called when a transaction was committed
* This is the result of a call to Channel::commitTransaction()
* @param channel the channel on which the transaction was committed
*/
virtual void onTransactionCommitted(Channel *channel) {}
/**
* Method that is called when a transaction was rolled back
* This is the result of a call to Channel::rollbackTransaction()
* @param channel the channel on which the transaction was rolled back
*/
virtual void onTransactionRolledBack(Channel *channel) {}
/**
* Method that is called when an exchange is bound
* This is the result of a call to Channel::bindExchange()
* @param channel the channel on which the exchange was bound
*/
virtual void onExchangeBound(Channel *channel) {}
/**
* Method that is called when an exchange is unbound
* This is the result of a call to Channel::unbindExchange()
* @param channel the channel on which the exchange was unbound
*/
virtual void onExchangeUnbound(Channel *channel) {}
/**
* Method that is called when an exchange is deleted
* This is the result of a call to Channel::deleteExchange()
* @param channel the channel on which the exchange was deleted
*/
virtual void onExchangeDeleted(Channel *channel) {}
/**
* Mehod that is called when an exchange is declared
* This is the result of a call to Channel::declareExchange()
* @param channel the channel on which the exchange was declared
*/
virtual void onExchangeDeclared(Channel *channel) {}
/**
* Method that is called when a queue is declared
* This is the result of a call to Channel::declareQueue()
* @param channel the channel on which the queue was declared
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
virtual void onQueueDeclared(Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) {}
/**
* Method that is called when a queue is bound
* This is the result of a call to Channel::bindQueue()
* @param channel the channel on which the queue was bound
*/
virtual void onQueueBound(Channel *channel) {}
/**
* Method that is called when a queue is deleted
* This is the result of a call to Channel::deleteQueue()
* @param channel the channel on which the queue was deleted
* @param messageCount number of messages deleted along with the queue
*/
virtual void onQueueDeleted(Channel *channel, uint32_t messageCount) {}
/**
* Method that is called when a queue is unbound
* This is the result of a call to Channel::unbindQueue()
* @param channel the channel on which the queue was unbound
*/
virtual void onQueueUnbound(Channel *channel) {}
/** /**
* Method that is called when a queue is purged * Method that is called when a queue is purged
* This is the result of a call to Channel::purgeQueue() * This is the result of a call to Channel::purgeQueue()
@ -146,13 +38,6 @@ public:
*/ */
virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {}
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
* @param channel the channel on which the qos was set
*/
virtual void onQosSet(Channel *channel) {}
/** /**
* Method that is called when a consumer was started * Method that is called when a consumer was started
* This is the result of a call to Channel::consume() * This is the result of a call to Channel::consume()
@ -168,7 +53,7 @@ public:
* @param tag the consumer tag * @param tag the consumer tag
*/ */
virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} virtual void onConsumerStopped(Channel *channel, const std::string &tag) {}
/** /**
* Method that is called when a message has been received on a channel * Method that is called when a message has been received on a channel
* This message will be called for every message that is received after * This message will be called for every message that is received after
@ -182,7 +67,7 @@ public:
* @param redelivered is this a redelivered message? * @param redelivered is this a redelivered message?
*/ */
virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {} virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {}
/** /**
* Method that is called when a message you tried to publish was returned * Method that is called when a message you tried to publish was returned
* by the server. This only happens when the 'mandatory' or 'immediate' flag * by the server. This only happens when the 'mandatory' or 'immediate' flag
@ -193,13 +78,6 @@ public:
* @param text human readable reply reason * @param text human readable reply reason
*/ */
virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {} virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {}
/**
* Method that is called when the server starts recovering messages
* This is the result of a call to Channel::recover()
* @param channel the channel on which the recover method was called
*/
virtual void onRecovering(Channel *channel) {}
}; };

View File

@ -37,7 +37,28 @@ private:
* @var MyChannelHandler * @var MyChannelHandler
*/ */
ChannelHandler *_handler; ChannelHandler *_handler;
/**
* Callback when the channel is ready
*/
std::function<void(Channel *channel)> _readyCallback;
/**
* Callback when the channel errors out
*/
std::function<void(Channel *channel, const std::string& message)> _errorCallback;
/**
* The callbacks waiting to be called
*/
std::deque<Deferred<>> _callbacks;
/**
* Callbacks with additional parameters
*/
std::deque<Deferred<const std::string&, uint32_t, uint32_t>> _queueDeclaredCallbacks;
std::deque<Deferred<uint32_t>> _queueRemovedCallbacks;
/** /**
* The channel number * The channel number
* @var uint16_t * @var uint16_t
@ -53,13 +74,13 @@ private:
state_closing, state_closing,
state_closed state_closed
} _state = state_connected; } _state = state_connected;
/** /**
* Is a transaction now active? * Is a transaction now active?
* @var bool * @var bool
*/ */
bool _transaction = false; bool _transaction = false;
/** /**
* The message that is now being received * The message that is now being received
* @var MessageImpl * @var MessageImpl
@ -68,11 +89,11 @@ private:
/** /**
* Construct a channel object * Construct a channel object
* *
* Note that the constructor is private, and that the Channel class is * Note that the constructor is private, and that the Channel class is
* a friend. By doing this we ensure that nobody can instantiate this * a friend. By doing this we ensure that nobody can instantiate this
* object, and that it can thus only be used inside the library. * object, and that it can thus only be used inside the library.
* *
* @param parent the public channel object * @param parent the public channel object
* @param connection pointer to the connection * @param connection pointer to the connection
* @param handler handler that is notified on events * @param handler handler that is notified on events
@ -96,23 +117,23 @@ public:
/** /**
* Pause deliveries on a channel * Pause deliveries on a channel
* *
* This will stop all incoming messages * This will stop all incoming messages
* *
* This method returns true if the request to pause has been sent to the * This function returns a deferred handler. Callbacks can be installed
* broker. This does not necessarily mean that the channel is already * using onSuccess(), onError() and onFinalize() methods.
* paused.
*
* @return bool
*/ */
bool pause(); Deferred<>& pause();
/** /**
* Resume a paused channel * Resume a paused channel
* *
* @return bool * This will resume incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool resume(); Deferred<>& resume();
/** /**
* Is the channel connected? * Is the channel connected?
@ -122,123 +143,170 @@ public:
{ {
return _state == state_connected; return _state == state_connected;
} }
/** /**
* Start a transaction * Start a transaction
* @return bool
*/ */
bool startTransaction(); Deferred<>& startTransaction();
/** /**
* Commit the current transaction * Commit the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool commitTransaction(); Deferred<>& commitTransaction();
/** /**
* Rollback the current transaction * Rollback the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool rollbackTransaction(); Deferred<>& rollbackTransaction();
/** /**
* declare an exchange * declare an exchange
*
* @param name name of the exchange to declare * @param name name of the exchange to declare
* @param type type of exchange * @param type type of exchange
* @param flags additional settings for the exchange * @param flags additional settings for the exchange
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments); Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments);
/** /**
* bind two exchanges * bind two exchanges
* @param source exchange which binds to target * @param source exchange which binds to target
* @param target exchange to bind to * @param target exchange to bind to
* @param routingKey routing key * @param routingKey routing key
* @param glags additional flags * @param glags additional flags
* @param arguments additional arguments for binding * @param arguments additional arguments for binding
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
/** /**
* unbind two exchanges * unbind two exchanges
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags * @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); Deferred<>& unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments);
/** /**
* remove an exchange * remove an exchange
*
* @param name name of the exchange to remove * @param name name of the exchange to remove
* @param flags additional settings for deleting the exchange * @param flags additional settings for deleting the exchange
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool removeExchange(const std::string &name, int flags); Deferred<>& removeExchange(const std::string &name, int flags);
/** /**
* declare a queue * declare a queue
* @param name queue name * @param name queue name
* @param flags additional settings for the queue * @param flags additional settings for the queue
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool declareQueue(const std::string &name, int flags, const Table &arguments); Deferred<const std::string&, uint32_t, uint32_t>& declareQueue(const std::string &name, int flags, const Table &arguments);
/** /**
* Bind a queue to an exchange * Bind a queue to an exchange
*
* @param exchangeName name of the exchange to bind to * @param exchangeName name of the exchange to bind to
* @param queueName name of the queue * @param queueName name of the queue
* @param routingkey routingkey * @param routingkey routingkey
* @param flags additional flags * @param flags additional flags
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); Deferred<>& bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments);
/** /**
* Unbind a queue from an exchange * Unbind a queue from an exchange
*
* @param exchange the source exchange * @param exchange the source exchange
* @param queue the target queue * @param queue the target queue
* @param routingkey the routing key * @param routingkey the routing key
* @param arguments additional bind arguments * @param arguments additional bind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); Deferred<>& unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments);
/** /**
* Purge a queue * Purge a queue
* @param queue queue to purge * @param queue queue to purge
* @param flags additional flags * @param flags additional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
*
* });
*/ */
bool purgeQueue(const std::string &name, int flags); Deferred<uint32_t>& purgeQueue(const std::string &name, int flags);
/** /**
* Remove a queue * Remove a queue
* @param queue queue to remove * @param queue queue to remove
* @param flags additional flags * @param flags additional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
*
* });
*/ */
bool removeQueue(const std::string &name, int flags); Deferred<uint32_t>& removeQueue(const std::string &name, int flags);
/** /**
* Publish a message to an exchange * Publish a message to an exchange
* *
* The following flags can be used * The following flags can be used
* *
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
* *
* If the mandatory or immediate flag is set, and the message could not immediately * If the mandatory or immediate flag is set, and the message could not immediately
* be published, the message will be returned to the client, and will eventually * be published, the message will be returned to the client, and will eventually
* end up in your ChannelHandler::onReturned() method. * end up in your ChannelHandler::onReturned() method.
* *
* @param exchange the exchange to publish to * @param exchange the exchange to publish to
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags (see above) * @param flags optional flags (see above)
@ -247,13 +315,15 @@ public:
* @param size size of the message * @param size size of the message
*/ */
bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope);
/** /**
* Set the Quality of Service (QOS) of the entire connection * Set the Quality of Service (QOS) of the entire connection
* @param prefetchCount maximum number of messages to prefetch * @param prefetchCount maximum number of messages to prefetch
* @return bool whether the Qos frame is sent. *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool setQos(uint16_t prefetchCount); Deferred<>& setQos(uint16_t prefetchCount);
/** /**
* Tell the RabbitMQ server that we're ready to consume messages * Tell the RabbitMQ server that we're ready to consume messages
@ -264,7 +334,7 @@ public:
* @return bool * @return bool
*/ */
bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
/** /**
* Cancel a running consumer * Cancel a running consumer
* @param tag the consumer tag * @param tag the consumer tag
@ -288,19 +358,23 @@ public:
* @return bool * @return bool
*/ */
bool reject(uint64_t deliveryTag, int flags); bool reject(uint64_t deliveryTag, int flags);
/** /**
* Recover messages that were not yet ack'ed * Recover messages that were not yet ack'ed
* @param flags optional flags * @param flags optional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool recover(int flags); Deferred<>& recover(int flags);
/** /**
* Close the current channel * Close the current channel
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool close(); Deferred<>& close();
/** /**
* Get the channel we're working on * Get the channel we're working on
@ -310,14 +384,34 @@ public:
{ {
return _id; return _id;
} }
/** /**
* Send a frame over the channel * Send a frame over the channel
* @param frame frame to send * @param frame frame to send
* @return bool was frame succesfully sent? * @return bool was frame succesfully sent?
*/ */
bool send(const Frame &frame); bool send(const Frame &frame);
/**
* Send a frame over the channel and
* get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
*/
Deferred<>& send(const Frame &frame, const char *message);
/**
* Send a frame over the channel and
* get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
* @param queue the queue to store the callbacks in
*/
template <typename... Arguments>
Deferred<Arguments...>& send(const Frame &frame, const char *message, std::deque<Deferred<Arguments...>>& queue);
/** /**
* Report to the handler that the channel is closed * Report to the handler that the channel is closed
*/ */
@ -325,38 +419,33 @@ public:
{ {
// change state // change state
_state = state_closed; _state = state_closed;
// inform handler // inform handler
if (_handler) _handler->onClosed(_parent); reportSuccess();
} }
/** /**
* Report to the handler that the channel is paused * Report success
*
* This function is called to report success for all
* cases where the callback does not receive any parameters
*/ */
void reportPaused() void reportSuccess()
{ {
// inform handler // report success for the oldest request
if (_handler) _handler->onPaused(_parent); _callbacks.front().success();
_callbacks.pop_front();
} }
/**
* Report to the handler that the channel is resumed
*/
void reportResumed()
{
// inform handler
if (_handler) _handler->onResumed(_parent);
}
/** /**
* Report to the handler that the channel is opened * Report to the handler that the channel is opened
*/ */
void reportReady() void reportReady()
{ {
// inform handler // inform handler
if (_handler) _handler->onReady(_parent); if (_readyCallback) _readyCallback(_parent);
} }
/** /**
* Report an error message on a channel * Report an error message on a channel
* @param message * @param message
@ -365,43 +454,16 @@ public:
{ {
// change state // change state
_state = state_closed; _state = state_closed;
// inform handler // inform handler
if (_handler) _handler->onError(_parent, message); if (_errorCallback) _errorCallback(_parent, message);
// and all waiting deferred callbacks
for (auto &deferred : _callbacks) deferred.error(message);
for (auto &deferred : _queueDeclaredCallbacks) deferred.error(message);
for (auto &deferred : _queueRemovedCallbacks) deferred.error(message);
} }
/**
* Report that the exchange is succesfully declared
*/
void reportExchangeDeclared()
{
if (_handler) _handler->onExchangeDeclared(_parent);
}
/**
* Report that the exchange is succesfully deleted
*/
void reportExchangeDeleted()
{
if (_handler) _handler->onExchangeDeleted(_parent);
}
/**
* Report that the exchange is bound
*/
void reportExchangeBound()
{
if (_handler) _handler->onExchangeBound(_parent);
}
/**
* Report that the exchange is unbound
*/
void reportExchangeUnbound()
{
if (_handler) _handler->onExchangeUnbound(_parent);
}
/** /**
* Report that the queue was succesfully declared * Report that the queue was succesfully declared
* @param queueName name of the queue which was declared * @param queueName name of the queue which was declared
@ -410,51 +472,33 @@ public:
*/ */
void reportQueueDeclared(const std::string &queueName, uint32_t messageCount, uint32_t consumerCount) void reportQueueDeclared(const std::string &queueName, uint32_t messageCount, uint32_t consumerCount)
{ {
if (_handler) _handler->onQueueDeclared(_parent, queueName, messageCount, consumerCount); // report success for the oldest queue declare callbacks
_queueDeclaredCallbacks.front().success(queueName, messageCount, consumerCount);
_queueDeclaredCallbacks.pop_front();
} }
/**
* Report that a queue was succesfully bound
*/
void reportQueueBound()
{
if (_handler) _handler->onQueueBound(_parent);
}
/**
* Report that a queue was succesfully unbound
*/
void reportQueueUnbound()
{
if (_handler) _handler->onQueueUnbound(_parent);
}
/** /**
* Report that a queue was succesfully deleted * Report that a queue was succesfully deleted
* @param messageCount number of messages left in queue, now deleted * @param messageCount number of messages left in queue, now deleted
*/ */
void reportQueueDeleted(uint32_t messageCount) void reportQueueDeleted(uint32_t messageCount)
{ {
if (_handler) _handler->onQueueDeleted(_parent, messageCount); // report success for the oldest queue remove callbacks
_queueRemovedCallbacks.front().success(messageCount);
_queueRemovedCallbacks.pop_front();
} }
/** /**
* Report that a queue was succesfully purged * Report that a queue was succesfully purged
* @param messageCount number of messages purged * @param messageCount number of messages purged
*/ */
void reportQueuePurged(uint32_t messageCount) void reportQueuePurged(uint32_t messageCount)
{ {
if (_handler) _handler->onQueuePurged(_parent, messageCount); // report success for the oldest queue remove callbacks
_queueRemovedCallbacks.front().success(messageCount);
_queueRemovedCallbacks.pop_front();
} }
/**
* Report that the qos has been set
*/
void reportQosSet()
{
if (_handler) _handler->onQosSet(_parent);
}
/** /**
* Report that a consumer has started * Report that a consumer has started
* @param tag the consumer tag * @param tag the consumer tag
@ -472,20 +516,12 @@ public:
{ {
if (_handler) _handler->onConsumerStopped(_parent, tag); if (_handler) _handler->onConsumerStopped(_parent, tag);
} }
/** /**
* Report that a message was received * Report that a message was received
*/ */
void reportMessage(); void reportMessage();
/**
* Report that the recover operation has started
*/
void reportRecovering()
{
if (_handler) _handler->onRecovering(_parent);
}
/** /**
* Create an incoming message * Create an incoming message
* @param frame * @param frame
@ -493,7 +529,7 @@ public:
*/ */
MessageImpl *message(const BasicDeliverFrame &frame); MessageImpl *message(const BasicDeliverFrame &frame);
MessageImpl *message(const BasicReturnFrame &frame); MessageImpl *message(const BasicReturnFrame &frame);
/** /**
* Retrieve the current incoming message * Retrieve the current incoming message
* @return MessageImpl * @return MessageImpl
@ -502,12 +538,12 @@ public:
{ {
return _message; return _message;
} }
/** /**
* The channel class is its friend, thus can it instantiate this object * The channel class is its friend, thus can it instantiate this object
*/ */
friend class Channel; friend class Channel;
}; };
/** /**

View File

@ -22,6 +22,24 @@ private:
*/ */
ConnectionImpl _implementation; ConnectionImpl _implementation;
/**
* Function to execute code after a certain timeout.
*
* If the timeout is 0, the code is supposed to be run
* in the next iteration of the event loop.
*
* This is a simple placeholder function that will just
* execute the code immediately, it should be overridden
* by the timeout function the used event loop has.
*
* @param timeout the amount of time to wait
* @param callback the callback to execute after the timeout
*/
std::function<void(double timeout, const std::function<void()>)> _timeoutHandler = [](double timeout, const std::function<void()>& callback) {
// execute callback immediately
callback();
};
public: public:
/** /**
* Construct an AMQP object based on full login data * Construct an AMQP object based on full login data

View File

@ -5,7 +5,7 @@
* This is the implementation of the connection - a class that can only be * This is the implementation of the connection - a class that can only be
* constructed by the connection class itselves and that has all sorts of * constructed by the connection class itselves and that has all sorts of
* methods that are only useful inside the library * methods that are only useful inside the library
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -44,13 +44,13 @@ protected:
state_closing, // connection is busy closing (we have sent the close frame) state_closing, // connection is busy closing (we have sent the close frame)
state_closed // connection is closed state_closed // connection is closed
} _state = state_protocol; } _state = state_protocol;
/** /**
* Has the close() method been called? * Has the close() method been called?
* @var bool * @var bool
*/ */
bool _closed = false; bool _closed = false;
/** /**
* All channels that are active * All channels that are active
* @var map * @var map
@ -62,13 +62,13 @@ protected:
* @var uint16_t * @var uint16_t
*/ */
uint16_t _nextFreeChannel = 1; uint16_t _nextFreeChannel = 1;
/** /**
* Max number of channels (0 for unlimited) * Max number of channels (0 for unlimited)
* @var uint16_t * @var uint16_t
*/ */
uint16_t _maxChannels = 0; uint16_t _maxChannels = 0;
/** /**
* Max frame size * Max frame size
* @var uint32_t * @var uint32_t
@ -80,19 +80,19 @@ protected:
* @var Login * @var Login
*/ */
Login _login; Login _login;
/** /**
* Vhost to connect to * Vhost to connect to
* @var string * @var string
*/ */
std::string _vhost; std::string _vhost;
/** /**
* Queued messages that should be sent after the connection has been established * Queued messages that should be sent after the connection has been established
* @var queue * @var queue
*/ */
std::queue<OutBuffer> _queue; std::queue<OutBuffer> _queue;
/** /**
* Helper method to send the close frame * Helper method to send the close frame
* Return value tells if the connection is still valid * Return value tells if the connection is still valid
@ -100,17 +100,16 @@ protected:
*/ */
bool sendClose(); bool sendClose();
private: private:
/** /**
* Construct an AMQP object based on full login data * Construct an AMQP object based on full login data
* *
* The first parameter is a handler object. This handler class is * The first parameter is a handler object. This handler class is
* an interface that should be implemented by the caller. * an interface that should be implemented by the caller.
* *
* Note that the constructor is private to ensure that nobody can construct * Note that the constructor is private to ensure that nobody can construct
* this class, only the real Connection class via a friend construct * this class, only the real Connection class via a friend construct
* *
* @param parent Parent connection object * @param parent Parent connection object
* @param handler Connection handler * @param handler Connection handler
* @param login Login data * @param login Login data
@ -132,7 +131,7 @@ public:
// must be busy doing the connection handshake, or already connected // must be busy doing the connection handshake, or already connected
return _state == state_handshake || _state == state_connected; return _state == state_handshake || _state == state_connected;
} }
/** /**
* Mark the protocol as being ok * Mark the protocol as being ok
*/ */
@ -141,7 +140,7 @@ public:
// move on to handshake state // move on to handshake state
if (_state == state_protocol) _state = state_handshake; if (_state == state_protocol) _state = state_handshake;
} }
/** /**
* Are we fully connected? * Are we fully connected?
* @return bool * @return bool
@ -151,12 +150,12 @@ public:
// state must be connected // state must be connected
return _state == state_connected; return _state == state_connected;
} }
/** /**
* Mark the connection as connected * Mark the connection as connected
*/ */
void setConnected(); void setConnected();
/** /**
* Retrieve the login data * Retrieve the login data
* @return Login * @return Login
@ -165,7 +164,7 @@ public:
{ {
return _login; return _login;
} }
/** /**
* Retrieve the vhost * Retrieve the vhost
* @return string * @return string
@ -185,7 +184,7 @@ public:
_maxChannels = channels; _maxChannels = channels;
_maxFrame = size; _maxFrame = size;
} }
/** /**
* The max frame size * The max frame size
* @return uint32_t * @return uint32_t
@ -194,7 +193,7 @@ public:
{ {
return _maxFrame; return _maxFrame;
} }
/** /**
* The max payload size for body frames * The max payload size for body frames
* @return uint32_t * @return uint32_t
@ -204,7 +203,7 @@ public:
// 8 bytes for header and end-of-frame byte // 8 bytes for header and end-of-frame byte
return _maxFrame - 8; return _maxFrame - 8;
} }
/** /**
* Add a channel to the connection, and return the channel ID that it * Add a channel to the connection, and return the channel ID that it
* is allowed to use, or 0 when no more ID's are available * is allowed to use, or 0 when no more ID's are available
@ -212,16 +211,16 @@ public:
* @return uint16_t * @return uint16_t
*/ */
uint16_t add(ChannelImpl *channel); uint16_t add(ChannelImpl *channel);
/** /**
* Remove a channel * Remove a channel
* @param channel * @param channel
*/ */
void remove(ChannelImpl *channel); void remove(ChannelImpl *channel);
/** /**
* Parse the buffer into a recognized frame * Parse the buffer into a recognized frame
* *
* Every time that data comes in on the connection, you should call this method to parse * Every time that data comes in on the connection, you should call this method to parse
* the incoming data, and let it handle by the AMQP library. This method returns the number * the incoming data, and let it handle by the AMQP library. This method returns the number
* of bytes that were processed. * of bytes that were processed.
@ -246,9 +245,9 @@ public:
/** /**
* Send a frame over the connection * Send a frame over the connection
* *
* This is an internal method that you normally do not have to call yourself * This is an internal method that you normally do not have to call yourself
* *
* @param frame the frame to send * @param frame the frame to send
* @return bool * @return bool
*/ */
@ -256,11 +255,11 @@ public:
/** /**
* Get a channel by its identifier * Get a channel by its identifier
* *
* This method only works if you had already created the channel before. * This method only works if you had already created the channel before.
* This is an internal method that you will not need if you cache the channel * This is an internal method that you will not need if you cache the channel
* object. * object.
* *
* @param number channel identifier * @param number channel identifier
* @return channel the channel object, or nullptr if not yet created * @return channel the channel object, or nullptr if not yet created
*/ */
@ -278,11 +277,11 @@ public:
{ {
// set connection state to closed // set connection state to closed
_state = state_closed; _state = state_closed;
// inform handler // inform handler
_handler->onError(_parent, message); _handler->onError(_parent, message);
} }
/** /**
* Report that the connection is closed * Report that the connection is closed
*/ */
@ -290,7 +289,7 @@ public:
{ {
// change state // change state
_state = state_closed; _state = state_closed;
// inform the handler // inform the handler
_handler->onClosed(_parent); _handler->onClosed(_parent);
} }

179
include/deferred.h Normal file
View File

@ -0,0 +1,179 @@
/**
* Deferred.h
*
* Class describing a set of actions that could
* possibly happen in the future that can be
* caught.
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
// forward declaration
class ChannelImpl;
/**
* Class definition
*/
template <typename... Arguments>
class Deferred
{
private:
/**
* The channel we operate under
*/
Channel *_channel;
/**
* Do we already know we failed?
*/
bool _failed;
/**
* Callback to execute on success
*/
std::function<void(Channel *channel, Arguments ...parameters)> _successCallback;
/**
* Callback to execute on failure
*/
std::function<void(Channel *channel, const std::string& error)> _errorCallback;
/**
* Callback to execute either way
*/
std::function<void(Channel *channel, const std::string& error)> _finalizeCallback;
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
/**
* Indicate success
*
* @param parameters... the extra parameters relevant for this deferred handler
*/
void success(Arguments ...parameters)
{
// execute callbacks if registered
if (_successCallback) _successCallback(_channel, parameters...);
if (_finalizeCallback) _finalizeCallback(_channel, "");
}
/**
* Indicate failure
*
* @param error description of the error that occured
*/
void error(const std::string& error)
{
// we are now in a failed state
_failed = true;
// execute callbacks if registered
if (_errorCallback) _errorCallback(_channel, error);
if (_finalizeCallback) _finalizeCallback(_channel, error);
}
/**
* Private constructor that can only be called
* from within the channel implementation
*
* @param channel the channel we operate under
* @param boolea are we already failed?
*/
Deferred(Channel *channel, bool failed = false) :
_channel(channel),
_failed(failed)
{}
public:
/**
* Deleted copy constructor
*/
Deferred(const Deferred& that) = delete;
/**
* Move constructor
*/
Deferred(Deferred&& that) :
_successCallback(std::move(that._successCallback)),
_errorCallback(std::move(that._errorCallback)),
_finalizeCallback(std::move(that._finalizeCallback))
{}
/**
* Cast to a boolean
*/
operator bool ()
{
return !_failed;
}
/**
* Register a function to be called
* if and when the operation succesfully
* completes.
*
* Only one callback can be registered at a time.
* Successive calls to this function will clear
* callbacks registered before.
*
* @param callback the callback to execute
*/
Deferred& onSuccess(const std::function<void(Channel *channel, Arguments ...parameters)>& callback)
{
// store callback
_successCallback = callback;
return *this;
}
/**
* Register a function to be called
* if and when the operation fails.
*
* Only one callback can be registered at a time.
* Successive calls to this function will clear
* callbacks registered before.
*
* @param callback the callback to execute
*/
Deferred& onError(const std::function<void(Channel *channel, const std::string& error)>& callback)
{
// store callback
_errorCallback = callback;
return *this;
}
/**
* Register a function to be called
* if and when the operation completes
* or fails. This function will be called
* either way.
*
* In the case of success, the provided
* error parameter will be an empty string.
*
* Only one callback can be registered at at time.
* Successive calls to this function will clear
* callbacks registered before.
*
* @param callback the callback to execute
*/
Deferred& onFinalize(const std::function<void(Channel *channel, const std::string& error)>& callback)
{
// store callback
_finalizeCallback = callback;
return *this;
}
};
/**
* End namespace
*/
}

View File

@ -1,6 +1,6 @@
/** /**
* Class describing a basic QOS frame * Class describing a basic QOS frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -32,7 +32,7 @@ public:
* @param channel channel we're working on * @param channel channel we're working on
*/ */
BasicQosOKFrame(uint16_t channel) : BasicFrame(channel, 0) {} BasicQosOKFrame(uint16_t channel) : BasicFrame(channel, 0) {}
/** /**
* Constructor based on incoming data * Constructor based on incoming data
* @param frame * @param frame
@ -52,7 +52,7 @@ public:
{ {
return 11; return 11;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -62,13 +62,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// report // report
channel->reportQosSet(); channel->reportSuccess();
// done // done
return true; return true;
} }

View File

@ -1,6 +1,6 @@
/** /**
* Class describing a basic recover-async frame * Class describing a basic recover-async frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -57,7 +57,7 @@ public:
{ {
return 111; return 111;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -67,17 +67,17 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// report // report
channel->reportRecovering(); channel->reportSuccess();
// done // done
return true; return true;
} }
}; };

View File

@ -88,14 +88,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if (!channel) return false; if (!channel) return false;
// is the flow active? // report success for the call
if (active()) channel->reportResumed(); channel->reportSuccess();
else channel->reportPaused();
// done // done
return true; return true;
} }

View File

@ -55,13 +55,13 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler
{ {
// add the channel to the connection // add the channel to the connection
_id = _connection->add(this); _id = _connection->add(this);
// check if the id is valid // check if the id is valid
if (_id == 0) if (_id == 0)
{ {
// this is invalid // this is invalid
_state = state_closed; _state = state_closed;
// invalid id, this channel can not exist // invalid id, this channel can not exist
handler->onError(_parent, "Max number of channels reached"); handler->onError(_parent, "Max number of channels reached");
} }
@ -69,7 +69,7 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler
{ {
// busy connecting // busy connecting
_state = state_connected; _state = state_connected;
// valid id, send a channel open frame // valid id, send a channel open frame
send(ChannelOpenFrame(_id)); send(ChannelOpenFrame(_id));
} }
@ -80,106 +80,114 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler
*/ */
ChannelImpl::~ChannelImpl() ChannelImpl::~ChannelImpl()
{ {
// remove incoming message // remove incoming message
if (_message) delete _message; if (_message) delete _message;
_message = nullptr; _message = nullptr;
// remove this channel from the connection (but not if the connection is already destructed) // remove this channel from the connection (but not if the connection is already destructed)
if (_connection) _connection->remove(this); if (_connection) _connection->remove(this);
// close the channel now // close the channel now
close(); close();
} }
/** /**
* Pause deliveries on a channel * Pause deliveries on a channel
* *
* This will stop all incoming messages * This will stop all incoming messages
* *
* This method returns true if the request to pause has been sent to the * This function returns a deferred handler. Callbacks can be installed
* broker. This does not necessarily mean that the channel is already * using onSuccess(), onError() and onFinalize() methods.
* paused.
*
* @return bool
*/ */
bool ChannelImpl::pause() Deferred<>& ChannelImpl::pause()
{ {
// send a flow frame // send a channel flow frame
return send(ChannelFlowFrame(_id, false)); return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame");
} }
/** /**
* Resume a paused channel * Resume a paused channel
* *
* @return bool * This will resume incoming messages
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::resume() Deferred<>& ChannelImpl::resume()
{ {
// send a flow frame // send a channel flow frame
return send(ChannelFlowFrame(_id, true)); return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame");
} }
/** /**
* Start a transaction * Start a transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::startTransaction() Deferred<>& ChannelImpl::startTransaction()
{ {
// send a flow frame // send a transaction frame
return send(TransactionSelectFrame(_id)); return send(TransactionSelectFrame(_id), "Cannot send transaction start frame");
} }
/** /**
* Commit the current transaction * Commit the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::commitTransaction() Deferred<>& ChannelImpl::commitTransaction()
{ {
// send a flow frame // send a transaction frame
return send(TransactionCommitFrame(_id)); return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame");
} }
/** /**
* Rollback the current transaction * Rollback the current transaction
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::rollbackTransaction() Deferred<>& ChannelImpl::rollbackTransaction()
{ {
// send a flow frame // send a transaction frame
return send(TransactionRollbackFrame(_id)); return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame");
} }
/** /**
* Close the current channel * Close the current channel
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::close() Deferred<>& ChannelImpl::close()
{ {
// channel could be dead after send operation, we need to monitor that // channel could be dead after send operation, we need to monitor that
Monitor monitor(this); Monitor monitor(this);
// send a flow frame // send a channel close frame
if (!send(ChannelCloseFrame(_id))) return false; auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame");
// leap out if channel was destructed // was the frame sent and are we still alive?
if (!monitor.valid()) return true; if (handler && monitor.valid()) _state = state_closing;
// now it is closing
_state = state_closing;
// done // done
return true; return handler;
} }
/** /**
* declare an exchange * declare an exchange
* @param name name of the exchange to declare * @param name name of the exchange to declare
* @param type type of exchange * @param type type of exchange
* @param flags additional settings for the exchange * @param flags additional settings for the exchange
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
{ {
// convert exchange type // convert exchange type
std::string exchangeType; std::string exchangeType;
@ -189,49 +197,58 @@ bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, in
if (type == ExchangeType::headers)exchangeType = "headers"; if (type == ExchangeType::headers)exchangeType = "headers";
// send declare exchange frame // send declare exchange frame
return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame");
} }
/** /**
* bind an exchange * bind an exchange
*
* @param source exchange which binds to target * @param source exchange which binds to target
* @param target exchange to bind to * @param target exchange to bind to
* @param routingKey routing key * @param routingKey routing key
* @param flags additional flags * @param flags additional flags
* @param arguments additional arguments for binding * @param arguments additional arguments for binding
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{ {
// send exchange bind frame // send exchange bind frame
return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame");
} }
/** /**
* unbind two exchanges * unbind two exchanges
*
* @param source the source exchange * @param source the source exchange
* @param target the target exchange * @param target the target exchange
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags * @param flags optional flags
* @param arguments additional unbind arguments * @param arguments additional unbind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{ {
// send exchange unbind frame // send exchange unbind frame
return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame");
} }
/** /**
* remove an exchange * remove an exchange
*
* @param name name of the exchange to remove * @param name name of the exchange to remove
* @param flags additional settings for deleting the exchange * @param flags additional settings for deleting the exchange
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::removeExchange(const std::string &name, int flags) Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags)
{ {
// send delete exchange frame // send delete exchange frame
return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame");
} }
/** /**
@ -239,75 +256,107 @@ bool ChannelImpl::removeExchange(const std::string &name, int flags)
* @param name queue name * @param name queue name
* @param flags additional settings for the queue * @param flags additional settings for the queue
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) Deferred<const std::string&, uint32_t, uint32_t>& ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
{ {
// send the queuedeclareframe // send the queuedeclareframe
return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments)); return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame", _queueDeclaredCallbacks);
} }
/** /**
* Bind a queue to an exchange * Bind a queue to an exchange
*
* @param exchangeName name of the exchange to bind to * @param exchangeName name of the exchange to bind to
* @param queueName name of the queue * @param queueName name of the queue
* @param routingkey routingkey * @param routingkey routingkey
* @param flags additional flags * @param flags additional flags
* @param arguments additional arguments * @param arguments additional arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments)
{ {
// send the bind queue frame // send the bind queue frame
return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame");
} }
/** /**
* Unbind a queue from an exchange * Unbind a queue from an exchange
*
* @param exchange the source exchange * @param exchange the source exchange
* @param queue the target queue * @param queue the target queue
* @param routingkey the routing key * @param routingkey the routing key
* @param arguments additional bind arguments * @param arguments additional bind arguments
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
{ {
// send the unbind queue frame // send the unbind queue frame
return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments)); return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame");
} }
/** /**
* Purge a queue * Purge a queue
* @param queue queue to purge * @param queue queue to purge
* @param flags additional flags * @param flags additional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
*
* });
*/ */
bool ChannelImpl::purgeQueue(const std::string &name, int flags) Deferred<uint32_t>& ChannelImpl::purgeQueue(const std::string &name, int flags)
{ {
// send the queue purge frame // send the queue purge frame
return send(QueuePurgeFrame(_id, name, flags & nowait)); return send(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame", _queueRemovedCallbacks);
} }
/** /**
* Remove a queue * Remove a queue
* @param queue queue to remove * @param queue queue to remove
* @param flags additional flags * @param flags additional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*
* The onSuccess() callback that you can install should have the following signature:
*
* void myCallback(AMQP::Channel *channel, uint32_t messageCount);
*
* For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
*
* std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
*
* });
*/ */
bool ChannelImpl::removeQueue(const std::string &name, int flags) Deferred<uint32_t>& ChannelImpl::removeQueue(const std::string &name, int flags)
{ {
// send the remove queue frame // send the remove queue frame
return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait)); return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame", _queueRemovedCallbacks);
} }
/** /**
* Publish a message to an exchange * Publish a message to an exchange
* *
* The following flags can be used * The following flags can be used
* *
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
* *
* @param exchange the exchange to publish to * @param exchange the exchange to publish to
* @param routingkey the routing key * @param routingkey the routing key
* @param flags optional flags (see above) * @param flags optional flags (see above)
@ -320,46 +369,46 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
// we are going to send out multiple frames, each one will trigger a call to the handler, // we are going to send out multiple frames, each one will trigger a call to the handler,
// which in turn could destruct the channel object, we need to monitor that // which in turn could destruct the channel object, we need to monitor that
Monitor monitor(this); Monitor monitor(this);
// @todo do not copy the entire buffer to individual frames // @todo do not copy the entire buffer to individual frames
// send the publish frame // send the publish frame
if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false; if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false;
// channel still valid? // channel still valid?
if (!monitor.valid()) return false; if (!monitor.valid()) return false;
// send header // send header
if (!send(BasicHeaderFrame(_id, envelope))) return false; if (!send(BasicHeaderFrame(_id, envelope))) return false;
// channel and connection still valid? // channel and connection still valid?
if (!monitor.valid() || !_connection) return false; if (!monitor.valid() || !_connection) return false;
// the max payload size is the max frame size minus the bytes for headers and trailer // the max payload size is the max frame size minus the bytes for headers and trailer
uint32_t maxpayload = _connection->maxPayload(); uint32_t maxpayload = _connection->maxPayload();
uint32_t bytessent = 0; uint32_t bytessent = 0;
// the buffer // the buffer
const char *data = envelope.body(); const char *data = envelope.body();
uint32_t bytesleft = envelope.bodySize(); uint32_t bytesleft = envelope.bodySize();
// split up the body in multiple frames depending on the max frame size // split up the body in multiple frames depending on the max frame size
while (bytesleft > 0) while (bytesleft > 0)
{ {
// size of this chunk // size of this chunk
uint32_t chunksize = std::min(maxpayload, bytesleft); uint32_t chunksize = std::min(maxpayload, bytesleft);
// send out a body frame // send out a body frame
if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false; if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false;
// channel still valid? // channel still valid?
if (!monitor.valid()) return false; if (!monitor.valid()) return false;
// update counters // update counters
bytessent += chunksize; bytessent += chunksize;
bytesleft -= chunksize; bytesleft -= chunksize;
} }
// done // done
return true; return true;
} }
@ -367,12 +416,14 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
/** /**
* Set the Quality of Service (QOS) for this channel * Set the Quality of Service (QOS) for this channel
* @param prefetchCount maximum number of messages to prefetch * @param prefetchCount maximum number of messages to prefetch
* @return bool whether the Qos frame is sent. *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::setQos(uint16_t prefetchCount) Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount)
{ {
// send a qos frame // send a qos frame
return send(BasicQosFrame(_id, prefetchCount, false)); return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame");
} }
/** /**
@ -427,12 +478,14 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
/** /**
* Recover un-acked messages * Recover un-acked messages
* @param flags optional flags * @param flags optional flags
* @return bool *
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/ */
bool ChannelImpl::recover(int flags) Deferred<>& ChannelImpl::recover(int flags)
{ {
// send a nack frame // send a nack frame
return send(BasicRecoverFrame(_id, flags & requeue)); return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame");
} }
/** /**
@ -444,11 +497,58 @@ bool ChannelImpl::send(const Frame &frame)
{ {
// skip if channel is not connected // skip if channel is not connected
if (_state != state_connected || !_connection) return false; if (_state != state_connected || !_connection) return false;
// send to tcp connection // send to tcp connection
return _connection->send(frame); return _connection->send(frame);
} }
/**
* Send a frame over the channel and
* get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
*/
Deferred<>& ChannelImpl::send(const Frame &frame, const char *message)
{
// use the generic implementation
return send<>(frame, message, _callbacks);
}
/**
* Send a frame over the channel and
* get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
* @param queue the queue to store the callbacks in
*/
template <typename... Arguments>
Deferred<Arguments...>& ChannelImpl::send(const Frame &frame, const char *message, std::deque<Deferred<Arguments...>>& queue)
{
// create a new deferred handler and get a pointer to it
queue.push_back(Deferred<Arguments...>(_parent));
auto *handler = &queue.back();
// send the frame over the channel
if (!send(frame))
{
// we can immediately put the handler in failed state
handler->_failed = true;
// the frame could not be send
// we should register an error
// on the handler, but only after
// a timeout, so a handler can
// be attached first
// TODO
}
// return the new handler
return *handler;
}
/** /**
* Report the received message * Report the received message
*/ */
@ -456,18 +556,18 @@ void ChannelImpl::reportMessage()
{ {
// skip if there is no message // skip if there is no message
if (!_message) return; if (!_message) return;
// after the report the channel may be destructed, monitor that // after the report the channel may be destructed, monitor that
Monitor monitor(this); Monitor monitor(this);
// do we have a handler? // do we have a handler?
if (_handler) _message->report(_parent, _handler); if (_handler) _message->report(_parent, _handler);
// skip if channel was destructed // skip if channel was destructed
if (!monitor.valid()) return; if (!monitor.valid()) return;
// no longer need the message // no longer need the message
delete _message; delete _message;
_message = nullptr; _message = nullptr;
} }
@ -480,7 +580,7 @@ MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame)
{ {
// it should not be possible that a message already exists, but lets check it anyhow // it should not be possible that a message already exists, but lets check it anyhow
if (_message) delete _message; if (_message) delete _message;
// construct a message // construct a message
return _message = new ConsumedMessage(frame); return _message = new ConsumedMessage(frame);
} }
@ -494,7 +594,7 @@ MessageImpl *ChannelImpl::message(const BasicReturnFrame &frame)
{ {
// it should not be possible that a message already exists, but lets check it anyhow // it should not be possible that a message already exists, but lets check it anyhow
if (_message) delete _message; if (_message) delete _message;
// construct a message // construct a message
return _message = new ReturnedMessage(frame); return _message = new ReturnedMessage(frame);
} }

View File

@ -1,6 +1,6 @@
/** /**
* Exchangebindokframe.h * Exchangebindokframe.h
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -8,7 +8,7 @@
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/** /**
* Class definition * Class definition
*/ */
@ -29,10 +29,10 @@ protected:
public: public:
/** /**
* Constructor based on incoming data * Constructor based on incoming data
* *
* @param frame received frame to decode * @param frame received frame to decode
*/ */
ExchangeBindOKFrame(ReceivedFrame &frame) : ExchangeBindOKFrame(ReceivedFrame &frame) :
ExchangeFrame(frame) ExchangeFrame(frame)
{} {}
@ -47,12 +47,12 @@ public:
ExchangeBindOKFrame(uint16_t channel) : ExchangeBindOKFrame(uint16_t channel) :
ExchangeFrame(channel, 0) ExchangeFrame(channel, 0)
{} {}
virtual uint16_t methodID() const override virtual uint16_t methodID() const override
{ {
return 31; return 31;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -62,17 +62,17 @@ public:
{ {
// check if we have a channel // check if we have a channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportExchangeBound(); channel->reportSuccess();
// done // done
return true; return true;
} }
}; };
// end namespace // end namespace
} }

View File

@ -1,6 +1,6 @@
/** /**
* Class describing an AMQP exchange declare ok frame * Class describing an AMQP exchange declare ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -55,7 +55,7 @@ public:
{ {
return 11; return 11;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -65,13 +65,13 @@ public:
{ {
// we need the appropriate channel // we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report exchange declare ok // report exchange declare ok
channel->reportExchangeDeclared(); channel->reportSuccess();
// done // done
return true; return true;
} }

View File

@ -1,6 +1,6 @@
/** /**
* Class describing an AMQP exchange delete ok frame * Class describing an AMQP exchange delete ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -32,7 +32,7 @@ public:
* *
* @param frame received frame * @param frame received frame
*/ */
ExchangeDeleteOKFrame(ReceivedFrame &frame) : ExchangeDeleteOKFrame(ReceivedFrame &frame) :
ExchangeFrame(frame) ExchangeFrame(frame)
{} {}
@ -56,7 +56,7 @@ public:
{ {
return 21; return 21;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -66,13 +66,13 @@ public:
{ {
// check if we have a channel // check if we have a channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportExchangeDeleted(); channel->reportSuccess();
// done // done
return true; return true;
} }

View File

@ -1,5 +1,5 @@
/** /**
* Exchangeunbindokframe.h * Exchangeunbindokframe.h
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -9,7 +9,7 @@
* Set up namespace * Set up namespace
*/ */
namespace AMQP { namespace AMQP {
/** /**
* Class definition * Class definition
*/ */
@ -30,10 +30,10 @@ protected:
public: public:
/** /**
* Constructor based on incoming data * Constructor based on incoming data
* *
* @param frame received frame to decode * @param frame received frame to decode
*/ */
ExchangeUnbindOKFrame(ReceivedFrame &frame) : ExchangeUnbindOKFrame(ReceivedFrame &frame) :
ExchangeFrame(frame) ExchangeFrame(frame)
{} {}
@ -48,12 +48,12 @@ public:
ExchangeUnbindOKFrame(uint16_t channel) : ExchangeUnbindOKFrame(uint16_t channel) :
ExchangeFrame(channel, 0) ExchangeFrame(channel, 0)
{} {}
virtual uint16_t methodID() const override virtual uint16_t methodID() const override
{ {
return 51; return 51;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -63,17 +63,17 @@ public:
{ {
// check if we have a channel // check if we have a channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportExchangeUnbound(); channel->reportSuccess();
// done // done
return true; return true;
} }
}; };
// end namespace // end namespace
} }

View File

@ -75,13 +75,10 @@ public:
* @param connection The connection over which it was received * @param connection The connection over which it was received
* @return bool Was it succesfully processed? * @return bool Was it succesfully processed?
*/ */
virtual bool process(ConnectionImpl *connection) override virtual bool process[[ noreturn ]](ConnectionImpl *connection) override
{ {
// this is an exception // this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID())); throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID()));
// unreachable
return false;
} }
}; };

View File

@ -1,6 +1,6 @@
/** /**
* Class describing an AMQP queue bind ok frame * Class describing an AMQP queue bind ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -54,7 +54,7 @@ public:
{ {
return 21; return 21;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -64,13 +64,13 @@ public:
{ {
// check if we have a channel // check if we have a channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report to handler // report to handler
channel->reportQueueBound(); channel->reportSuccess();
// done // done
return true; return true;
} }

View File

@ -1,6 +1,6 @@
/** /**
* Class describing an AMQP queue unbind ok frame * Class describing an AMQP queue unbind ok frame
* *
* @copyright 2014 Copernica BV * @copyright 2014 Copernica BV
*/ */
@ -24,7 +24,7 @@ protected:
{ {
// call base // call base
QueueFrame::fill(buffer); QueueFrame::fill(buffer);
} }
public: public:
/** /**
* Decode a queueunbindokframe from a received frame * Decode a queueunbindokframe from a received frame
@ -58,7 +58,7 @@ public:
{ {
return 51; return 51;
} }
/** /**
* Process the frame * Process the frame
* @param connection The connection over which it was received * @param connection The connection over which it was received
@ -68,13 +68,13 @@ public:
{ {
// check if we have a channel // check if we have a channel
ChannelImpl *channel = connection->channel(this->channel()); ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist // channel does not exist
if(!channel) return false; if(!channel) return false;
// report queue unbind success // report queue unbind success
channel->reportQueueUnbound(); channel->reportSuccess();
// done // done
return true; return true;
} }

View File

@ -59,6 +59,26 @@ public:
{ {
return 21; return 21;
} }
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
channel->reportSuccess();
// done
return true;
}
}; };
/** /**

View File

@ -58,7 +58,27 @@ public:
virtual uint16_t methodID() const override virtual uint16_t methodID() const override
{ {
return 31; return 31;
} }
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
channel->reportSuccess();
// done
return true;
}
}; };
/** /**

View File

@ -59,6 +59,26 @@ public:
{ {
return 11; return 11;
} }
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
channel->reportSuccess();
// done
return true;
}
}; };
/** /**