diff --git a/include/channelimpl.h b/include/channelimpl.h index 0f607bf..62cc70d 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -107,9 +107,16 @@ private: /** * Push a deferred result * @param result The deferred result - * @param error Error message in case the result is not ok + * @return Deferred The object just pushed */ - void push(Deferred *deferred, const char *error); + Deferred &push(Deferred *deferred); + + /** + * Send a framen and push a deferred result + * @param frame The frame to send + * @return Deferred The object just pushed + */ + Deferred &push(const Frame &frame); public: @@ -422,18 +429,10 @@ public: */ bool send(const Frame &frame); - /** - * Send a frame over the channel and - * get a deferred handler for it. - * - * @param frame frame to send - * @param message the message to trigger if the frame cannot be send at all - * @return Deferred the deferred object - */ - Deferred &send(const Frame &frame, const char *message); - /** * Report to the handler that the channel is opened + * + * @todo when is this sent? */ void reportReady() { @@ -443,6 +442,8 @@ public: /** * Report to the handler that the channel is closed + * + * @todo do we need this? */ void reportClosed() { @@ -450,6 +451,9 @@ public: _state = state_closed; // inform handler + + // @todo do we report success here? + reportSuccess(); } diff --git a/include/connectionhandler.h b/include/connectionhandler.h index 2ffe23b..220e2dc 100644 --- a/include/connectionhandler.h +++ b/include/connectionhandler.h @@ -55,7 +55,7 @@ public: * state and can no longer be used. * * This method has an empty default implementation, although you are very - * much advised to implement it. Because when an error occurs, the connection + * much advised to implement it. When an error occurs, the connection * is no longer usable, so you probably want to know. * * @param connection The connection that entered the error state @@ -65,14 +65,15 @@ public: /** * Method that is called when the login attempt succeeded. After this method - * was called, the connection is ready to use. This is the first method + * is called, the connection is ready to use. This is the first method * that is normally called after you've constructed the connection object. * * According to the AMQP protocol, you must wait for the connection to become * ready (and this onConnected method to be called) before you can start * using the Connection object. However, this AMQP library will cache all * methods that you call before the connection is ready, so in reality there - * is no real reason to wait. + * is no real reason to wait for this method to be called before you send + * the first instructions. * * @param connection The connection that can now be used */ diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index aac9ce6..a3cc37e 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -93,9 +93,8 @@ ChannelImpl::~ChannelImpl() /** * Push a deferred result * @param result The deferred object to push - * @param error Error message in case of error */ -void ChannelImpl::push(Deferred *deferred, const char *error) +Deferred &ChannelImpl::push(Deferred *deferred) { // do we already have an oldest? if (!_oldestCallback) _oldestCallback.reset(deferred); @@ -107,6 +106,19 @@ void ChannelImpl::push(Deferred *deferred, const char *error) _newestCallback = deferred; // @todo in case of error we have to report the error with a timeout + + // done + return *deferred; +} + +/** + * Send a frame and push a deferred result + * @param frame The frame to send + */ +Deferred &ChannelImpl::push(const Frame &frame) +{ + // send the frame, and push the result + return push(new Deferred(send(frame))); } /** @@ -120,7 +132,7 @@ void ChannelImpl::push(Deferred *deferred, const char *error) Deferred &ChannelImpl::pause() { // send a channel flow frame - return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame"); + return push(ChannelFlowFrame(_id, false)); } /** @@ -134,7 +146,7 @@ Deferred &ChannelImpl::pause() Deferred &ChannelImpl::resume() { // send a channel flow frame - return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame"); + return push(ChannelFlowFrame(_id, true)); } /** @@ -146,7 +158,7 @@ Deferred &ChannelImpl::resume() Deferred &ChannelImpl::startTransaction() { // send a transaction frame - return send(TransactionSelectFrame(_id), "Cannot send transaction start frame"); + return push(TransactionSelectFrame(_id)); } /** @@ -158,7 +170,7 @@ Deferred &ChannelImpl::startTransaction() Deferred &ChannelImpl::commitTransaction() { // send a transaction frame - return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame"); + return push(TransactionCommitFrame(_id)); } /** @@ -170,7 +182,7 @@ Deferred &ChannelImpl::commitTransaction() Deferred &ChannelImpl::rollbackTransaction() { // send a transaction frame - return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame"); + return push(TransactionRollbackFrame(_id)); } /** @@ -181,14 +193,11 @@ Deferred &ChannelImpl::rollbackTransaction() */ Deferred &ChannelImpl::close() { - // channel could be dead after send operation, we need to monitor that - Monitor monitor(this); - // send a channel close frame - auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame"); + auto &handler = push(ChannelCloseFrame(_id)); // was the frame sent and are we still alive? - if (handler && monitor.valid()) _state = state_closing; + if (handler) _state = state_closing; // done return handler; @@ -215,7 +224,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ if (type == ExchangeType::headers)exchangeType = "headers"; // send declare exchange frame - return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame"); + return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); } /** @@ -233,7 +242,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange bind frame - return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame"); + return push(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); } /** @@ -251,7 +260,7 @@ Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { // send exchange unbind frame - return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame"); + return push(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); } /** @@ -266,7 +275,7 @@ Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::stri Deferred &ChannelImpl::removeExchange(const std::string &name, int flags) { // send delete exchange frame - return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame"); + return push(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); } /** @@ -287,7 +296,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con auto *result = new DeferredQueue(send(frame)); // add the deferred result - push(result, "Cannot send queue declare frame"); + push(result); // done return *result; @@ -308,7 +317,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) { // send the bind queue frame - return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame"); + return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); } /** @@ -325,7 +334,7 @@ Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::str Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { // send the unbind queue frame - return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame"); + return push(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments)); } /** @@ -355,7 +364,7 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags) auto *deferred = new DeferredDelete(send(frame)); // push to list - push(deferred, "Cannot send queue purge frame"); + push(deferred); // done return *deferred; @@ -388,7 +397,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags) auto *deferred = new DeferredDelete(send(frame)); // push to list - push(deferred, "Cannot send remove queue frame"); + push(deferred); // done return *deferred; @@ -462,7 +471,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin Deferred &ChannelImpl::setQos(uint16_t prefetchCount) { // send a qos frame - return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame"); + return push(BasicQosFrame(_id, prefetchCount, false)); } /** @@ -494,7 +503,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri auto *deferred = new DeferredConsumer(this, send(frame)); // push to list - push(deferred, "Cannot send basic consume frame"); + push(deferred); // done return *deferred; @@ -527,7 +536,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags) auto *deferred = new DeferredCancel(this, send(frame)); // push to list - push(deferred, "Cannot send basic cancel frame"); + push(deferred); // done return *deferred; @@ -567,7 +576,7 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags) Deferred &ChannelImpl::recover(int flags) { // send a nack frame - return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame"); + return push(BasicRecoverFrame(_id, flags & requeue)); } /** @@ -584,24 +593,6 @@ bool ChannelImpl::send(const Frame &frame) return _connection->send(frame); } -/** - * Send a frame over the channel and get a deferred handler for it. - * - * @param frame frame to send - * @param message the message to trigger if the frame cannot be send at all - */ -Deferred &ChannelImpl::send(const Frame &frame, const char *message) -{ - // send the frame, and create deferred object - auto *deferred = new Deferred(send(frame)); - - // push to list - push(deferred, message); - - // done - return *deferred; -} - /** * Report the received message */ diff --git a/src/deferredconsumer.cpp b/src/deferredconsumer.cpp index eb8c6e1..528cbcb 100644 --- a/src/deferredconsumer.cpp +++ b/src/deferredconsumer.cpp @@ -22,8 +22,6 @@ Deferred *DeferredConsumer::reportSuccess(const std::string &name) const // we now know the name, so we can install the message callback on the channel _channel->install(name, _messageCallback); - // @todo when a consumer stops, we should uninstall the message callback - // skip if no special callback was installed if (!_consumeCallback) return Deferred::reportSuccess();