From d2c17869e0b644ac9e9955aeff72c39bca3de19d Mon Sep 17 00:00:00 2001 From: Martijn Otto Date: Thu, 10 Apr 2014 12:51:04 +0200 Subject: [PATCH] Moved the remaining methods over to deferred handlers --- amqpcpp.h | 2 +- include/callbacks.h | 3 +- include/channel.h | 82 ++++--- include/channelhandler.h | 87 -------- include/channelimpl.h | 97 +++++---- include/deferred.h | 38 ++-- src/basiccancelokframe.h | 14 +- src/basicconsumeokframe.h | 12 +- src/basicreturnframe.h | 13 +- src/channelimpl.cpp | 105 +++++---- src/consumedmessage.h | 19 +- src/messageimpl.h | 29 ++- src/returnedmessage.h | 14 +- tests/main.cpp | 4 +- tests/myconnection.cpp | 448 ++++++++++++-------------------------- tests/myconnection.h | 195 ++--------------- 16 files changed, 388 insertions(+), 774 deletions(-) delete mode 100644 include/channelhandler.h diff --git a/amqpcpp.h b/amqpcpp.h index 430844c..908da12 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -51,8 +51,8 @@ #include #include #include +#include #include -#include #include #include #include diff --git a/include/callbacks.h b/include/callbacks.h index 3ed7601..9d378df 100644 --- a/include/callbacks.h +++ b/include/callbacks.h @@ -23,7 +23,8 @@ private: std::tuple< std::deque>, std::deque>, - std::deque> + std::deque>, + std::deque> > _callbacks; /** diff --git a/include/channel.h b/include/channel.h index 8a2550a..e5df3d3 100644 --- a/include/channel.h +++ b/include/channel.h @@ -26,9 +26,8 @@ public: /** * Construct a channel object * @param connection - * @param handler */ - Channel(Connection *connection, ChannelHandler *handler) : _implementation(this, connection, handler) {} + Channel(Connection *connection) : _implementation(this, connection) {} /** * Destructor @@ -324,7 +323,7 @@ public: * * void myCallback(AMQP::Channel *channel, uint32_t messageCount); * - * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + * For example: channel.removeQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { * * std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl; * @@ -335,29 +334,20 @@ public: /** * Publish a message to an exchange * - * The following flags can be used - * - * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method - * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * * If either of the two flags is set, and the message could not immediately * be published, the message is returned by the server to the client. If you - * want to catch such returned messages, you need to implement the - * ChannelHandler::onReturned() method. + * want to catch such returned messages, you need to install a handler using + * the onReturned() method. * * @param exchange the exchange to publish to * @param routingkey the routing key - * @param flags optional flags (see above) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message */ - bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, flags, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, 0, envelope); } - bool publish(const std::string &exchange, const std::string &routingKey, int flags, const std::string &message) { return _implementation.publish(exchange, routingKey, flags, Envelope(message)); } - bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, 0, Envelope(message)); } - bool publish(const std::string &exchange, const std::string &routingKey, int flags, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, flags, Envelope(message, size)); } - bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, 0, Envelope(message, size)); } + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); } + bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); } /** * Set the Quality of Service (QOS) for this channel @@ -392,7 +382,7 @@ public: * - exclusive request exclusive access, only this consumer can access the queue * - nowait the server does not have to send a response back that consuming is active * - * The method ChannelHandler::onConsumerStarted() will be called when the + * The method Deferred::onSuccess() will be called when the * consumer has started (unless the nowait option was set, in which case * no confirmation method is called) * @@ -400,14 +390,26 @@ public: * @param tag a consumer tag that will be associated with this consume operation * @param flags additional flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.consume("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Started consuming under tag " << tag << std::endl; + * + * }); */ - bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); } - bool consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); } - bool consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); } - bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } - bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } - bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } + DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); } + DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); } + DeferredConsumer& consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); } + DeferredConsumer& consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); } + DeferredConsumer& consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); } + DeferredConsumer& consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); } /** * Cancel a running consume call @@ -418,27 +420,39 @@ public: * * - nowait the server does not have to send a response back that the consumer has been cancelled * - * The method ChannelHandler::onConsumerStopped() will be called when the consumer + * The method Deferred::onSuccess() will be called when the consumer * was succesfully stopped (unless the nowait option was used, in which case no * confirmation method is called) * * @param tag the consumer tag * @param flags optional additional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.cancel("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Stopped consuming under tag " << tag << std::endl; + * + * }); */ - bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } + Deferred& cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); } /** * Acknoldge a received message * - * When a message is received in the ChannelHandler::onReceived() method, - * you must acknoledge it so that RabbitMQ removes it from the queue (unless + * When a message is received in the DeferredConsumer::onReceived() method, + * you must acknowledge it so that RabbitMQ removes it from the queue (unless * you are consuming with the noack option). This method can be used for - * this acknoledging. + * this acknowledging. * * The following flags are supported: * - * - multiple acknoledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too + * - multiple acknowledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too * * @param deliveryTag the unique delivery tag of the message * @param flags optional flags @@ -449,8 +463,8 @@ public: /** * Reject or nack a message * - * When a message was received in the ChannelHandler::onReceived() method, - * and you don't want to acknoledge it, you can also choose to reject it by + * When a message was received in the DeferredConsumer::onReceived() method, + * and you don't want to acknowledge it, you can also choose to reject it by * calling this reject method. * * The following flags are supported: diff --git a/include/channelhandler.h b/include/channelhandler.h deleted file mode 100644 index 2ad468f..0000000 --- a/include/channelhandler.h +++ /dev/null @@ -1,87 +0,0 @@ -#pragma once -/** - * ChannelHandler.h - * - * Interface that should be implemented by a user of the AMQP library, - * and that is passed to the Connection::createChannel() method. - * - * This interface contains a number of methods that are called when - * the channel changes state. - * - * @copyright 2014 Copernica BV - */ - -/** - * Set up namespace - */ -namespace AMQP { - -/** - * Class definition - */ -class ChannelHandler -{ -public: - /** - * An error has occured on the channel - * The channel is no longer usable after an error has occured on it. - * @param channel the channel on which the error occured - * @param message human readable error message - */ - virtual void onError(Channel *channel, const std::string &message) {} - - /** - * Method that is called when a queue is purged - * This is the result of a call to Channel::purgeQueue() - * @param channel the channel on which the queue was emptied - * @param messageCount number of message purged - */ - virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {} - - /** - * Method that is called when a consumer was started - * This is the result of a call to Channel::consume() - * @param channel the channel on which the consumer was started - * @param tag the consumer tag - */ - virtual void onConsumerStarted(Channel *channel, const std::string &tag) {} - - /** - * Method that is called when a consumer was stopped - * This is the result of a call to Channel::cancel() - * @param channel the channel on which the consumer was stopped - * @param tag the consumer tag - */ - virtual void onConsumerStopped(Channel *channel, const std::string &tag) {} - - /** - * Method that is called when a message has been received on a channel - * This message will be called for every message that is received after - * you started consuming. Make sure you acknowledge the messages when its - * safe to remove them from RabbitMQ (unless you set no-ack option when you - * started the consumer) - * @param channel the channel on which the consumer was started - * @param message the consumed message - * @param deliveryTag the delivery tag, you need this to acknowledge the message - * @param consumerTag the consumer identifier that was used to retrieve this message - * @param redelivered is this a redelivered message? - */ - virtual void onReceived(Channel *channel, const Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) {} - - /** - * Method that is called when a message you tried to publish was returned - * by the server. This only happens when the 'mandatory' or 'immediate' flag - * was set with the Channel::publish() call. - * @param channel the channel on which the message was returned - * @param message the returned message - * @param code the reply code - * @param text human readable reply reason - */ - virtual void onReturned(Channel *channel, const Message &message, int16_t code, const std::string &text) {} - -}; - -/** - * End of namespace - */ -} diff --git a/include/channelimpl.h b/include/channelimpl.h index cf75dfb..37e8e07 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -32,12 +32,6 @@ private: */ ConnectionImpl *_connection; - /** - * The handler that is notified about events - * @var MyChannelHandler - */ - ChannelHandler *_handler; - /** * Callback when the channel is ready */ @@ -48,6 +42,11 @@ private: */ std::function _errorCallback; + /** + * Callback to execute when a message arrives + */ + std::unique_ptr _consumer; + /** * The callbacks waiting to be called */ @@ -69,12 +68,6 @@ private: state_closed } _state = state_connected; - /** - * Is a transaction now active? - * @var bool - */ - bool _transaction = false; - /** * The message that is now being received * @var MessageImpl @@ -90,9 +83,8 @@ private: * * @param parent the public channel object * @param connection pointer to the connection - * @param handler handler that is notified on events */ - ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler = nullptr); + ChannelImpl(Channel *parent, Connection *connection); public: /** @@ -292,23 +284,17 @@ public: /** * Publish a message to an exchange * - * The following flags can be used - * - * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method - * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * * If the mandatory or immediate flag is set, and the message could not immediately * be published, the message will be returned to the client, and will eventually - * end up in your ChannelHandler::onReturned() method. + * end up in your onReturned() handler method. * * @param exchange the exchange to publish to * @param routingkey the routing key - * @param flags optional flags (see above) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message */ - bool publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope); + bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope); /** * Set the Quality of Service (QOS) of the entire connection @@ -325,20 +311,44 @@ public: * @param tag a consumer tag that will be associated with this consume operation * @param flags additional flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Started consuming under tag " << tag << std::endl; + * + * }); */ - bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); + DeferredConsumer& consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments); /** * Cancel a running consumer * @param tag the consumer tag * @param flags optional flags - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Started consuming under tag " << tag << std::endl; + * + * }); */ - bool cancel(const std::string &tag, int flags); + Deferred& cancel(const std::string &tag, int flags); /** - * Acknoledge a message + * Acknowledge a message * @param deliveryTag the delivery tag * @param flags optional flags * @return bool @@ -446,24 +456,6 @@ public: _callbacks.reportError(message); } - /** - * Report that a consumer has started - * @param tag the consumer tag - */ - void reportConsumerStarted(const std::string &tag) - { - if (_handler) _handler->onConsumerStarted(_parent, tag); - } - - /** - * Report that a consumer has stopped - * @param tag the consumer tag - */ - void reportConsumerStopped(const std::string &tag) - { - if (_handler) _handler->onConsumerStopped(_parent, tag); - } - /** * Report that a message was received */ @@ -475,7 +467,6 @@ public: * @return MessageImpl */ MessageImpl *message(const BasicDeliverFrame &frame); - MessageImpl *message(const BasicReturnFrame &frame); /** * Retrieve the current incoming message @@ -486,6 +477,20 @@ public: return _message; } + /** + * Report that the consumer has started + * + * @param consumerTag the tag under which we are now consuming + */ + void reportConsumerStarted(const std::string& consumerTag) + { + // if we do not have a consumer, something is very wrong + if (!_consumer) reportError("Received basic consume ok frame, but no consumer was found"); + + // otherwise, we now report the consumer as started + else _consumer->success(consumerTag); + } + /** * The channel class is its friend, thus can it instantiate this object */ diff --git a/include/deferred.h b/include/deferred.h index 2d99187..489c537 100644 --- a/include/deferred.h +++ b/include/deferred.h @@ -24,16 +24,6 @@ template class Deferred { private: - /** - * The channel we operate under - */ - Channel *_channel; - - /** - * Do we already know we failed? - */ - bool _failed; - /** * Callback to execute on success */ @@ -49,19 +39,12 @@ private: */ std::function _finalizeCallback; - /** - * The channel implementation may call our - * private members and construct us - */ - friend class ChannelImpl; - friend class Callbacks; - /** * Indicate success * * @param parameters... the extra parameters relevant for this deferred handler */ - void success(Arguments ...parameters) + void success(Arguments ...parameters) const { // execute callbacks if registered if (_successCallback) _successCallback(_channel, parameters...); @@ -84,7 +67,24 @@ private: } /** - * Private constructor that can only be called + * The channel implementation may call our + * private members and construct us + */ + friend class ChannelImpl; + friend class Callbacks; +protected: + /** + * The channel we operate under + */ + Channel *_channel; + + /** + * Do we already know we failed? + */ + bool _failed; + + /** + * Protected constructor that can only be called * from within the channel implementation * * @param channel the channel we operate under diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index 0841379..6d9d42f 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic cancel ok frame - * + * * @copyright 2014 Copernica BV */ @@ -42,7 +42,7 @@ public: * * @param frame received frame */ - BasicCancelOKFrame(ReceivedFrame &frame) : + BasicCancelOKFrame(ReceivedFrame &frame) : BasicFrame(frame), _consumerTag(frame) {} @@ -89,13 +89,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // report - channel->reportConsumerStopped(consumerTag()); - + channel->reportSuccess(consumerTag()); + // done return true; } diff --git a/src/basicconsumeokframe.h b/src/basicconsumeokframe.h index cf7298a..1ed8ef4 100644 --- a/src/basicconsumeokframe.h +++ b/src/basicconsumeokframe.h @@ -1,6 +1,6 @@ /** * Class describing a basic consume ok frame - * + * * @copyright 2014 Copernica BV */ @@ -52,7 +52,7 @@ public: * * @param frame received frame */ - BasicConsumeOKFrame(ReceivedFrame &frame) : + BasicConsumeOKFrame(ReceivedFrame &frame) : BasicFrame(frame), _consumerTag(frame) {} @@ -89,13 +89,13 @@ public: { // we need the appropriate channel ChannelImpl *channel = connection->channel(this->channel()); - + // channel does not exist - if (!channel) return false; - + if (!channel) return false; + // report channel->reportConsumerStarted(consumerTag()); - + // done return true; } diff --git a/src/basicreturnframe.h b/src/basicreturnframe.h index 1759240..6bb8aef 100644 --- a/src/basicreturnframe.h +++ b/src/basicreturnframe.h @@ -144,17 +144,8 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // we need the appropriate channel - ChannelImpl *channel = connection->channel(this->channel()); - - // channel does not exist - if (!channel) return false; - - // construct the message - channel->message(*this); - - // done - return true; + // we no longer support returned messages + return false; } }; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 10ea529..f65237f 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -48,10 +48,9 @@ namespace AMQP { * @param connection * @param handler */ -ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) : +ChannelImpl::ChannelImpl(Channel *parent, Connection *connection) : _parent(parent), - _connection(&connection->_implementation), - _handler(handler) + _connection(&connection->_implementation) { // add the channel to the connection _id = _connection->add(this); @@ -61,9 +60,6 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler { // this is invalid _state = state_closed; - - // invalid id, this channel can not exist - handler->onError(_parent, "Max number of channels reached"); } else { @@ -352,19 +348,13 @@ Deferred& ChannelImpl::removeQueue(const std::string &name, int flags) /** * Publish a message to an exchange * - * The following flags can be used - * - * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method - * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method - * * @param exchange the exchange to publish to * @param routingkey the routing key - * @param flags optional flags (see above) * @param envelope the full envelope to send * @param message the message to send * @param size size of the message */ -bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) +bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { // we are going to send out multiple frames, each one will trigger a call to the handler, // which in turn could destruct the channel object, we need to monitor that @@ -373,7 +363,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin // @todo do not copy the entire buffer to individual frames // send the publish frame - if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false; + if (!send(BasicPublishFrame(_id, exchange, routingKey))) return false; // channel still valid? if (!monitor.valid()) return false; @@ -432,27 +422,65 @@ Deferred<>& ChannelImpl::setQos(uint16_t prefetchCount) * @param tag a consumer tag that will be associated with this consume operation * @param flags additional flags * @param arguments additional arguments - * @return bool + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Started consuming under tag " << tag << std::endl; + * + * }); */ -bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) +DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { - // send a consume frame - return send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)); + // create the deferred consumer + _consumer = std::unique_ptr(new DeferredConsumer(_parent, false)); + + // can we send the basic consume frame? + if (!send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments))) + { + // we set the consumer to be failed immediately + _consumer->_failed = true; + + // we should call the error function later + // TODO + } + + // return the consumer + return *_consumer; } /** * Cancel a running consumer * @param tag the consumer tag * @param flags optional flags + * + * This function returns a deferred handler. Callbacks can be installed + * using onSuccess(), onError() and onFinalize() methods. + * + * The onSuccess() callback that you can install should have the following signature: + * + * void myCallback(AMQP::Channel *channel, const std::string& tag); + * + * For example: channel.declareQueue("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + * + * std::cout << "Started consuming under tag " << tag << std::endl; + * + * }); */ -bool ChannelImpl::cancel(const std::string &tag, int flags) +Deferred& ChannelImpl::cancel(const std::string &tag, int flags) { // send a cancel frame - return send(BasicCancelFrame(_id, tag, flags & nowait)); + return send(BasicCancelFrame(_id, tag, flags & nowait), "Cannot send basic cancel frame"); } /** - * Acknoledge a message + * Acknowledge a message * @param deliveryTag the delivery tag * @param flags optional flags * @return bool @@ -541,14 +569,23 @@ void ChannelImpl::reportMessage() // skip if there is no message if (!_message) return; - // after the report the channel may be destructed, monitor that - Monitor monitor(this); + // do we even have a consumer? + if (!_consumer) + { + // this should not be possible: receiving a message without doing a consume() call + reportError("Received message without having a consumer"); + } + else + { + // after the report the channel may be destructed, monitor that + Monitor monitor(this); - // do we have a handler? - if (_handler) _message->report(_parent, _handler); + // send message to the consumer + _message->report(*_consumer); - // skip if channel was destructed - if (!monitor.valid()) return; + // skip if channel was destructed + if (!monitor.valid()) return; + } // no longer need the message delete _message; @@ -569,20 +606,6 @@ MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame) return _message = new ConsumedMessage(frame); } -/** - * Create an incoming message - * @param frame - * @return MessageImpl - */ -MessageImpl *ChannelImpl::message(const BasicReturnFrame &frame) -{ - // it should not be possible that a message already exists, but lets check it anyhow - if (_message) delete _message; - - // construct a message - return _message = new ReturnedMessage(frame); -} - /** * End of namespace */ diff --git a/src/consumedmessage.h b/src/consumedmessage.h index 30ed208..2227763 100644 --- a/src/consumedmessage.h +++ b/src/consumedmessage.h @@ -26,7 +26,7 @@ private: * @var uint64_t */ uint64_t _deliveryTag; - + /** * Is this a redelivered message? * @var bool @@ -39,25 +39,24 @@ public: * Constructor * @param frame */ - ConsumedMessage(const BasicDeliverFrame &frame) : - MessageImpl(frame.exchange(), frame.routingKey()), + ConsumedMessage(const BasicDeliverFrame &frame) : + MessageImpl(frame.exchange(), frame.routingKey()), _consumerTag(frame.consumerTag()), _deliveryTag(frame.deliveryTag()), _redelivered(frame.redelivered()) {} - + /** * Destructor */ virtual ~ConsumedMessage() {} - + /** * Report to the handler - * @param channel - * @param handler + * @param consumer */ - virtual void report(Channel *channel, ChannelHandler *handler) override + virtual void report(const DeferredConsumer& consumer) override { - // report to the handler - handler->onReceived(channel, *this, _deliveryTag, _consumerTag, _redelivered); + // send ourselves to the consumer + consumer.message(*this, _deliveryTag, _consumerTag, _redelivered); } }; diff --git a/src/messageimpl.h b/src/messageimpl.h index 27d5b85..1b087dd 100644 --- a/src/messageimpl.h +++ b/src/messageimpl.h @@ -20,10 +20,10 @@ class MessageImpl : public Message private: /** * How many bytes have been received? - * @var uint64_t + * @var uint64_t */ uint64_t _received; - + /** * Was the buffer allocated by us? * @var bool @@ -36,8 +36,8 @@ protected: * @param exchange * @param routingKey */ - MessageImpl(const std::string &exchange, const std::string &routingKey) : - Message(exchange, routingKey), + MessageImpl(const std::string &exchange, const std::string &routingKey) : + Message(exchange, routingKey), _received(0), _selfAllocated(false) {} @@ -45,12 +45,12 @@ public: /** * Destructor */ - virtual ~MessageImpl() + virtual ~MessageImpl() { // clear up memory if it was self allocated if (_selfAllocated) delete[] _body; } - + /** * Set the body size * This field is set when the header is received @@ -60,7 +60,7 @@ public: { _bodySize = size; } - + /** * Append data * @param buffer incoming data @@ -84,27 +84,26 @@ public: // it does not yet fit, do we have to allocate? if (!_body) _body = new char[_bodySize]; _selfAllocated = true; - + // prevent that size is too big if (size > _bodySize - _received) size = _bodySize - _received; - + // append data memcpy((char *)(_body + _received), buffer, size); - + // we have more data now _received += size; - + // done return _received >= _bodySize; } } - + /** * Report to the handler - * @param channel - * @param handler + * @param consumer */ - virtual void report(Channel *channel, ChannelHandler *handler) = 0; + virtual void report(const DeferredConsumer& consumer) = 0; }; /** diff --git a/src/returnedmessage.h b/src/returnedmessage.h index 61115d7..322bb88 100644 --- a/src/returnedmessage.h +++ b/src/returnedmessage.h @@ -24,7 +24,7 @@ private: * @var int16_t */ int16_t _replyCode; - + /** * The reply message * @var string @@ -40,21 +40,19 @@ public: ReturnedMessage(const BasicReturnFrame &frame) : MessageImpl(frame.exchange(), frame.routingKey()), _replyCode(frame.replyCode()), _replyText(frame.replyText()) {} - + /** * Destructor */ virtual ~ReturnedMessage() {} - + /** * Report to the handler - * @param channel - * @param handler + * @param consumer */ - virtual void report(Channel *channel, ChannelHandler *handler) override + virtual void report(const DeferredConsumer& consumer) override { - // report to the handler - handler->onReturned(channel, *this, _replyCode, _replyText); + // we no longer support returned messages } }; diff --git a/tests/main.cpp b/tests/main.cpp index 49076b7..f0c1b04 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -30,13 +30,13 @@ using namespace Copernica; * @return int */ int main(int argc, const char *argv[]) -{ +{ // need an ip if (argc != 2) { // report error std::cerr << "usage: " << argv[0] << " " << std::endl; - + // done return -1; } diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 64c82b4..7234ad7 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -3,13 +3,13 @@ * * @copyright 2014 Copernica BV */ - + /** * Required external libraries */ #include #include - + #include /** @@ -17,7 +17,7 @@ */ using namespace std; using namespace Copernica; - + /** * Required local class definitions */ @@ -33,23 +33,11 @@ MyConnection::MyConnection(const std::string &ip) : { // start connecting if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; - + // failure onFailure(&_socket); } -/** - * Destructor - */ -MyConnection::~MyConnection() -{ - // do we still have a channel? - if (_channel) delete _channel; - - // do we still have a connection? - if (_connection) delete _connection; -} - /** * Method that is called when the connection failed * @param socket Pointer to the socket @@ -78,19 +66,118 @@ void MyConnection::onConnected(Network::TcpSocket *socket) { // report connection std::cout << "connected" << std::endl; - + // we are connected, leap out if there already is a amqp connection if (_connection) return; - + // create amqp connection, and a new channel - _connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/"); - _channel = new AMQP::Channel(_connection, this); - - // we declare a queue, an exchange and we publish a message - _channel->declareQueue("my_queue"); -// _channel->declareQueue("my_queue", AMQP::autodelete); - _channel->declareExchange("my_exchange", AMQP::direct); - _channel->bindQueue("my_exchange", "my_queue", "key"); + _connection = std::unique_ptr(new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/")); + _channel = std::unique_ptr(new AMQP::Channel(_connection.get())); + + // watch for the channel becoming ready + _channel->onReady([](AMQP::Channel *channel) { + // show that we are ready + std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl; + }); + + // and of course for channel errors + _channel->onError([this](AMQP::Channel *channel, const std::string& message) { + // inform the user of the error + std::cerr << "AMQP channel error on channel " << channel->id() << ": " << message << std::endl; + + // delete the channel + _channel = nullptr; + + // close the connection + _connection->close(); + }); + + // declare a queue and let us know when it succeeds + _channel->declareQueue("my_queue").onSuccess([](AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount){ + // queue was successfully declared + std::cout << "AMQP Queue declared with name '" << name << "', " << messageCount << " messages and " << consumerCount << " consumer" << std::endl; + }); + + // also declare an exchange + _channel->declareExchange("my_exchange", AMQP::direct).onSuccess([](AMQP::Channel *channel) { + // exchange successfully declared + std::cout << "AMQP exchange declared" << std::endl; + }); + + // bind the queue to the exchange + _channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([](AMQP::Channel *channel) { + // queue successfully bound to exchange + std::cout << "AMQP Queue bound" << std::endl; + }); + + // set quality of service + _channel->setQos(1).onSuccess([](AMQP::Channel *channel) { + // quality of service successfully set + std::cout << "AMQP Quality of Service set" << std::endl; + }); + + // publish a message to the exchange + if (!_channel->publish("my_exchange", "key", "my_message")) + { + // we could not publish the message, something is wrong somewhere + std::cerr << "Unable to publish message" << std::endl; + + // close the channel + _channel->close().onSuccess([this](AMQP::Channel *channel) { + // also close the connection + _connection->close(); + }); + } + + // consume the message we just published + _channel->consume("my_queue", "my_consumer", AMQP::exclusive) + .onReceived([this](AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) { + // show the message data + std::cout << "AMQP consumed: " << message.message() << std::endl; + + // ack the message + _channel->ack(deliveryTag); + + // and stop consuming (there is only one message anyways) + _channel->cancel("my_consumer").onSuccess([](AMQP::Channel *channel, const std::string& tag) { + // we successfully stopped consuming + std::cout << "Stopped consuming under tag " << tag << std::endl; + }); + + // unbind the queue again + _channel->unbindQueue("my_exchange", "my_queue", "key").onSuccess([](AMQP::Channel *channel) { + // queueu successfully unbound + std::cout << "Queue unbound" << std::endl; + }); + + // the queue should now be empty, so we can delete it + _channel->removeQueue("my_queue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) { + // queue was removed, it should have been empty, so messageCount should be 0 + if (messageCount) std::cerr << "Removed queue which should have been empty but contained " << messageCount << " messages" << std::endl; + + // no messages is the expected behavior + else std::cout << "Queue removed" << std::endl; + }); + + // also remove the exchange + _channel->removeExchange("my_exchange").onSuccess([](AMQP::Channel *channel) { + // exchange was successfully removed + std::cout << "Removed exchange" << std::endl; + }); + + // everything done, close the channel + _channel->close().onSuccess([this](AMQP::Channel *channel) { + // channel was closed + std::cout << "Channel closed" << std::endl; + + // close the connection too + _connection->close(); + }); + }) + .onSuccess([](AMQP::Channel *channel, const std::string& tag) { + // consumer was started + std::cout << "Started consuming under tag " << tag << std::endl; + }); } /** @@ -103,12 +190,11 @@ void MyConnection::onClosed(Network::TcpSocket *socket) std::cout << "myconnection closed" << std::endl; // close the channel and connection - if (_channel) delete _channel; - if (_connection) delete _connection; - - // set to null _channel = nullptr; _connection = nullptr; + + // stop the loop + Event::MainLoop::instance()->stop(); } /** @@ -119,14 +205,13 @@ void MyConnection::onLost(Network::TcpSocket *socket) { // report error std::cout << "connection lost" << std::endl; - + // close the channel and connection - if (_channel) delete _channel; - if (_connection) delete _connection; - - // set to null _channel = nullptr; _connection = nullptr; + + // stop the loop + Event::MainLoop::instance()->stop(); } /** @@ -136,15 +221,12 @@ void MyConnection::onLost(Network::TcpSocket *socket) */ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) { - // send what came in - std::cout << "received: " << buffer->size() << " bytes" << std::endl; - // leap out if there is no connection if (!_connection) return; - + // let the data be handled by the connection size_t bytes = _connection->parse(buffer->data(), buffer->size()); - + // shrink the buffer buffer->shrink(bytes); } @@ -152,7 +234,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) /** * Method that is called when data needs to be sent over the network * - * Note that the AMQP library does no buffering by itself. This means + * Note that the AMQP library does no buffering by itself. This means * that this method should always send out all data or do the buffering * itself. * @@ -162,21 +244,27 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) */ void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size) { -// // report what is going on -// std::cout << "send: " << size << std::endl; -// -// for (unsigned i=0; i(new AMQP::Channel(connection)); } - -/** - * Method that is called when the channel was succesfully created. - * Only after the channel was created, you can use it for subsequent messages over it - * @param channel - */ -void MyConnection::onReady(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl; -} - -/** - * An error has occured on the channel - * @param channel - * @param message - */ - -void MyConnection::onError(AMQP::Channel *channel, const std::string &message) -{ - // show - std::cout << "AMQP channel error, id: " << (int) channel->id() << " - message: " << message << std::endl; - - // main channel cause an error, get rid of if - delete _channel; - - // reset pointer - _channel = nullptr; -} - -/** - * Method that is called when the channel was paused - * @param channel - */ -void MyConnection::onPaused(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP channel paused" << std::endl; -} - -/** - * Method that is called when the channel was resumed - * @param channel - */ -void MyConnection::onResumed(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP channel resumed" << std::endl; -} - -/** - * Method that is called when a channel is closed - * @param channel - */ -void MyConnection::onClosed(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP channel closed" << std::endl; -} - -/** - * Method that is called when a transaction was started - * @param channel - */ -void MyConnection::onTransactionStarted(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP transaction started" << std::endl; -} - -/** - * Method that is called when a transaction was committed - * @param channel - */ -void MyConnection::onTransactionCommitted(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP transaction committed" << std::endl; -} - -/** - * Method that is called when a transaction was rolled back - * @param channel - */ -void MyConnection::onTransactionRolledBack(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP transaction rolled back" << std::endl; -} - -/** - * Mehod that is called when an exchange is declared - * @param channel - */ -void MyConnection::onExchangeDeclared(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP exchange declared" << std::endl; -} - -/** - * Method that is called when an exchange is bound - * @param channel - */ -void MyConnection::onExchangeBound(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Exchange bound" << std::endl; -} - -/** - * Method that is called when an exchange is unbound - * @param channel - */ -void MyConnection::onExchangeUnbound(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Exchange unbound" << std::endl; -} - -/** - * Method that is called when an exchange is deleted - * @param channel - */ -void MyConnection::onExchangeDeleted(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Exchange deleted" << std::endl; -} - -/** - * Method that is called when a queue is declared - * @param channel - * @param name name of the queue - * @param messageCount number of messages in queue - * @param consumerCount number of active consumers - */ -void MyConnection::onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) -{ - // show - std::cout << "AMQP Queue declared" << std::endl; -} - -/** - * Method that is called when a queue is bound - * @param channel - * @param - */ -void MyConnection::onQueueBound(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Queue bound" << std::endl; - -// _connection->setQos(10); -// _channel->setQos(1); - - - _channel->publish("my_exchange", "invalid-key", AMQP::mandatory, "this is the message"); -// _channel->consume("my_queue"); -} - -/** - * Method that is called when a queue is deleted - * @param channel - * @param messageCount number of messages deleted along with the queue - */ -void MyConnection::onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) -{ - // show - std::cout << "AMQP Queue deleted" << std::endl; -} - -/** - * Method that is called when a queue is unbound - * @param channel - */ -void MyConnection::onQueueUnbound(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Queue unbound" << std::endl; -} - -/** - * Method that is called when a queue is purged - * @param messageCount number of message purged - */ -void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) -{ - // show - std::cout << "AMQP Queue purged" << std::endl; -} - -/** - * Method that is called when the quality-of-service was changed - * This is the result of a call to Channel::setQos() - */ -void MyConnection::onQosSet(AMQP::Channel *channel) -{ - // show - std::cout << "AMQP Qos set" << std::endl; -} - -/** - * Method that is called when a consumer was started - * This is the result of a call to Channel::consume() - * @param channel the channel on which the consumer was started - * @param tag the consumer tag - */ -void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag) -{ - // show - std::cout << "AMQP consumer started" << std::endl; -} - -/** - * Method that is called when a message has been received on a channel - * @param channel the channel on which the consumer was started - * @param message the consumed message - * @param deliveryTag the delivery tag, you need this to acknowledge the message - * @param consumerTag the consumer identifier that was used to retrieve this message - * @param redelivered is this a redelivered message? - */ -void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) -{ - // show - std::cout << "AMQP consumed: " << message.message() << std::endl; - - // ack the message - channel->ack(deliveryTag); -} - -/** - * Method that is called when a message you tried to publish was returned - * by the server. This only happens when the 'mandatory' or 'immediate' flag - * was set with the Channel::publish() call. - * @param channel the channel on which the message was returned - * @param message the returned message - * @param code the reply code - * @param text human readable reply reason - */ -void MyConnection::onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) -{ - // show - std::cout << "AMQP message returned: " << text << std::endl; -} - -/** - * Method that is called when a consumer was stopped - * This is the result of a call to Channel::cancel() - * @param channel the channel on which the consumer was stopped - * @param tag the consumer tag - */ -void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag) -{ - // show - std::cout << "AMQP consumer stopped" << std::endl; -} - diff --git a/tests/myconnection.h b/tests/myconnection.h index 61b0e1e..698428b 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -9,9 +9,8 @@ /** * Class definition */ -class MyConnection : +class MyConnection : public AMQP::ConnectionHandler, - public AMQP::ChannelHandler, public Network::TcpHandler { private: @@ -20,18 +19,18 @@ private: * @var TcpSocket */ Network::TcpSocket _socket; - + /** * The AMQP connection * @var Connection */ - AMQP::Connection *_connection; - + std::unique_ptr _connection; + /** * The AMQP channel * @var Channel */ - AMQP::Channel *_channel; + std::unique_ptr _channel; /** * Method that is called when the connection failed @@ -44,13 +43,13 @@ private: * @param socket Pointer to the socket */ virtual void onTimeout(Network::TcpSocket *socket) override; - + /** * Method that is called when the connection succeeded * @param socket Pointer to the socket */ virtual void onConnected(Network::TcpSocket *socket) override; - + /** * Method that is called when the socket is closed (as a result of a TcpSocket::close() call) * @param socket Pointer to the socket @@ -62,18 +61,18 @@ private: * @param socket Pointer to the socket */ virtual void onLost(Network::TcpSocket *socket) override; - + /** * Method that is called when data is received on the socket * @param socket Pointer to the socket * @param buffer Pointer to the fill input buffer */ virtual void onData(Network::TcpSocket *socket, Network::Buffer *buffer) override; - + /** * Method that is called when data needs to be sent over the network * - * Note that the AMQP library does no buffering by itself. This means + * Note that the AMQP library does no buffering by itself. This means * that this method should always send out all data or do the buffering * itself. * @@ -82,11 +81,17 @@ private: * @param size Size of the buffer */ virtual void onData(AMQP::Connection *connection, const char *buffer, size_t size) override; - + + /** + * Method that is called when the connection to AMQP was closed + * @param connection pointer to connection object + */ + virtual void onClosed(AMQP::Connection *connection) override; + /** * When the connection ends up in an error state this method is called. * This happens when data comes in that does not match the AMQP protocol - * + * * After this method is called, the connection no longer is in a valid * state and can be used. In normal circumstances this method is not called. * @@ -103,175 +108,11 @@ private: */ virtual void onConnected(AMQP::Connection *connection) override; - /** - * Method that is called when the channel was succesfully created. - * Only after the channel was created, you can use it for subsequent messages over it - * @param channel - */ - virtual void onReady(AMQP::Channel *channel) override; - - /** - * An error has occured on the channel - * @param channel - * @param message - */ - virtual void onError(AMQP::Channel *channel, const std::string &message) override; - - /** - * Method that is called when the channel was paused - * @param channel - */ - virtual void onPaused(AMQP::Channel *channel) override; - - /** - * Method that is called when the channel was resumed - * @param channel - */ - virtual void onResumed(AMQP::Channel *channel) override; - - /** - * Method that is called when a channel is closed - * @param channel - */ - virtual void onClosed(AMQP::Channel *channel) override; - - /** - * Method that is called when a transaction was started - * @param channel - */ - virtual void onTransactionStarted(AMQP::Channel *channel) override; - - /** - * Method that is called when a transaction was committed - * @param channel - */ - virtual void onTransactionCommitted(AMQP::Channel *channel) override; - - /** - * Method that is called when a transaction was rolled back - * @param channel - */ - virtual void onTransactionRolledBack(AMQP::Channel *channel) override; - - /** - * Method that is called when an exchange is bound - * @param channel - */ - virtual void onExchangeBound(AMQP::Channel *channel) override; - - /** - * Method that is called when an exchange is unbound - * @param channel - */ - virtual void onExchangeUnbound(AMQP::Channel *channel) override; - - /** - * Method that is called when an exchange is deleted - * @param channel - */ - virtual void onExchangeDeleted(AMQP::Channel *channel) override; - - /** - * Mehod that is called when an exchange is declared - * @param channel - */ - virtual void onExchangeDeclared(AMQP::Channel *channel) override; - - /** - * Method that is called when a queue is declared - * @param channel - * @param name name of the queue - * @param messageCount number of messages in queue - * @param consumerCount number of active consumers - */ - virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) override; - - /** - * Method that is called when a queue is bound - * @param channel - * @param - */ - virtual void onQueueBound(AMQP::Channel *channel) override; - - /** - * Method that is called when a queue is deleted - * @param channel - * @param messageCount number of messages deleted along with the queue - */ - virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) override; - - /** - * Method that is called when a queue is unbound - * @param channel - */ - virtual void onQueueUnbound(AMQP::Channel *channel) override; - - /** - * Method that is called when a queue is purged - * @param messageCount number of message purged - */ - virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) override; - - /** - * Method that is called when the quality-of-service was changed - * This is the result of a call to Channel::setQos() - */ - virtual void onQosSet(AMQP::Channel *channel) override; - - /** - * Method that is called when a consumer was started - * This is the result of a call to Channel::consume() - * @param channel the channel on which the consumer was started - * @param tag the consumer tag - */ - virtual void onConsumerStarted(AMQP::Channel *channel, const std::string &tag) override; - - /** - * Method that is called when a consumer was stopped - * This is the result of a call to Channel::cancel() - * @param channel the channel on which the consumer was stopped - * @param tag the consumer tag - */ - virtual void onConsumerStopped(AMQP::Channel *channel, const std::string &tag) override; - - /** - * Method that is called when a message has been received on a channel - * This message will be called for every message that is received after - * you started consuming. Make sure you acknowledge the messages when its - * safe to remove them from RabbitMQ (unless you set no-ack option when you - * started the consumer) - * @param channel the channel on which the consumer was started - * @param message the consumed message - * @param deliveryTag the delivery tag, you need this to acknowledge the message - * @param consumerTag the consumer identifier that was used to retrieve this message - * @param redelivered is this a redelivered message? - */ - virtual void onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) override; - - /** - * Method that is called when a message you tried to publish was returned - * by the server. This only happens when the 'mandatory' or 'immediate' flag - * was set with the Channel::publish() call. - * @param channel the channel on which the message was returned - * @param message the returned message - * @param code the reply code - * @param text human readable reply reason - */ - virtual void onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) override; - - public: /** * Constructor * @param ip */ MyConnection(const std::string &ip); - - /** - * Destructor - */ - virtual ~MyConnection(); - - };