refactored dealing with error messages

This commit is contained in:
Emiel Bruijntjes 2014-04-15 13:01:27 +02:00
parent ae7a32a8bf
commit d08270701e
4 changed files with 54 additions and 60 deletions

View File

@ -107,9 +107,16 @@ private:
/**
* Push a deferred result
* @param result The deferred result
* @param error Error message in case the result is not ok
* @return Deferred The object just pushed
*/
void push(Deferred *deferred, const char *error);
Deferred &push(Deferred *deferred);
/**
* Send a framen and push a deferred result
* @param frame The frame to send
* @return Deferred The object just pushed
*/
Deferred &push(const Frame &frame);
public:
@ -422,18 +429,10 @@ public:
*/
bool send(const Frame &frame);
/**
* Send a frame over the channel and
* get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
* @return Deferred the deferred object
*/
Deferred &send(const Frame &frame, const char *message);
/**
* Report to the handler that the channel is opened
*
* @todo when is this sent?
*/
void reportReady()
{
@ -443,6 +442,8 @@ public:
/**
* Report to the handler that the channel is closed
*
* @todo do we need this?
*/
void reportClosed()
{
@ -450,6 +451,9 @@ public:
_state = state_closed;
// inform handler
// @todo do we report success here?
reportSuccess();
}

View File

@ -55,7 +55,7 @@ public:
* state and can no longer be used.
*
* This method has an empty default implementation, although you are very
* much advised to implement it. Because when an error occurs, the connection
* much advised to implement it. When an error occurs, the connection
* is no longer usable, so you probably want to know.
*
* @param connection The connection that entered the error state
@ -65,14 +65,15 @@ public:
/**
* Method that is called when the login attempt succeeded. After this method
* was called, the connection is ready to use. This is the first method
* is called, the connection is ready to use. This is the first method
* that is normally called after you've constructed the connection object.
*
* According to the AMQP protocol, you must wait for the connection to become
* ready (and this onConnected method to be called) before you can start
* using the Connection object. However, this AMQP library will cache all
* methods that you call before the connection is ready, so in reality there
* is no real reason to wait.
* is no real reason to wait for this method to be called before you send
* the first instructions.
*
* @param connection The connection that can now be used
*/

View File

@ -93,9 +93,8 @@ ChannelImpl::~ChannelImpl()
/**
* Push a deferred result
* @param result The deferred object to push
* @param error Error message in case of error
*/
void ChannelImpl::push(Deferred *deferred, const char *error)
Deferred &ChannelImpl::push(Deferred *deferred)
{
// do we already have an oldest?
if (!_oldestCallback) _oldestCallback.reset(deferred);
@ -107,6 +106,19 @@ void ChannelImpl::push(Deferred *deferred, const char *error)
_newestCallback = deferred;
// @todo in case of error we have to report the error with a timeout
// done
return *deferred;
}
/**
* Send a frame and push a deferred result
* @param frame The frame to send
*/
Deferred &ChannelImpl::push(const Frame &frame)
{
// send the frame, and push the result
return push(new Deferred(send(frame)));
}
/**
@ -120,7 +132,7 @@ void ChannelImpl::push(Deferred *deferred, const char *error)
Deferred &ChannelImpl::pause()
{
// send a channel flow frame
return send(ChannelFlowFrame(_id, false), "Cannot send channel flow frame");
return push(ChannelFlowFrame(_id, false));
}
/**
@ -134,7 +146,7 @@ Deferred &ChannelImpl::pause()
Deferred &ChannelImpl::resume()
{
// send a channel flow frame
return send(ChannelFlowFrame(_id, true), "Cannot send channel flow frame");
return push(ChannelFlowFrame(_id, true));
}
/**
@ -146,7 +158,7 @@ Deferred &ChannelImpl::resume()
Deferred &ChannelImpl::startTransaction()
{
// send a transaction frame
return send(TransactionSelectFrame(_id), "Cannot send transaction start frame");
return push(TransactionSelectFrame(_id));
}
/**
@ -158,7 +170,7 @@ Deferred &ChannelImpl::startTransaction()
Deferred &ChannelImpl::commitTransaction()
{
// send a transaction frame
return send(TransactionCommitFrame(_id), "Cannot send transaction commit frame");
return push(TransactionCommitFrame(_id));
}
/**
@ -170,7 +182,7 @@ Deferred &ChannelImpl::commitTransaction()
Deferred &ChannelImpl::rollbackTransaction()
{
// send a transaction frame
return send(TransactionRollbackFrame(_id), "Cannot send transaction commit frame");
return push(TransactionRollbackFrame(_id));
}
/**
@ -181,14 +193,11 @@ Deferred &ChannelImpl::rollbackTransaction()
*/
Deferred &ChannelImpl::close()
{
// channel could be dead after send operation, we need to monitor that
Monitor monitor(this);
// send a channel close frame
auto &handler = send(ChannelCloseFrame(_id), "Cannot send channel close frame");
auto &handler = push(ChannelCloseFrame(_id));
// was the frame sent and are we still alive?
if (handler && monitor.valid()) _state = state_closing;
if (handler) _state = state_closing;
// done
return handler;
@ -215,7 +224,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ
if (type == ExchangeType::headers)exchangeType = "headers";
// send declare exchange frame
return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments), "Cannot send exchange declare frame");
return push(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments));
}
/**
@ -233,7 +242,7 @@ Deferred &ChannelImpl::declareExchange(const std::string &name, ExchangeType typ
Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{
// send exchange bind frame
return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange bind frame");
return push(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments));
}
/**
@ -251,7 +260,7 @@ Deferred &ChannelImpl::bindExchange(const std::string &source, const std::string
Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
{
// send exchange unbind frame
return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments), "Cannot send exchange unbind frame");
return push(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments));
}
/**
@ -266,7 +275,7 @@ Deferred &ChannelImpl::unbindExchange(const std::string &source, const std::stri
Deferred &ChannelImpl::removeExchange(const std::string &name, int flags)
{
// send delete exchange frame
return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait), "Cannot send exchange delete frame");
return push(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait));
}
/**
@ -287,7 +296,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con
auto *result = new DeferredQueue(send(frame));
// add the deferred result
push(result, "Cannot send queue declare frame");
push(result);
// done
return *result;
@ -308,7 +317,7 @@ DeferredQueue &ChannelImpl::declareQueue(const std::string &name, int flags, con
Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments)
{
// send the bind queue frame
return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments), "Cannot send queue bind frame");
return push(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments));
}
/**
@ -325,7 +334,7 @@ Deferred &ChannelImpl::bindQueue(const std::string &exchangeName, const std::str
Deferred &ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
{
// send the unbind queue frame
return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments), "Cannot send queue unbind frame");
return push(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments));
}
/**
@ -355,7 +364,7 @@ DeferredDelete &ChannelImpl::purgeQueue(const std::string &name, int flags)
auto *deferred = new DeferredDelete(send(frame));
// push to list
push(deferred, "Cannot send queue purge frame");
push(deferred);
// done
return *deferred;
@ -388,7 +397,7 @@ DeferredDelete &ChannelImpl::removeQueue(const std::string &name, int flags)
auto *deferred = new DeferredDelete(send(frame));
// push to list
push(deferred, "Cannot send remove queue frame");
push(deferred);
// done
return *deferred;
@ -462,7 +471,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
Deferred &ChannelImpl::setQos(uint16_t prefetchCount)
{
// send a qos frame
return send(BasicQosFrame(_id, prefetchCount, false), "Cannot send basic QOS frame");
return push(BasicQosFrame(_id, prefetchCount, false));
}
/**
@ -494,7 +503,7 @@ DeferredConsumer& ChannelImpl::consume(const std::string &queue, const std::stri
auto *deferred = new DeferredConsumer(this, send(frame));
// push to list
push(deferred, "Cannot send basic consume frame");
push(deferred);
// done
return *deferred;
@ -527,7 +536,7 @@ DeferredCancel &ChannelImpl::cancel(const std::string &tag, int flags)
auto *deferred = new DeferredCancel(this, send(frame));
// push to list
push(deferred, "Cannot send basic cancel frame");
push(deferred);
// done
return *deferred;
@ -567,7 +576,7 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
Deferred &ChannelImpl::recover(int flags)
{
// send a nack frame
return send(BasicRecoverFrame(_id, flags & requeue), "Cannot send basic recover frame");
return push(BasicRecoverFrame(_id, flags & requeue));
}
/**
@ -584,24 +593,6 @@ bool ChannelImpl::send(const Frame &frame)
return _connection->send(frame);
}
/**
* Send a frame over the channel and get a deferred handler for it.
*
* @param frame frame to send
* @param message the message to trigger if the frame cannot be send at all
*/
Deferred &ChannelImpl::send(const Frame &frame, const char *message)
{
// send the frame, and create deferred object
auto *deferred = new Deferred(send(frame));
// push to list
push(deferred, message);
// done
return *deferred;
}
/**
* Report the received message
*/

View File

@ -22,8 +22,6 @@ Deferred *DeferredConsumer::reportSuccess(const std::string &name) const
// we now know the name, so we can install the message callback on the channel
_channel->install(name, _messageCallback);
// @todo when a consumer stops, we should uninstall the message callback
// skip if no special callback was installed
if (!_consumeCallback) return Deferred::reportSuccess();