diff --git a/amqpcpp.h b/amqpcpp.h index 3ae5c67..af79811 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -49,6 +49,8 @@ // mid level includes #include #include +#include +#include #include #include #include diff --git a/include/callbacks.h b/include/callbacks.h new file mode 100644 index 0000000..b29bb38 --- /dev/null +++ b/include/callbacks.h @@ -0,0 +1,157 @@ +/** + * Callbacks.h + * + * Class storing deferred callbacks of different type. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class for managing deferred callbacks + */ +class Callbacks +{ +private: + /** + * Different callback types supported + */ + std::tuple< + std::deque>, + std::deque>, + std::deque> + > _callbacks; + + /** + * If all else fails, we have gotten the wrong + * type, which is not present in the arguments. + * + * This should result in a compile error. + */ + template + struct getIndex + { + // if this structure is used, we went past the last argument + // and this static_assert should trigger a compile failure. + static_assert(N < sizeof...(Arguments), "Type T not found in Arguments"); + + // we still have to provide this member though + static constexpr std::size_t value = N; + }; + + /** + * This structure has one static member that represents + * the index of T in Arguments. This variant is used where U + * does equal T, so a match is found, meaning the current + * index given is the right one. + */ + template + struct getIndex + { + // element is same type as we are looking for + static constexpr std::size_t value = N; + }; + + /** + * This structure has one static member that represents + * the index of T in Arguments. This variant is used where U + * does not equal T, so we need to look at the next member. + */ + template + struct getIndex + { + // current N is not correct, unroll to next element + static constexpr std::size_t value = getIndex::value; + }; + + /** + * Retrieve the list of callbacks matching the type + * + * @param tuple tuple with callbacks + */ + template + T& get(std::tuple& tuple) + { + // retrieve the index at which the requested callbacks can be found + constexpr std::size_t index = getIndex::value; + + // retrieve the callbacks + return std::get(tuple); + } +public: + /** + * Add a deferred to the available callbacks + * + * @param deferred the deferred to add + * @return reference to the inserted deferred + */ + template + Deferred& push_back(const Deferred& item) + { + // retrieve the container + auto &container = get>>(_callbacks); + + // add the element + container.push_back(item); + + // return reference to the new item + return container.back(); + } + + /** + * Report success to the relevant callback + * + * @param mixed... additional parameters + */ + template + void reportSuccess(Arguments ...parameters) + { + // retrieve the container and element + auto &container = get>>(_callbacks); + auto &callback = container.front(); + + // execute the callback + callback.success(parameters...); + + // remove the executed callback + container.pop_front(); + } + + /** + * Report a failure + * + * @param error a description of the error + */ + template + typename std::enable_if::value>::type + reportFailure(const std::string& message) + {} + + /** + * Report a failure + * + * @param error a description of the error + */ + template + typename std::enable_if::value>::type + reportFailure(const std::string& message) + { + // retrieve the callbacks at current index + auto &callbacks = std::get(_callbacks); + + // report errors to all callbacks of the current type + for (auto &callback : callbacks) callback.error(message); + + // execute the next type + reportFailure(message); + } +}; + +/** + * End namespace + */ +} diff --git a/include/channel.h b/include/channel.h index d9dae3e..8a2550a 100644 --- a/include/channel.h +++ b/include/channel.h @@ -1,7 +1,7 @@ #pragma once /** * Class describing a (mid-level) AMQP channel implementation - * + * * @copyright 2014 Copernica BV */ @@ -36,27 +36,55 @@ public: virtual ~Channel() {} /** - * Pause deliveries on a channel - * - * This will stop all incoming messages - * - * This method returns true if the request to pause has been sent to the - * broker. This does not necessarily mean that the channel is already - * paused. - * - * @return bool + * Callback that is called when the channel was succesfully created. + * + * Only one callback can be registered. Calling this function multiple + * times will remove the old callback. + * + * @param callback the callback to execute */ - bool pause() + void onReady(const std::function& callback) + { + // store callback in implementation + _implementation._readyCallback = callback; + } + + /** + * Callback that is called when an error occurs. + * + * Only one error callback can be registered. Calling this function + * multiple times will remove the old callback. + * + * @param callback the callback to execute + */ + void onError(const std::function& callback) + { + // store callback in implementation + _implementation._errorCallback = callback; + } + + /** + * Pause deliveries on a channel + * + * This will stop all incoming messages + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + */ + Deferred<>& pause() { return _implementation.pause(); } - + /** * Resume a paused channel - * - * @return bool + * + * This will resume incoming messages + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool resume() + Deferred<>& resume() { return _implementation.resume(); } @@ -69,199 +97,254 @@ public: { return _implementation.connected(); } - + /** * Start a transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool startTransaction() + Deferred<>& startTransaction() { return _implementation.startTransaction(); } - + /** * Commit the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool commitTransaction() + Deferred<>& commitTransaction() { return _implementation.commitTransaction(); } - + /** * Rollback the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool rollbackTransaction() + Deferred<>& rollbackTransaction() { return _implementation.rollbackTransaction(); } - + /** * Declare an exchange - * + * * If an empty name is supplied, a name will be assigned by the server. - * + * * The following flags can be used for the exchange: - * + * * - durable exchange survives a broker restart * - autodelete exchange is automatically removed when all connected queues are removed * - passive only check if the exchange exist - * + * * @param name name of the exchange * @param type exchange type * @param flags exchange flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } - bool declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } - bool declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } - bool declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } - bool declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } - bool declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } - + Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(name, type, flags, arguments); } + Deferred<>& declareExchange(const std::string &name, ExchangeType type, const Table &arguments) { return _implementation.declareExchange(name, type, 0, arguments); } + Deferred<>& declareExchange(const std::string &name, ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(name, type, flags, Table()); } + Deferred<>& declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation.declareExchange(std::string(), type, flags, arguments); } + Deferred<>& declareExchange(ExchangeType type, const Table &arguments) { return _implementation.declareExchange(std::string(), type, 0, arguments); } + Deferred<>& declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation.declareExchange(std::string(), type, flags, Table()); } + /** * Remove an exchange - * + * * The following flags can be used for the exchange: - * + * * - ifunused only delete if no queues are connected * @param name name of the exchange to remove * @param flags optional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } - + Deferred<>& removeExchange(const std::string &name, int flags = 0) { return _implementation.removeExchange(name, flags); } + /** * Bind two exchanges to each other - * + * * The following flags can be used for the exchange - * + * * - nowait do not wait on response - * + * * @param source the source exchange * @param target the target exchange * @param routingkey the routing key * @param flags optional flags * @param arguments additional bind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } - bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } - bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } - + Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, flags, arguments); } + Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, const Table &arguments) { return _implementation.bindExchange(source, target, routingkey, 0, arguments); } + Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags = 0) { return _implementation.bindExchange(source, target, routingkey, flags, Table()); } + /** * Unbind two exchanges from one another - * + * * The following flags can be used for the exchange - * + * * - nowait do not wait on response - * + * * @param target the target exchange * @param source the source exchange * @param routingkey the routing key * @param flags optional flags * @param arguments additional unbind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } - bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } - bool unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } - + Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, flags, arguments); } + Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, const Table &arguments) { return _implementation.unbindExchange(target, source, routingkey, 0, arguments); } + Deferred<>& unbindExchange(const std::string &target, const std::string &source, const std::string &routingkey, int flags = 0) { return _implementation.unbindExchange(target, source, routingkey, flags, Table()); } + /** * Declare a queue - * + * * If you do not supply a name, a name will be assigned by the server. - * + * * The flags can be a combination of the following values: - * + * * - durable queue survives a broker restart * - autodelete queue is automatically removed when all connected consumers are gone * - passive only check if the queue exist * - exclusive the queue only exists for this connection, and is automatically removed when connection is gone - * + * * @param name name of the queue * @param flags combination of flags * @param arguments optional arguments + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) { + * + * std::cout << "Queue '" << name << "' has been declared with " << messageCount << " messages and " << consumerCount << " consumers" << std::endl; + * + * }); */ - bool declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } - bool declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } - bool declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } - bool declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } - bool declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } - bool declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } + Deferred& declareQueue(const std::string &name, int flags, const Table &arguments) { return _implementation.declareQueue(name, flags, arguments); } + Deferred& declareQueue(const std::string &name, const Table &arguments) { return _implementation.declareQueue(name, 0, arguments); } + Deferred& declareQueue(const std::string &name, int flags = 0) { return _implementation.declareQueue(name, flags, Table()); } + Deferred& declareQueue(int flags, const Table &arguments) { return _implementation.declareQueue(std::string(), flags, arguments); } + Deferred& declareQueue(const Table &arguments) { return _implementation.declareQueue(std::string(), 0, arguments); } + Deferred& declareQueue(int flags = 0) { return _implementation.declareQueue(std::string(), flags, Table()); } /** * Bind a queue to an exchange - * + * * The following flags can be used for the exchange - * + * * - nowait do not wait on response - * + * * @param exchange the source exchange * @param queue the target queue * @param routingkey the routing key * @param flags additional flags * @param arguments additional bind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } - bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } - bool bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } - + Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, flags, arguments); } + Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.bindQueue(exchange, queue, routingkey, 0, arguments); } + Deferred<>& bindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, int flags = 0) { return _implementation.bindQueue(exchange, queue, routingkey, flags, Table()); } + /** * Unbind a queue from an exchange * @param exchange the source exchange * @param queue the target queue * @param routingkey the routing key * @param arguments additional bind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } - bool unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } - + Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { return _implementation.unbindQueue(exchange, queue, routingkey, arguments); } + Deferred<>& unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey) { return _implementation.unbindQueue(exchange, queue, routingkey, Table()); } + /** * Purge a queue - * + * * The following flags can be used for the exchange - * + * * - nowait do not wait on response - * + * * @param name name of the queue * @param flags additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl; + * + * }); */ - bool purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } - + Deferred& purgeQueue(const std::string &name, int flags = 0){ return _implementation.purgeQueue(name, flags); } + /** * Remove a queue - * + * * The following flags can be used for the exchange: - * + * * - ifunused only delete if no consumers are connected * - ifempty only delete if the queue is empty * * @param name name of the queue to remove * @param flags optional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl; + * + * }); */ - bool removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } - + Deferred& removeQueue(const std::string &name, int flags = 0) { return _implementation.removeQueue(name, flags); } + /** * Publish a message to an exchange - * + * * The following flags can be used - * + * * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * + * * If either of the two flags is set, and the message could not immediately * be published, the message is returned by the server to the client. If you - * want to catch such returned messages, you need to implement the + * want to catch such returned messages, you need to implement the * ChannelHandler::onReturned() method. - * + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param flags optional flags (see above) @@ -275,44 +358,44 @@ public: bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); } bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } - + /** * Set the Quality of Service (QOS) for this channel - * - * When you consume messages, every single messages needs to be ack'ed to inform + * + * When you consume messages, every single message needs to be ack'ed to inform * the RabbitMQ server that is has been received. The Qos setting specifies the * number of unacked messages that may exist in the client application. The server * stops delivering more messages if the number of unack'ed messages has reached * the prefetchCount - * + * * @param prefetchCount maximum number of messages to prefetch * @return bool whether the Qos frame is sent. */ - bool setQos(uint16_t prefetchCount) + Deferred<>& setQos(uint16_t prefetchCount) { return _implementation.setQos(prefetchCount); } - + /** * Tell the RabbitMQ server that we're ready to consume messages - * + * * After this method is called, RabbitMQ starts delivering messages to the client * application. The consume tag is a string identifier that will be passed to - * each received message, so that you can associate incoming messages with a + * each received message, so that you can associate incoming messages with a * consumer. If you do not specify a consumer tag, the server will assign one * for you. - * + * * The following flags are supported: - * + * * - nolocal if set, messages published on this channel are not also consumed * - noack if set, consumed messages do not have to be acked, this happens automatically * - exclusive request exclusive access, only this consumer can access the queue * - nowait the server does not have to send a response back that consuming is active - * - * The method ChannelHandler::onConsumerStarted() will be called when the + * + * The method ChannelHandler::onConsumerStarted() will be called when the * consumer has started (unless the nowait option was set, in which case * no confirmation method is called) - * + * * @param queue the queue from which you want to consume * @param tag a consumer tag that will be associated with this consume operation * @param flags additional flags @@ -325,56 +408,56 @@ public: bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } - + /** * Cancel a running consume call - * + * * If you want to stop a running consumer, you can use this method with the consumer tag - * + * * The following flags are supported: - * + * * - nowait the server does not have to send a response back that the consumer has been cancelled - * + * * The method ChannelHandler::onConsumerStopped() will be called when the consumer * was succesfully stopped (unless the nowait option was used, in which case no * confirmation method is called) - * + * * @param tag the consumer tag * @param flags optional additional flags * @return bool */ bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } - + /** * Acknoldge a received message - * + * * When a message is received in the ChannelHandler::onReceived() method, * you must acknoledge it so that RabbitMQ removes it from the queue (unless * you are consuming with the noack option). This method can be used for * this acknoledging. - * + * * The following flags are supported: - * + * * - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too - * + * * @param deliveryTag the unique delivery tag of the message * @param flags optional flags * @return bool */ bool ack(uint64_t deliveryTag, int flags=0) { return _implementation.ack(deliveryTag, flags); } - + /** * Reject or nack a message - * + * * When a message was received in the ChannelHandler::onReceived() method, * and you don't want to acknoledge it, you can also choose to reject it by - * calling this reject method. - * + * calling this reject method. + * * The following flags are supported: - * + * * - multiple reject multiple messages: all un-acked messages that were earlier delivered are unacked too * - requeue if set, the message is put back in the queue, otherwise it is dead-lettered/removed - * + * * @param deliveryTag the unique delivery tag of the message * @param flags optional flags * @return bool @@ -383,27 +466,28 @@ public: /** * Recover all messages that were not yet acked - * - * This method asks the server to redeliver all unacknowledged messages on a specified + * + * This method asks the server to redeliver all unacknowledged messages on a specified * channel. Zero or more messages may be redelivered. - * + * * The following flags are supported: - * + * * - requeue if set, the server will requeue the messages, so the could also end up with at different consumer - * + * * @param flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool recover(int flags = 0) { return _implementation.recover(flags); } + Deferred<>& recover(int flags = 0) { return _implementation.recover(flags); } /** * Close the current channel - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool close() - { - return _implementation.close(); - } + Deferred<>& close() { return _implementation.close(); } /** * Get the channel we're working on diff --git a/include/channelhandler.h b/include/channelhandler.h index c0d3438..2ad468f 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -22,12 +22,6 @@ namespace AMQP { class ChannelHandler { public: - /** - * Method that is called when the channel was succesfully created. - * @param channel the channel that is ready - */ - virtual void onReady(Channel *channel) {} - /** * An error has occured on the channel * The channel is no longer usable after an error has occured on it. @@ -36,108 +30,6 @@ public: */ virtual void onError(Channel *channel, const std::string &message) {} - /** - * Method that is called when the channel was paused - * This is the result of a call to Channel::pause() - * @param channel the channel that is now paused - */ - virtual void onPaused(Channel *channel) {} - - /** - * Method that is called when the channel was resumed - * This is the result of a call to Channel::resume() - * @param channel the channel that is no longer paused - */ - virtual void onResumed(Channel *channel) {} - - /** - * Method that is called when a channel is closed - * This is the result of a call to Channel::close() - * @param channel the channel that is closed - */ - virtual void onClosed(Channel *channel) {} - - /** - * Method that is called when a transaction was started - * This is the result of a call to Channel::startTransaction() - * @param channel the channel on which the transaction was started - */ - virtual void onTransactionStarted(Channel *channel) {} - - /** - * Method that is called when a transaction was committed - * This is the result of a call to Channel::commitTransaction() - * @param channel the channel on which the transaction was committed - */ - virtual void onTransactionCommitted(Channel *channel) {} - - /** - * Method that is called when a transaction was rolled back - * This is the result of a call to Channel::rollbackTransaction() - * @param channel the channel on which the transaction was rolled back - */ - virtual void onTransactionRolledBack(Channel *channel) {} - - /** - * Method that is called when an exchange is bound - * This is the result of a call to Channel::bindExchange() - * @param channel the channel on which the exchange was bound - */ - virtual void onExchangeBound(Channel *channel) {} - - /** - * Method that is called when an exchange is unbound - * This is the result of a call to Channel::unbindExchange() - * @param channel the channel on which the exchange was unbound - */ - virtual void onExchangeUnbound(Channel *channel) {} - - /** - * Method that is called when an exchange is deleted - * This is the result of a call to Channel::deleteExchange() - * @param channel the channel on which the exchange was deleted - */ - virtual void onExchangeDeleted(Channel *channel) {} - - /** - * Mehod that is called when an exchange is declared - * This is the result of a call to Channel::declareExchange() - * @param channel the channel on which the exchange was declared - */ - virtual void onExchangeDeclared(Channel *channel) {} - - /** - * Method that is called when a queue is declared - * This is the result of a call to Channel::declareQueue() - * @param channel the channel on which the queue was declared - * @param name name of the queue - * @param messageCount number of messages in queue - * @param consumerCount number of active consumers - */ - virtual void onQueueDeclared(Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) {} - - /** - * Method that is called when a queue is bound - * This is the result of a call to Channel::bindQueue() - * @param channel the channel on which the queue was bound - */ - virtual void onQueueBound(Channel *channel) {} - - /** - * Method that is called when a queue is deleted - * This is the result of a call to Channel::deleteQueue() - * @param channel the channel on which the queue was deleted - * @param messageCount number of messages deleted along with the queue - */ - virtual void onQueueDeleted(Channel *channel, uint32_t messageCount) {} - - /** - * Method that is called when a queue is unbound - * This is the result of a call to Channel::unbindQueue() - * @param channel the channel on which the queue was unbound - */ - virtual void onQueueUnbound(Channel *channel) {} - /** * Method that is called when a queue is purged * This is the result of a call to Channel::purgeQueue() @@ -146,13 +38,6 @@ public: */ virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} - /** - * Method that is called when the quality-of-service was changed - * This is the result of a call to Channel::setQos() - * @param channel the channel on which the qos was set - */ - virtual void onQosSet(Channel *channel) {} - /** * Method that is called when a consumer was started * This is the result of a call to Channel::consume() @@ -168,7 +53,7 @@ public: * @param tag the consumer tag */ virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} - + /** * Method that is called when a message has been received on a channel * This message will be called for every message that is received after @@ -182,7 +67,7 @@ public: * @param redelivered is this a redelivered message? */ virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {} - + /** * Method that is called when a message you tried to publish was returned * by the server. This only happens when the 'mandatory' or 'immediate' flag @@ -193,13 +78,6 @@ public: * @param text human readable reply reason */ virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {} - - /** - * Method that is called when the server starts recovering messages - * This is the result of a call to Channel::recover() - * @param channel the channel on which the recover method was called - */ - virtual void onRecovering(Channel *channel) {} }; diff --git a/include/channelimpl.h b/include/channelimpl.h index 77e4f16..6651e9b 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -37,7 +37,28 @@ private: * @var MyChannelHandler */ ChannelHandler *_handler; - + + /** + * Callback when the channel is ready + */ + std::function _readyCallback; + + /** + * Callback when the channel errors out + */ + std::function _errorCallback; + + /** + * The callbacks waiting to be called + */ + std::deque> _callbacks; + + /** + * Callbacks with additional parameters + */ + std::deque> _queueDeclaredCallbacks; + std::deque> _queueRemovedCallbacks; + /** * The channel number * @var uint16_t @@ -53,13 +74,13 @@ private: state_closing, state_closed } _state = state_connected; - + /** * Is a transaction now active? * @var bool */ bool _transaction = false; - + /** * The message that is now being received * @var MessageImpl @@ -68,11 +89,11 @@ private: /** * Construct a channel object - * + * * Note that the constructor is private, and that the Channel class is * a friend. By doing this we ensure that nobody can instantiate this * object, and that it can thus only be used inside the library. - * + * * @param parent the public channel object * @param connection pointer to the connection * @param handler handler that is notified on events @@ -96,23 +117,23 @@ public: /** * Pause deliveries on a channel - * + * * This will stop all incoming messages - * - * This method returns true if the request to pause has been sent to the - * broker. This does not necessarily mean that the channel is already - * paused. - * - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool pause(); - + Deferred<>& pause(); + /** * Resume a paused channel - * - * @return bool + * + * This will resume incoming messages + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool resume(); + Deferred<>& resume(); /** * Is the channel connected? @@ -122,123 +143,170 @@ public: { return _state == state_connected; } - + /** * Start a transaction - * @return bool */ - bool startTransaction(); - + Deferred<>& startTransaction(); + /** * Commit the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool commitTransaction(); - + Deferred<>& commitTransaction(); + /** * Rollback the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool rollbackTransaction(); - + Deferred<>& rollbackTransaction(); + /** * declare an exchange + * * @param name name of the exchange to declare * @param type type of exchange * @param flags additional settings for the exchange * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments); - + Deferred<>& declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments); + /** * bind two exchanges + * @param source exchange which binds to target * @param target exchange to bind to * @param routingKey routing key * @param glags additional flags * @param arguments additional arguments for binding - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); - + Deferred<>& bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + /** * unbind two exchanges + * @param source the source exchange * @param target the target exchange * @param routingkey the routing key * @param flags optional flags * @param arguments additional unbind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); - + Deferred<>& unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments); + /** * remove an exchange + * * @param name name of the exchange to remove * @param flags additional settings for deleting the exchange - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool removeExchange(const std::string &name, int flags); - + Deferred<>& removeExchange(const std::string &name, int flags); + /** * declare a queue * @param name queue name * @param flags additional settings for the queue * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool declareQueue(const std::string &name, int flags, const Table &arguments); - + Deferred& declareQueue(const std::string &name, int flags, const Table &arguments); + /** * Bind a queue to an exchange + * * @param exchangeName name of the exchange to bind to * @param queueName name of the queue * @param routingkey routingkey * @param flags additional flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); + Deferred<>& bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments); /** * Unbind a queue from an exchange + * * @param exchange the source exchange * @param queue the target queue * @param routingkey the routing key * @param arguments additional bind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); - + Deferred<>& unbindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, const Table &arguments); + /** * Purge a queue * @param queue queue to purge * @param flags additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl; + * + * }); */ - bool purgeQueue(const std::string &name, int flags); - + Deferred& purgeQueue(const std::string &name, int flags); + /** * Remove a queue * @param queue queue to remove * @param flags additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl; + * + * }); */ - bool removeQueue(const std::string &name, int flags); + Deferred& removeQueue(const std::string &name, int flags); /** * Publish a message to an exchange - * + * * The following flags can be used - * + * * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * + * * If the mandatory or immediate flag is set, and the message could not immediately * be published, the message will be returned to the client, and will eventually * end up in your ChannelHandler::onReturned() method. - * + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param flags optional flags (see above) @@ -247,13 +315,15 @@ public: * @param size size of the message */ bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); - + /** * Set the Quality of Service (QOS) of the entire connection * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool setQos(uint16_t prefetchCount); + Deferred<>& setQos(uint16_t prefetchCount); /** * Tell the RabbitMQ server that we're ready to consume messages @@ -264,7 +334,7 @@ public: * @return bool */ bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); - + /** * Cancel a running consumer * @param tag the consumer tag @@ -288,19 +358,23 @@ public: * @return bool */ bool reject(uint64_t deliveryTag, int flags); - + /** * Recover messages that were not yet ack'ed * @param flags optional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool recover(int flags); - + Deferred<>& recover(int flags); + /** * Close the current channel - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ - bool close(); + Deferred<>& close(); /** * Get the channel we're working on @@ -310,14 +384,34 @@ public: { return _id; } - + /** * Send a frame over the channel * @param frame frame to send * @return bool was frame succesfully sent? */ bool send(const Frame &frame); - + + /** + * Send a frame over the channel and + * get a deferred handler for it. + * + * @param frame frame to send + * @param message the message to trigger if the frame cannot be send at all + */ + Deferred<>& send(const Frame &frame, const char *message); + + /** + * Send a frame over the channel and + * get a deferred handler for it. + * + * @param frame frame to send + * @param message the message to trigger if the frame cannot be send at all + * @param queue the queue to store the callbacks in + */ + template + Deferred& send(const Frame &frame, const char *message, std::deque>& queue); + /** * Report to the handler that the channel is closed */ @@ -325,38 +419,33 @@ public: { // change state _state = state_closed; - + // inform handler - if (_handler) _handler->onClosed(_parent); + reportSuccess(); } - + /** - * Report to the handler that the channel is paused + * Report success + * + * This function is called to report success for all + * cases where the callback does not receive any parameters */ - void reportPaused() + void reportSuccess() { - // inform handler - if (_handler) _handler->onPaused(_parent); + // report success for the oldest request + _callbacks.front().success(); + _callbacks.pop_front(); } - - /** - * Report to the handler that the channel is resumed - */ - void reportResumed() - { - // inform handler - if (_handler) _handler->onResumed(_parent); - } - + /** * Report to the handler that the channel is opened */ void reportReady() { // inform handler - if (_handler) _handler->onReady(_parent); + if (_readyCallback) _readyCallback(_parent); } - + /** * Report an error message on a channel * @param message @@ -365,43 +454,16 @@ public: { // change state _state = state_closed; - + // inform handler - if (_handler) _handler->onError(_parent, message); + if (_errorCallback) _errorCallback(_parent, message); + + // and all waiting deferred callbacks + for (auto &deferred : _callbacks) deferred.error(message); + for (auto &deferred : _queueDeclaredCallbacks) deferred.error(message); + for (auto &deferred : _queueRemovedCallbacks) deferred.error(message); } - /** - * Report that the exchange is succesfully declared - */ - void reportExchangeDeclared() - { - if (_handler) _handler->onExchangeDeclared(_parent); - } - - /** - * Report that the exchange is succesfully deleted - */ - void reportExchangeDeleted() - { - if (_handler) _handler->onExchangeDeleted(_parent); - } - - /** - * Report that the exchange is bound - */ - void reportExchangeBound() - { - if (_handler) _handler->onExchangeBound(_parent); - } - - /** - * Report that the exchange is unbound - */ - void reportExchangeUnbound() - { - if (_handler) _handler->onExchangeUnbound(_parent); - } - /** * Report that the queue was succesfully declared * @param queueName name of the queue which was declared @@ -410,51 +472,33 @@ public: */ void reportQueueDeclared(const std::string &queueName, uint32_t messageCount, uint32_t consumerCount) { - if (_handler) _handler->onQueueDeclared(_parent, queueName, messageCount, consumerCount); + // report success for the oldest queue declare callbacks + _queueDeclaredCallbacks.front().success(queueName, messageCount, consumerCount); + _queueDeclaredCallbacks.pop_front(); } - - /** - * Report that a queue was succesfully bound - */ - void reportQueueBound() - { - if (_handler) _handler->onQueueBound(_parent); - } - - /** - * Report that a queue was succesfully unbound - */ - void reportQueueUnbound() - { - if (_handler) _handler->onQueueUnbound(_parent); - } - + /** * Report that a queue was succesfully deleted * @param messageCount number of messages left in queue, now deleted */ void reportQueueDeleted(uint32_t messageCount) { - if (_handler) _handler->onQueueDeleted(_parent, messageCount); + // report success for the oldest queue remove callbacks + _queueRemovedCallbacks.front().success(messageCount); + _queueRemovedCallbacks.pop_front(); } - + /** * Report that a queue was succesfully purged * @param messageCount number of messages purged */ void reportQueuePurged(uint32_t messageCount) { - if (_handler) _handler->onQueuePurged(_parent, messageCount); + // report success for the oldest queue remove callbacks + _queueRemovedCallbacks.front().success(messageCount); + _queueRemovedCallbacks.pop_front(); } - - /** - * Report that the qos has been set - */ - void reportQosSet() - { - if (_handler) _handler->onQosSet(_parent); - } - + /** * Report that a consumer has started * @param tag the consumer tag @@ -472,20 +516,12 @@ public: { if (_handler) _handler->onConsumerStopped(_parent, tag); } - + /** * Report that a message was received */ void reportMessage(); - /** - * Report that the recover operation has started - */ - void reportRecovering() - { - if (_handler) _handler->onRecovering(_parent); - } - /** * Create an incoming message * @param frame @@ -493,7 +529,7 @@ public: */ MessageImpl *message(const BasicDeliverFrame &frame); MessageImpl *message(const BasicReturnFrame &frame); - + /** * Retrieve the current incoming message * @return MessageImpl @@ -502,12 +538,12 @@ public: { return _message; } - + /** * The channel class is its friend, thus can it instantiate this object */ friend class Channel; - + }; /** diff --git a/include/connection.h b/include/connection.h index a67e6ad..a5fed2b 100644 --- a/include/connection.h +++ b/include/connection.h @@ -22,6 +22,24 @@ private: */ ConnectionImpl _implementation; + /** + * Function to execute code after a certain timeout. + * + * If the timeout is 0, the code is supposed to be run + * in the next iteration of the event loop. + * + * This is a simple placeholder function that will just + * execute the code immediately, it should be overridden + * by the timeout function the used event loop has. + * + * @param timeout the amount of time to wait + * @param callback the callback to execute after the timeout + */ + std::function)> _timeoutHandler = [](double timeout, const std::function& callback) { + // execute callback immediately + callback(); + }; + public: /** * Construct an AMQP object based on full login data diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 77adada..26f7f63 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -5,7 +5,7 @@ * This is the implementation of the connection - a class that can only be * constructed by the connection class itselves and that has all sorts of * methods that are only useful inside the library - * + * * @copyright 2014 Copernica BV */ @@ -44,13 +44,13 @@ protected: state_closing, // connection is busy closing (we have sent the close frame) state_closed // connection is closed } _state = state_protocol; - + /** * Has the close() method been called? * @var bool */ bool _closed = false; - + /** * All channels that are active * @var map @@ -62,13 +62,13 @@ protected: * @var uint16_t */ uint16_t _nextFreeChannel = 1; - + /** * Max number of channels (0 for unlimited) * @var uint16_t */ uint16_t _maxChannels = 0; - + /** * Max frame size * @var uint32_t @@ -80,19 +80,19 @@ protected: * @var Login */ Login _login; - + /** * Vhost to connect to * @var string */ std::string _vhost; - + /** * Queued messages that should be sent after the connection has been established * @var queue */ std::queue _queue; - + /** * Helper method to send the close frame * Return value tells if the connection is still valid @@ -100,17 +100,16 @@ protected: */ bool sendClose(); - private: /** * Construct an AMQP object based on full login data - * + * * The first parameter is a handler object. This handler class is * an interface that should be implemented by the caller. - * + * * Note that the constructor is private to ensure that nobody can construct * this class, only the real Connection class via a friend construct - * + * * @param parent Parent connection object * @param handler Connection handler * @param login Login data @@ -132,7 +131,7 @@ public: // must be busy doing the connection handshake, or already connected return _state == state_handshake || _state == state_connected; } - + /** * Mark the protocol as being ok */ @@ -141,7 +140,7 @@ public: // move on to handshake state if (_state == state_protocol) _state = state_handshake; } - + /** * Are we fully connected? * @return bool @@ -151,12 +150,12 @@ public: // state must be connected return _state == state_connected; } - + /** * Mark the connection as connected */ void setConnected(); - + /** * Retrieve the login data * @return Login @@ -165,7 +164,7 @@ public: { return _login; } - + /** * Retrieve the vhost * @return string @@ -185,7 +184,7 @@ public: _maxChannels = channels; _maxFrame = size; } - + /** * The max frame size * @return uint32_t @@ -194,7 +193,7 @@ public: { return _maxFrame; } - + /** * The max payload size for body frames * @return uint32_t @@ -204,7 +203,7 @@ public: // 8 bytes for header and end-of-frame byte return _maxFrame - 8; } - + /** * Add a channel to the connection, and return the channel ID that it * is allowed to use, or 0 when no more ID's are available @@ -212,16 +211,16 @@ public: * @return uint16_t */ uint16_t add(ChannelImpl *channel); - + /** * Remove a channel * @param channel */ void remove(ChannelImpl *channel); - + /** * Parse the buffer into a recognized frame - * + * * Every time that data comes in on the connection, you should call this method to parse * the incoming data, and let it handle by the AMQP library. This method returns the number * of bytes that were processed. @@ -246,9 +245,9 @@ public: /** * Send a frame over the connection - * + * * This is an internal method that you normally do not have to call yourself - * + * * @param frame the frame to send * @return bool */ @@ -256,11 +255,11 @@ public: /** * Get a channel by its identifier - * + * * This method only works if you had already created the channel before. * This is an internal method that you will not need if you cache the channel * object. - * + * * @param number channel identifier * @return channel the channel object, or nullptr if not yet created */ @@ -278,11 +277,11 @@ public: { // set connection state to closed _state = state_closed; - + // inform handler _handler->onError(_parent, message); } - + /** * Report that the connection is closed */ @@ -290,7 +289,7 @@ public: { // change state _state = state_closed; - + // inform the handler _handler->onClosed(_parent); } diff --git a/include/deferred.h b/include/deferred.h new file mode 100644 index 0000000..8e0e9a0 --- /dev/null +++ b/include/deferred.h @@ -0,0 +1,179 @@ +/** + * Deferred.h + * + * Class describing a set of actions that could + * possibly happen in the future that can be + * caught. + * + * @copyright 2014 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +// forward declaration +class ChannelImpl; + +/** + * Class definition + */ +template +class Deferred +{ +private: + /** + * The channel we operate under + */ + Channel *_channel; + + /** + * Do we already know we failed? + */ + bool _failed; + + /** + * Callback to execute on success + */ + std::function _successCallback; + + /** + * Callback to execute on failure + */ + std::function _errorCallback; + + /** + * Callback to execute either way + */ + std::function _finalizeCallback; + + /** + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + + /** + * Indicate success + * + * @param parameters... the extra parameters relevant for this deferred handler + */ + void success(Arguments ...parameters) + { + // execute callbacks if registered + if (_successCallback) _successCallback(_channel, parameters...); + if (_finalizeCallback) _finalizeCallback(_channel, ""); + } + + /** + * Indicate failure + * + * @param error description of the error that occured + */ + void error(const std::string& error) + { + // we are now in a failed state + _failed = true; + + // execute callbacks if registered + if (_errorCallback) _errorCallback(_channel, error); + if (_finalizeCallback) _finalizeCallback(_channel, error); + } + + /** + * Private constructor that can only be called + * from within the channel implementation + * + * @param channel the channel we operate under + * @param boolea are we already failed? + */ + Deferred(Channel *channel, bool failed = false) : + _channel(channel), + _failed(failed) + {} +public: + /** + * Deleted copy constructor + */ + Deferred(const Deferred& that) = delete; + + /** + * Move constructor + */ + Deferred(Deferred&& that) : + _successCallback(std::move(that._successCallback)), + _errorCallback(std::move(that._errorCallback)), + _finalizeCallback(std::move(that._finalizeCallback)) + {} + + /** + * Cast to a boolean + */ + operator bool () + { + return !_failed; + } + + /** + * Register a function to be called + * if and when the operation succesfully + * completes. + * + * Only one callback can be registered at a time. + * Successive calls to this function will clear + * callbacks registered before. + * + * @param callback the callback to execute + */ + Deferred& onSuccess(const std::function& callback) + { + // store callback + _successCallback = callback; + return *this; + } + + /** + * Register a function to be called + * if and when the operation fails. + * + * Only one callback can be registered at a time. + * Successive calls to this function will clear + * callbacks registered before. + * + * @param callback the callback to execute + */ + Deferred& onError(const std::function& callback) + { + // store callback + _errorCallback = callback; + return *this; + } + + /** + * Register a function to be called + * if and when the operation completes + * or fails. This function will be called + * either way. + * + * In the case of success, the provided + * error parameter will be an empty string. + * + * Only one callback can be registered at at time. + * Successive calls to this function will clear + * callbacks registered before. + * + * @param callback the callback to execute + */ + Deferred& onFinalize(const std::function& callback) + { + // store callback + _finalizeCallback = callback; + return *this; + } +}; + +/** + * End namespace + */ +} diff --git a/src/basicqosokframe.h b/src/basicqosokframe.h index 5d8bca1..dc6e612 100644 --- a/src/basicqosokframe.h +++ b/src/basicqosokframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic QOS frame - * + * * @copyright 2014 Copernica BV */ @@ -32,7 +32,7 @@ public: * @param channel channel we're working on */ BasicQosOKFrame(uint16_t channel) : BasicFrame(channel, 0) {} - + /** * Constructor based on incoming data * @param frame @@ -52,7 +52,7 @@ public: { return 11; } - + /** * Process the frame * @param connection The connection over which it was received @@ -62,13 +62,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // report - channel->reportQosSet(); - + channel->reportSuccess(); + // done return true; } diff --git a/src/basicrecoverokframe.h b/src/basicrecoverokframe.h index 66004b1..1bcd771 100644 --- a/src/basicrecoverokframe.h +++ b/src/basicrecoverokframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic recover-async frame - * + * * @copyright 2014 Copernica BV */ @@ -57,7 +57,7 @@ public: { return 111; } - + /** * Process the frame * @param connection The connection over which it was received @@ -67,17 +67,17 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // report - channel->reportRecovering(); - + channel->reportSuccess(); + // done return true; } - + }; diff --git a/src/channelflowokframe.h b/src/channelflowokframe.h index efc3ae6..fcd325f 100644 --- a/src/channelflowokframe.h +++ b/src/channelflowokframe.h @@ -88,14 +88,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - - // is the flow active? - if (active()) channel->reportResumed(); - else channel->reportPaused(); - + if (!channel) return false; + + // report success for the call + channel->reportSuccess(); + // done return true; } diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 28a3f09..c951f5e 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -55,13 +55,13 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler { // add the channel to the connection _id = _connection->add(this); - + // check if the id is valid if (_id == 0) { // this is invalid _state = state_closed; - + // invalid id, this channel can not exist handler->onError(_parent, "Max number of channels reached"); } @@ -69,7 +69,7 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler { // busy connecting _state = state_connected; - + // valid id, send a channel open frame send(ChannelOpenFrame(_id)); } @@ -80,106 +80,114 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler */ ChannelImpl::~ChannelImpl() { - // remove incoming message + // remove incoming message if (_message) delete _message; _message = nullptr; - + // remove this channel from the connection (but not if the connection is already destructed) if (_connection) _connection->remove(this); - + // close the channel now close(); } /** * Pause deliveries on a channel - * + * * This will stop all incoming messages - * - * This method returns true if the request to pause has been sent to the - * broker. This does not necessarily mean that the channel is already - * paused. - * - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::pause() +Deferred<>& ChannelImpl::pause() { - // send a flow frame - return send(ChannelFlowFrame(_id, false)); + // send a channel flow frame + return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); } /** * Resume a paused channel - * - * @return bool + * + * This will resume incoming messages + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::resume() +Deferred<>& ChannelImpl::resume() { - // send a flow frame - return send(ChannelFlowFrame(_id, true)); + // send a channel flow frame + return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); } /** * Start a transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::startTransaction() +Deferred<>& ChannelImpl::startTransaction() { - // send a flow frame - return send(TransactionSelectFrame(_id)); -} + // send a transaction frame + return send(TransactionSelectFrame(_id), "Cannot send transaction start frame"); +} /** * Commit the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::commitTransaction() +Deferred<>& ChannelImpl::commitTransaction() { - // send a flow frame - return send(TransactionCommitFrame(_id)); + // send a transaction frame + return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); } /** * Rollback the current transaction - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::rollbackTransaction() +Deferred<>& ChannelImpl::rollbackTransaction() { - // send a flow frame - return send(TransactionRollbackFrame(_id)); + // send a transaction frame + return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); } /** * Close the current channel - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::close() +Deferred<>& ChannelImpl::close() { // channel could be dead after send operation, we need to monitor that Monitor monitor(this); - // send a flow frame - if (!send(ChannelCloseFrame(_id))) return false; + // send a channel close frame + auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame"); - // leap out if channel was destructed - if (!monitor.valid()) return true; - - // now it is closing - _state = state_closing; + // was the frame sent and are we still alive? + if (handler && monitor.valid()) _state = state_closing; // done - return true; + return handler; } /** * declare an exchange + * @param name name of the exchange to declare * @param type type of exchange * @param flags additional settings for the exchange * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) +Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { // convert exchange type std::string exchangeType; @@ -189,49 +197,58 @@ bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, in if (type == ExchangeType::headers)exchangeType = "headers"; // send declare exchange frame - return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); + return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); } /** * bind an exchange + * * @param source exchange which binds to target * @param target exchange to bind to * @param routingKey routing key * @param flags additional flags * @param arguments additional arguments for binding - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange bind frame - return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); + return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); } /** * unbind two exchanges + * * @param source the source exchange * @param target the target exchange * @param routingkey the routing key * @param flags optional flags * @param arguments additional unbind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) +Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange unbind frame - return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); + return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); } /** * remove an exchange + * * @param name name of the exchange to remove * @param flags additional settings for deleting the exchange - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::removeExchange(const std::string &name, int flags) +Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags) { // send delete exchange frame - return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); + return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); } /** @@ -239,75 +256,107 @@ bool ChannelImpl::removeExchange(const std::string &name, int flags) * @param name queue name * @param flags additional settings for the queue * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) +Deferred& ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) { // send the queuedeclareframe - return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments)); + return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame", _queueDeclaredCallbacks); } /** * Bind a queue to an exchange + * * @param exchangeName name of the exchange to bind to * @param queueName name of the queue * @param routingkey routingkey * @param flags additional flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) +Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) { // send the bind queue frame - return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); + return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); } /** * Unbind a queue from an exchange + * * @param exchange the source exchange * @param queue the target queue * @param routingkey the routing key * @param arguments additional bind arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) +Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { // send the unbind queue frame - return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments)); + return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); } /** * Purge a queue * @param queue queue to purge * @param flags additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl; + * + * }); */ -bool ChannelImpl::purgeQueue(const std::string &name, int flags) +Deferred& ChannelImpl::purgeQueue(const std::string &name, int flags) { // send the queue purge frame - return send(QueuePurgeFrame(_id, name, flags & nowait)); + return send(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame", _queueRemovedCallbacks); } /** * Remove a queue * @param queue queue to remove * @param flags additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, uint32_t messageCount); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * + * std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl; + * + * }); */ -bool ChannelImpl::removeQueue(const std::string &name, int flags) +Deferred& ChannelImpl::removeQueue(const std::string &name, int flags) { // send the remove queue frame - return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait)); + return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame", _queueRemovedCallbacks); } /** * Publish a message to an exchange - * + * * The following flags can be used - * + * * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * + * * @param exchange the exchange to publish to * @param routingkey the routing key * @param flags optional flags (see above) @@ -320,46 +369,46 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that Monitor monitor(this); - + // @todo do not copy the entire buffer to individual frames - + // send the publish frame if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false; - + // channel still valid? if (!monitor.valid()) return false; // send header if (!send(BasicHeaderFrame(_id, envelope))) return false; - + // channel and connection still valid? if (!monitor.valid() || !_connection) return false; - + // the max payload size is the max frame size minus the bytes for headers and trailer uint32_t maxpayload = _connection->maxPayload(); uint32_t bytessent = 0; - + // the buffer const char *data = envelope.body(); uint32_t bytesleft = envelope.bodySize(); - + // split up the body in multiple frames depending on the max frame size while (bytesleft > 0) { // size of this chunk uint32_t chunksize = std::min(maxpayload, bytesleft); - + // send out a body frame if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false; - + // channel still valid? if (!monitor.valid()) return false; - + // update counters bytessent += chunksize; bytesleft -= chunksize; } - + // done return true; } @@ -367,12 +416,14 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin /** * Set the Quality of Service (QOS) for this channel * @param prefetchCount maximum number of messages to prefetch - * @return bool whether the Qos frame is sent. + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::setQos(uint16_t prefetchCount) +Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount) { // send a qos frame - return send(BasicQosFrame(_id, prefetchCount, false)); + return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame"); } /** @@ -427,12 +478,14 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags) /** * Recover un-acked messages * @param flags optional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. */ -bool ChannelImpl::recover(int flags) +Deferred<>& ChannelImpl::recover(int flags) { // send a nack frame - return send(BasicRecoverFrame(_id, flags & requeue)); + return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame"); } /** @@ -444,11 +497,58 @@ bool ChannelImpl::send(const Frame &frame) { // skip if channel is not connected if (_state != state_connected || !_connection) return false; - + // send to tcp connection return _connection->send(frame); } +/** + * Send a frame over the channel and + * get a deferred handler for it. + * + * @param frame frame to send + * @param message the message to trigger if the frame cannot be send at all + */ +Deferred<>& ChannelImpl::send(const Frame &frame, const char *message) +{ + // use the generic implementation + return send<>(frame, message, _callbacks); +} + +/** + * Send a frame over the channel and + * get a deferred handler for it. + * + * @param frame frame to send + * @param message the message to trigger if the frame cannot be send at all + * @param queue the queue to store the callbacks in + */ +template +Deferred& ChannelImpl::send(const Frame &frame, const char *message, std::deque>& queue) +{ + // create a new deferred handler and get a pointer to it + queue.push_back(Deferred(_parent)); + auto *handler = &queue.back(); + + // send the frame over the channel + if (!send(frame)) + { + // we can immediately put the handler in failed state + handler->_failed = true; + + // the frame could not be send + // we should register an error + // on the handler, but only after + // a timeout, so a handler can + // be attached first + + // TODO + } + + // return the new handler + return *handler; +} + /** * Report the received message */ @@ -456,18 +556,18 @@ void ChannelImpl::reportMessage() { // skip if there is no message if (!_message) return; - + // after the report the channel may be destructed, monitor that Monitor monitor(this); - + // do we have a handler? if (_handler) _message->report(_parent, _handler); - + // skip if channel was destructed if (!monitor.valid()) return; - + // no longer need the message - delete _message; + delete _message; _message = nullptr; } @@ -480,7 +580,7 @@ MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame) { // it should not be possible that a message already exists, but lets check it anyhow if (_message) delete _message; - + // construct a message return _message = new ConsumedMessage(frame); } @@ -494,7 +594,7 @@ MessageImpl *ChannelImpl::message(const BasicReturnFrame &frame) { // it should not be possible that a message already exists, but lets check it anyhow if (_message) delete _message; - + // construct a message return _message = new ReturnedMessage(frame); } diff --git a/src/exchangebindokframe.h b/src/exchangebindokframe.h index ad3c54c..5b1f29c 100644 --- a/src/exchangebindokframe.h +++ b/src/exchangebindokframe.h @@ -1,6 +1,6 @@ /** * Exchangebindokframe.h - * + * * @copyright 2014 Copernica BV */ @@ -8,7 +8,7 @@ * Set up namespace */ namespace AMQP { - + /** * Class definition */ @@ -29,10 +29,10 @@ protected: public: /** * Constructor based on incoming data - * + * * @param frame received frame to decode */ - ExchangeBindOKFrame(ReceivedFrame &frame) : + ExchangeBindOKFrame(ReceivedFrame &frame) : ExchangeFrame(frame) {} @@ -47,12 +47,12 @@ public: ExchangeBindOKFrame(uint16_t channel) : ExchangeFrame(channel, 0) {} - + virtual uint16_t methodID() const override { return 31; } - + /** * Process the frame * @param connection The connection over which it was received @@ -62,17 +62,17 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; // report to handler - channel->reportExchangeBound(); - + channel->reportSuccess(); + // done return true; } }; - + // end namespace } diff --git a/src/exchangedeclareokframe.h b/src/exchangedeclareokframe.h index b271601..f86bf3e 100644 --- a/src/exchangedeclareokframe.h +++ b/src/exchangedeclareokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP exchange declare ok frame - * + * * @copyright 2014 Copernica BV */ @@ -55,7 +55,7 @@ public: { return 11; } - + /** * Process the frame * @param connection The connection over which it was received @@ -65,13 +65,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; - + // report exchange declare ok - channel->reportExchangeDeclared(); - + channel->reportSuccess(); + // done return true; } diff --git a/src/exchangedeleteokframe.h b/src/exchangedeleteokframe.h index c386358..ce44336 100644 --- a/src/exchangedeleteokframe.h +++ b/src/exchangedeleteokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP exchange delete ok frame - * + * * @copyright 2014 Copernica BV */ @@ -32,7 +32,7 @@ public: * * @param frame received frame */ - ExchangeDeleteOKFrame(ReceivedFrame &frame) : + ExchangeDeleteOKFrame(ReceivedFrame &frame) : ExchangeFrame(frame) {} @@ -56,7 +56,7 @@ public: { return 21; } - + /** * Process the frame * @param connection The connection over which it was received @@ -66,13 +66,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; - + // report to handler - channel->reportExchangeDeleted(); - + channel->reportSuccess(); + // done return true; } diff --git a/src/exchangeunbindokframe.h b/src/exchangeunbindokframe.h index f3a2a9b..ea256ab 100644 --- a/src/exchangeunbindokframe.h +++ b/src/exchangeunbindokframe.h @@ -1,5 +1,5 @@ /** - * Exchangeunbindokframe.h + * Exchangeunbindokframe.h * * @copyright 2014 Copernica BV */ @@ -9,7 +9,7 @@ * Set up namespace */ namespace AMQP { - + /** * Class definition */ @@ -30,10 +30,10 @@ protected: public: /** * Constructor based on incoming data - * + * * @param frame received frame to decode */ - ExchangeUnbindOKFrame(ReceivedFrame &frame) : + ExchangeUnbindOKFrame(ReceivedFrame &frame) : ExchangeFrame(frame) {} @@ -48,12 +48,12 @@ public: ExchangeUnbindOKFrame(uint16_t channel) : ExchangeFrame(channel, 0) {} - + virtual uint16_t methodID() const override { return 51; } - + /** * Process the frame * @param connection The connection over which it was received @@ -63,17 +63,17 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; // report to handler - channel->reportExchangeUnbound(); - + channel->reportSuccess(); + // done return true; } }; - + // end namespace } diff --git a/src/methodframe.h b/src/methodframe.h index 749e088..8b847fd 100644 --- a/src/methodframe.h +++ b/src/methodframe.h @@ -75,13 +75,10 @@ public: * @param connection The connection over which it was received * @return bool Was it succesfully processed? */ - virtual bool process(ConnectionImpl *connection) override + virtual bool process[[ noreturn ]](ConnectionImpl *connection) override { // this is an exception throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID())); - - // unreachable - return false; } }; diff --git a/src/queuebindokframe.h b/src/queuebindokframe.h index 6c66cb2..1d55a33 100644 --- a/src/queuebindokframe.h +++ b/src/queuebindokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue bind ok frame - * + * * @copyright 2014 Copernica BV */ @@ -54,7 +54,7 @@ public: { return 21; } - + /** * Process the frame * @param connection The connection over which it was received @@ -64,13 +64,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; - + // report to handler - channel->reportQueueBound(); - + channel->reportSuccess(); + // done return true; } diff --git a/src/queueunbindokframe.h b/src/queueunbindokframe.h index cd779f0..14a2761 100644 --- a/src/queueunbindokframe.h +++ b/src/queueunbindokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue unbind ok frame - * + * * @copyright 2014 Copernica BV */ @@ -24,7 +24,7 @@ protected: { // call base QueueFrame::fill(buffer); - } + } public: /** * Decode a queueunbindokframe from a received frame @@ -58,7 +58,7 @@ public: { return 51; } - + /** * Process the frame * @param connection The connection over which it was received @@ -68,13 +68,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; - + // report queue unbind success - channel->reportQueueUnbound(); - + channel->reportSuccess(); + // done return true; } diff --git a/src/transactioncommitokframe.h b/src/transactioncommitokframe.h index a10e1fe..27f7004 100644 --- a/src/transactioncommitokframe.h +++ b/src/transactioncommitokframe.h @@ -59,6 +59,26 @@ public: { return 21; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + channel->reportSuccess(); + + // done + return true; + } }; /** diff --git a/src/transactionrollbackokframe.h b/src/transactionrollbackokframe.h index a203e4c..e87d893 100644 --- a/src/transactionrollbackokframe.h +++ b/src/transactionrollbackokframe.h @@ -58,7 +58,27 @@ public: virtual uint16_t methodID() const override { return 31; - } + } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + channel->reportSuccess(); + + // done + return true; + } }; /** diff --git a/src/transactionselectokframe.h b/src/transactionselectokframe.h index 9529144..8a842f7 100644 --- a/src/transactionselectokframe.h +++ b/src/transactionselectokframe.h @@ -59,6 +59,26 @@ public: { return 11; } + + /** + * Process the frame + * @param connection The connection over which it was received + * @return bool Was it succesfully processed? + */ + virtual bool process(ConnectionImpl *connection) override + { + // we need the appropriate channel + ChannelImpl *channel = connection->channel(this->channel()); + + // channel does not exist + if(!channel) return false; + + // report that the channel is open + channel->reportSuccess(); + + // done + return true; + } }; /**