From e1b0e3dea1909142ed817c7c2fd68a6451672e9a Mon Sep 17 00:00:00 2001 From: Martijn Otto Date: Tue, 8 Apr 2014 16:12:04 +0200 Subject: [PATCH] Added a generic callback class that acts as a container for the different types of callbacks --- amqpcpp.h | 1 + include/callbacks.h | 10 ++--- include/channelimpl.h | 83 +++++++-------------------------------- include/deferred.h | 2 + src/channelimpl.cpp | 56 ++++++++++---------------- src/queuedeclareokframe.h | 22 +++++------ src/queuedeleteokframe.h | 14 +++---- src/queuepurgeokframe.h | 12 +++--- 8 files changed, 67 insertions(+), 133 deletions(-) diff --git a/amqpcpp.h b/amqpcpp.h index af79811..430844c 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -17,6 +17,7 @@ #include #include #include +#include // base C include files #include diff --git a/include/callbacks.h b/include/callbacks.h index b29bb38..3ed7601 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -90,13 +90,13 @@ public: * @return reference to the inserted deferred */ template - Deferred& push_back(const Deferred& item) + Deferred& push_back(Deferred&& item) { // retrieve the container auto &container = get>>(_callbacks); // add the element - container.push_back(item); + container.push_back(std::move(item)); // return reference to the new item return container.back(); @@ -128,7 +128,7 @@ public: */ template typename std::enable_if::value>::type - reportFailure(const std::string& message) + reportError(const std::string& message) {} /** @@ -138,7 +138,7 @@ public: */ template typename std::enable_if::value>::type - reportFailure(const std::string& message) + reportError(const std::string& message) { // retrieve the callbacks at current index auto &callbacks = std::get(_callbacks); @@ -147,7 +147,7 @@ public: for (auto &callback : callbacks) callback.error(message); // execute the next type - reportFailure(message); + reportError(message); } }; diff --git a/include/channelimpl.h b/include/channelimpl.h index 6651e9b..cf75dfb 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -51,13 +51,7 @@ private: /** * The callbacks waiting to be called */ - std::deque> _callbacks; - - /** - * Callbacks with additional parameters - */ - std::deque> _queueDeclaredCallbacks; - std::deque> _queueRemovedCallbacks; + Callbacks _callbacks; /** * The channel number @@ -399,18 +393,17 @@ public: * @param frame frame to send * @param message the message to trigger if the frame cannot be send at all */ - Deferred<>& send(const Frame &frame, const char *message); + template + Deferred& send(const Frame &frame, const char *message); /** - * Send a frame over the channel and - * get a deferred handler for it. - * - * @param frame frame to send - * @param message the message to trigger if the frame cannot be send at all - * @param queue the queue to store the callbacks in + * Report to the handler that the channel is opened */ - template - Deferred& send(const Frame &frame, const char *message, std::deque>& queue); + void reportReady() + { + // inform handler + if (_readyCallback) _readyCallback(_parent); + } /** * Report to the handler that the channel is closed @@ -430,20 +423,11 @@ public: * This function is called to report success for all * cases where the callback does not receive any parameters */ - void reportSuccess() + template + void reportSuccess(Arguments ...parameters) { - // report success for the oldest request - _callbacks.front().success(); - _callbacks.pop_front(); - } - - /** - * Report to the handler that the channel is opened - */ - void reportReady() - { - // inform handler - if (_readyCallback) _readyCallback(_parent); + // report success to the relevant callback + _callbacks.reportSuccess(std::forward(parameters)...); } /** @@ -458,45 +442,8 @@ public: // inform handler if (_errorCallback) _errorCallback(_parent, message); - // and all waiting deferred callbacks - for (auto &deferred : _callbacks) deferred.error(message); - for (auto &deferred : _queueDeclaredCallbacks) deferred.error(message); - for (auto &deferred : _queueRemovedCallbacks) deferred.error(message); - } - - /** - * Report that the queue was succesfully declared - * @param queueName name of the queue which was declared - * @param messagecount number of messages currently in the queue - * @param consumerCount number of active consumers in the queue - */ - void reportQueueDeclared(const std::string &queueName, uint32_t messageCount, uint32_t consumerCount) - { - // report success for the oldest queue declare callbacks - _queueDeclaredCallbacks.front().success(queueName, messageCount, consumerCount); - _queueDeclaredCallbacks.pop_front(); - } - - /** - * Report that a queue was succesfully deleted - * @param messageCount number of messages left in queue, now deleted - */ - void reportQueueDeleted(uint32_t messageCount) - { - // report success for the oldest queue remove callbacks - _queueRemovedCallbacks.front().success(messageCount); - _queueRemovedCallbacks.pop_front(); - } - - /** - * Report that a queue was succesfully purged - * @param messageCount number of messages purged - */ - void reportQueuePurged(uint32_t messageCount) - { - // report success for the oldest queue remove callbacks - _queueRemovedCallbacks.front().success(messageCount); - _queueRemovedCallbacks.pop_front(); + // report to all waiting callbacks too + _callbacks.reportError(message); } /** diff --git a/include/deferred.h b/include/deferred.h index 8e0e9a0..2d99187 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -15,6 +15,7 @@ namespace AMQP { // forward declaration class ChannelImpl; +class Callbacks; /** * Class definition @@ -53,6 +54,7 @@ private: * private members and construct us */ friend class ChannelImpl; + friend class Callbacks; /** * Indicate success diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index c951f5e..10ea529 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -102,7 +102,7 @@ ChannelImpl::~ChannelImpl() Deferred<>& ChannelImpl::pause() { // send a channel flow frame - return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); + return send<>(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); } /** @@ -116,7 +116,7 @@ Deferred<>& ChannelImpl::pause() Deferred<>& ChannelImpl::resume() { // send a channel flow frame - return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); + return send<>(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); } /** @@ -128,7 +128,7 @@ Deferred<>& ChannelImpl::resume() Deferred<>& ChannelImpl::startTransaction() { // send a transaction frame - return send(TransactionSelectFrame(_id), "Cannot send transaction start frame"); + return send<>(TransactionSelectFrame(_id), "Cannot send transaction start frame"); } /** @@ -140,7 +140,7 @@ Deferred<>& ChannelImpl::startTransaction() Deferred<>& ChannelImpl::commitTransaction() { // send a transaction frame - return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); + return send<>(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); } /** @@ -152,7 +152,7 @@ Deferred<>& ChannelImpl::commitTransaction() Deferred<>& ChannelImpl::rollbackTransaction() { // send a transaction frame - return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); + return send<>(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); } /** @@ -167,7 +167,7 @@ Deferred<>& ChannelImpl::close() Monitor monitor(this); // send a channel close frame - auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame"); + auto &handler = send<>(ChannelCloseFrame(_id), "Cannot send channel close frame"); // was the frame sent and are we still alive? if (handler && monitor.valid()) _state = state_closing; @@ -197,7 +197,7 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t if (type == ExchangeType::headers)exchangeType = "headers"; // send declare exchange frame - return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); + return send<>(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); } /** @@ -215,7 +215,7 @@ Deferred<>& ChannelImpl::declareExchange(const std::string &name, ExchangeType t Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange bind frame - return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); + return send<>(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); } /** @@ -233,7 +233,7 @@ Deferred<>& ChannelImpl::bindExchange(const std::string &source, const std::stri Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange unbind frame - return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); + return send<>(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); } /** @@ -248,7 +248,7 @@ Deferred<>& ChannelImpl::unbindExchange(const std::string &source, const std::st Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags) { // send delete exchange frame - return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); + return send<>(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); } /** @@ -263,7 +263,7 @@ Deferred<>& ChannelImpl::removeExchange(const std::string &name, int flags) Deferred& ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) { // send the queuedeclareframe - return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame", _queueDeclaredCallbacks); + return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & exclusive, flags & autodelete, flags & nowait, arguments), "Cannot send queue declare frame"); } /** @@ -281,7 +281,7 @@ Deferred& ChannelImpl::declareQueue(cons Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) { // send the bind queue frame - return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); + return send<>(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); } /** @@ -298,7 +298,7 @@ Deferred<>& ChannelImpl::bindQueue(const std::string &exchangeName, const std::s Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { // send the unbind queue frame - return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); + return send<>(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); } /** @@ -322,7 +322,7 @@ Deferred<>& ChannelImpl::unbindQueue(const std::string &exchange, const std::str Deferred& ChannelImpl::purgeQueue(const std::string &name, int flags) { // send the queue purge frame - return send(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame", _queueRemovedCallbacks); + return send(QueuePurgeFrame(_id, name, flags & nowait), "Cannot send queue purge frame"); } /** @@ -346,7 +346,7 @@ Deferred& ChannelImpl::purgeQueue(const std::string &name, int flags) Deferred& ChannelImpl::removeQueue(const std::string &name, int flags) { // send the remove queue frame - return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame", _queueRemovedCallbacks); + return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait), "Cannot send remove queue frame"); } /** @@ -503,38 +503,22 @@ bool ChannelImpl::send(const Frame &frame) } /** - * Send a frame over the channel and - * get a deferred handler for it. + * Send a frame over the channel and get a deferred handler for it. * * @param frame frame to send * @param message the message to trigger if the frame cannot be send at all */ -Deferred<>& ChannelImpl::send(const Frame &frame, const char *message) -{ - // use the generic implementation - return send<>(frame, message, _callbacks); -} - -/** - * Send a frame over the channel and - * get a deferred handler for it. - * - * @param frame frame to send - * @param message the message to trigger if the frame cannot be send at all - * @param queue the queue to store the callbacks in - */ template -Deferred& ChannelImpl::send(const Frame &frame, const char *message, std::deque>& queue) +Deferred& ChannelImpl::send(const Frame &frame, const char *message) { // create a new deferred handler and get a pointer to it - queue.push_back(Deferred(_parent)); - auto *handler = &queue.back(); + auto &handler = _callbacks.push_back(Deferred(_parent)); // send the frame over the channel if (!send(frame)) { // we can immediately put the handler in failed state - handler->_failed = true; + handler._failed = true; // the frame could not be send // we should register an error @@ -546,7 +530,7 @@ Deferred& ChannelImpl::send(const Frame &frame, const char *messag } // return the new handler - return *handler; + return handler; } /** diff --git a/src/queuedeclareokframe.h b/src/queuedeclareokframe.h index 3cbc113..875ff2b 100644 --- a/src/queuedeclareokframe.h +++ b/src/queuedeclareokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue declare ok frame - * + * * @copyright 2014 Copernica BV */ @@ -70,7 +70,7 @@ public: /** * Constructor based on incoming data - * @param frame received frame + * @param frame received frame */ QueueDeclareOKFrame(ReceivedFrame &frame) : QueueFrame(frame), @@ -91,7 +91,7 @@ public: { return 11; } - + /** * Queue name * @return string @@ -105,7 +105,7 @@ public: * Number of messages * @return int32_t */ - int32_t messageCount() const + uint32_t messageCount() const { return _messageCount; } @@ -114,11 +114,11 @@ public: * Number of consumers * @return int32_t */ - int32_t consumerCount() const + uint32_t consumerCount() const { return _consumerCount; } - + /** * Process the frame * @param connection The connection over which it was received @@ -128,13 +128,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // what if channel doesn't exist? if (!channel) return false; - - // report to the handler - channel->reportQueueDeclared(this->name(), this->messageCount(), this->consumerCount()); - + + // report to the handler, we need to specify template arguments otherwise a string will lose const and reference + channel->reportSuccess(this->name(), this->messageCount(), this->consumerCount()); + // done return true; } diff --git a/src/queuedeleteokframe.h b/src/queuedeleteokframe.h index 881ed2b..39cd9b5 100644 --- a/src/queuedeleteokframe.h +++ b/src/queuedeleteokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue delete frame - * + * * @copyright 2014 Copernica BV */ @@ -79,7 +79,7 @@ public: { return _messageCount; } - + /** * Process the frame * @param connection The connection over which it was received @@ -89,13 +89,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; - + // report queue deletion success - channel->reportQueueDeleted(this->messageCount()); - + channel->reportSuccess(this->messageCount()); + // done return true; } @@ -105,4 +105,4 @@ public: * end namespace */ } - + diff --git a/src/queuepurgeokframe.h b/src/queuepurgeokframe.h index 715ef00..166cb60 100644 --- a/src/queuepurgeokframe.h +++ b/src/queuepurgeokframe.h @@ -1,6 +1,6 @@ /** * Class describing an AMQP queue purge frame - * + * * @copyright 2014 Copernica BV */ @@ -35,7 +35,7 @@ protected: // add fields buffer.add(_messageCount); } - + public: /** * Construct a queuepurgeokframe @@ -79,7 +79,7 @@ public: { return _messageCount; } - + /** * Process the frame * @param connection The connection over which it was received @@ -89,13 +89,13 @@ public: { // check if we have a channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist if(!channel) return false; // report queue purge success - channel->reportQueuePurged(this->messageCount()); - + channel->reportSuccess(this->messageCount()); + // done return true; }