Removed and implemented many @todo tags from the source code, and added extra safety checks if some ignorant user decides to destruct channels or objects while they are still in use, implemented correct closing handshake for both connections and channels
This commit is contained in:
parent
d1ab8b179a
commit
a9e6045414
|
|
@ -29,7 +29,7 @@ public:
|
|||
|
||||
/**
|
||||
* An error has occured on the channel
|
||||
* The channel is no longer usable after an error has occured on it
|
||||
* The channel is no longer usable after an error has occured on it.
|
||||
* @param channel the channel on which the error occured
|
||||
* @param message human readable error message
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -27,9 +27,9 @@ private:
|
|||
|
||||
/**
|
||||
* Pointer to the connection
|
||||
* @var Connection
|
||||
* @var ConnectionImpl
|
||||
*/
|
||||
Connection *_connection;
|
||||
ConnectionImpl *_connection;
|
||||
|
||||
/**
|
||||
* The handler that is notified about events
|
||||
|
|
@ -43,12 +43,19 @@ private:
|
|||
*/
|
||||
uint16_t _id;
|
||||
|
||||
/**
|
||||
* Monitor to check if the connection is still alive
|
||||
* @var Monitor
|
||||
*/
|
||||
Monitor _monitor;
|
||||
|
||||
/**
|
||||
* State of the channel object
|
||||
* @var enum
|
||||
*/
|
||||
enum {
|
||||
state_connected,
|
||||
state_closing,
|
||||
state_closed
|
||||
} _state = state_connected;
|
||||
|
||||
|
|
@ -224,7 +231,9 @@ public:
|
|||
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
|
||||
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
|
||||
*
|
||||
* @todo implement to onReturned() method
|
||||
* If the mandatory or immediate flag is set, and the message could not immediately
|
||||
* be published, the message will be returned to the client, and will eventually
|
||||
* end up in your ChannelHandler::onReturned() method.
|
||||
*
|
||||
* @param exchange the exchange to publish to
|
||||
* @param routingkey the routing key
|
||||
|
|
@ -301,9 +310,9 @@ public:
|
|||
/**
|
||||
* Send a frame over the channel
|
||||
* @param frame frame to send
|
||||
* @return size_t number of bytes sent
|
||||
* @return bool was frame succesfully sent?
|
||||
*/
|
||||
size_t send(const Frame &frame);
|
||||
bool send(const Frame &frame);
|
||||
|
||||
/**
|
||||
* Report to the handler that the channel is closed
|
||||
|
|
|
|||
|
|
@ -40,18 +40,40 @@ public:
|
|||
* After this method is called, the connection no longer is in a valid
|
||||
* state and can no longer be used.
|
||||
*
|
||||
* This method has an empty default implementation, although you are very
|
||||
* much advised to do implement it. Because when an error occurs, the connection
|
||||
* is no longer usable, so you probably want to know.
|
||||
*
|
||||
* @param connection The connection that entered the error state
|
||||
* @param message Error message
|
||||
*/
|
||||
virtual void onError(Connection *connection, const std::string &message) = 0;
|
||||
virtual void onError(Connection *connection, const std::string &message) {}
|
||||
|
||||
/**
|
||||
* Method that is called when the login attempt succeeded. After this method
|
||||
* was called, the connection is ready to use
|
||||
* was called, the connection is ready to use. This is the first method
|
||||
* that is normally called after you've constructed the connection object.
|
||||
*
|
||||
* According to the AMQP protocol, you must wait for the connection to become
|
||||
* ready (and this onConnected method to be called) before you can start
|
||||
* using the Connection object. However, this AMQP library will cache all
|
||||
* methods that you call before the connection is ready, so in reality there
|
||||
* is no reason to wait.
|
||||
*
|
||||
* @param connection The connection that can now be used
|
||||
*/
|
||||
virtual void onConnected(Connection *connection) = 0;
|
||||
virtual void onConnected(Connection *connection) {}
|
||||
|
||||
/**
|
||||
* Method that is called when the connection was closed.
|
||||
*
|
||||
* This is the counter part of a call to Connection::close() and it confirms
|
||||
* that the connection was correctly closed.
|
||||
*
|
||||
* @param connection The connection that was closed and that is now unusable
|
||||
*/
|
||||
virtual void onClosed(Connection *connection) {}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -37,10 +37,10 @@ protected:
|
|||
* @var enum
|
||||
*/
|
||||
enum {
|
||||
state_invalid, // object is in an invalid state
|
||||
state_protocol, // protocol headers are being passed
|
||||
state_handshake, // busy with the handshake to open the connection
|
||||
state_connected, // connection is set up and ready for communication
|
||||
state_closing, // connection is busy closing (we have sent the close frame)
|
||||
state_closed // connection is closed
|
||||
} _state = state_protocol;
|
||||
|
||||
|
|
@ -236,9 +236,9 @@ public:
|
|||
* This is an internal method that you normally do not have to call yourself
|
||||
*
|
||||
* @param frame the frame to send
|
||||
* @return number of bytes sent
|
||||
* @return bool
|
||||
*/
|
||||
size_t send(const Frame &frame);
|
||||
bool send(const Frame &frame);
|
||||
|
||||
/**
|
||||
* Get a channel by its identifier
|
||||
|
|
@ -262,17 +262,23 @@ public:
|
|||
*/
|
||||
void reportError(const std::string &message)
|
||||
{
|
||||
// close everything
|
||||
// @todo is this not duplicate?
|
||||
close();
|
||||
|
||||
// set connection state to closed
|
||||
_state = state_closed;
|
||||
|
||||
// inform handler
|
||||
_handler->onError(_parent, message);
|
||||
}
|
||||
|
||||
// @Todo: notify all channels of closed connection
|
||||
/**
|
||||
* Report that the connection is closed
|
||||
*/
|
||||
void reportClosed()
|
||||
{
|
||||
// change state
|
||||
_state = state_closed;
|
||||
|
||||
// inform the handler
|
||||
_handler->onClosed(_parent);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -280,7 +286,6 @@ public:
|
|||
*/
|
||||
friend class Connection;
|
||||
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -55,8 +55,6 @@ public:
|
|||
* Get the type ID that is used to identify this type of
|
||||
* field in a field table
|
||||
* @return char
|
||||
*
|
||||
* @todo check if all derived classes use the 'override' keyword
|
||||
*/
|
||||
virtual char typeID() const = 0;
|
||||
|
||||
|
|
|
|||
|
|
@ -44,13 +44,8 @@ public:
|
|||
// get the size
|
||||
T size(frame);
|
||||
|
||||
// read data
|
||||
const char *data = frame.nextData(size.value());
|
||||
|
||||
// @todo what if this fails?
|
||||
|
||||
// allocate string
|
||||
_data = std::string((char*) data, (size_t) size.value());
|
||||
_data = std::string(frame.nextData(size.value()), (size_t) size.value());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -61,9 +56,6 @@ public:
|
|||
/**
|
||||
* Create a new instance of this object
|
||||
* @return Field*
|
||||
*
|
||||
* @todo can this be protected?
|
||||
* @todo check if all clone methods have a override keyword
|
||||
*/
|
||||
virtual Field *clone() const override
|
||||
{
|
||||
|
|
|
|||
|
|
@ -74,8 +74,6 @@ public:
|
|||
/**
|
||||
* Return the method ID
|
||||
* @return uint16_t
|
||||
*
|
||||
* @todo check if all other implementations use override keyword
|
||||
*/
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public:
|
|||
* Return the method ID
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 10;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@ public:
|
|||
|
||||
/**
|
||||
* Method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
|
|
@ -68,7 +69,7 @@ public:
|
|||
ChannelImpl *channel = connection->channel(this->channel());
|
||||
|
||||
// channel does not exist
|
||||
if(!channel) return false;
|
||||
if (!channel) return false;
|
||||
|
||||
// report that the channel is closed
|
||||
channel->reportClosed();
|
||||
|
|
|
|||
|
|
@ -38,8 +38,6 @@ public:
|
|||
/**
|
||||
* Class id
|
||||
* @return uint16_t
|
||||
*
|
||||
* @todo check if override keyword is used in all places
|
||||
*/
|
||||
virtual uint16_t classID() const override
|
||||
{
|
||||
|
|
|
|||
|
|
@ -50,11 +50,12 @@ namespace AMQP {
|
|||
*/
|
||||
ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) :
|
||||
_parent(parent),
|
||||
_connection(connection),
|
||||
_handler(handler)
|
||||
_connection(&connection->_implementation),
|
||||
_handler(handler),
|
||||
_monitor(_connection)
|
||||
{
|
||||
// add the channel to the connection
|
||||
_id = connection->_implementation.add(this);
|
||||
_id = _connection->add(this);
|
||||
|
||||
// check if the id is valid
|
||||
if (_id == 0)
|
||||
|
|
@ -84,17 +85,11 @@ ChannelImpl::~ChannelImpl()
|
|||
if (_message) delete _message;
|
||||
_message = nullptr;
|
||||
|
||||
// remove this channel from the connection
|
||||
_connection->_implementation.remove(this);
|
||||
|
||||
// leap out if already disconnected
|
||||
if (!connected()) return;
|
||||
// remove this channel from the connection (but not if the connection is already destructed)
|
||||
if (_monitor.valid()) _connection->remove(this);
|
||||
|
||||
// close the channel now
|
||||
// @todo is this ok?
|
||||
close();
|
||||
|
||||
// do we have
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -110,14 +105,8 @@ ChannelImpl::~ChannelImpl()
|
|||
*/
|
||||
bool ChannelImpl::pause()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send a flow frame
|
||||
send(ChannelFlowFrame(_id, false));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(ChannelFlowFrame(_id, false));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -127,14 +116,8 @@ bool ChannelImpl::pause()
|
|||
*/
|
||||
bool ChannelImpl::resume()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send a flow frame
|
||||
send(ChannelFlowFrame(_id, true));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(ChannelFlowFrame(_id, true));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -143,14 +126,8 @@ bool ChannelImpl::resume()
|
|||
*/
|
||||
bool ChannelImpl::startTransaction()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send a flow frame
|
||||
send(TransactionSelectFrame(_id));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(TransactionSelectFrame(_id));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -159,14 +136,8 @@ bool ChannelImpl::startTransaction()
|
|||
*/
|
||||
bool ChannelImpl::commitTransaction()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send a flow frame
|
||||
send(TransactionCommitFrame(_id));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(TransactionCommitFrame(_id));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -175,14 +146,8 @@ bool ChannelImpl::commitTransaction()
|
|||
*/
|
||||
bool ChannelImpl::rollbackTransaction()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send a flow frame
|
||||
send(TransactionRollbackFrame(_id));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(TransactionRollbackFrame(_id));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -191,14 +156,17 @@ bool ChannelImpl::rollbackTransaction()
|
|||
*/
|
||||
bool ChannelImpl::close()
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
// channel could be dead after send operation, we need to monitor that
|
||||
Monitor monitor(this);
|
||||
|
||||
// send a flow frame
|
||||
send(ChannelCloseFrame(_id));
|
||||
if (!send(ChannelCloseFrame(_id))) return false;
|
||||
|
||||
// now it is closed
|
||||
_state = state_closed;
|
||||
// leap out if channel was destructed
|
||||
if (!monitor.valid()) return true;
|
||||
|
||||
// now it is closing
|
||||
_state = state_closing;
|
||||
|
||||
// done
|
||||
return true;
|
||||
|
|
@ -214,19 +182,15 @@ bool ChannelImpl::close()
|
|||
*/
|
||||
bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// convert exchange type
|
||||
std::string exchangeType;
|
||||
if(type == ExchangeType::fanout) exchangeType = "fanout";
|
||||
if(type == ExchangeType::direct) exchangeType = "direct";
|
||||
if(type == ExchangeType::topic) exchangeType = "topic";
|
||||
if(type == ExchangeType::headers)exchangeType = "headers";
|
||||
// send declare exchange frame
|
||||
send(ExchangeDeclareFrame(_id, name, exchangeType, (flags & passive) != 0, (flags & durable) != 0, (flags & nowait) != 0, arguments));
|
||||
if (type == ExchangeType::fanout) exchangeType = "fanout";
|
||||
if (type == ExchangeType::direct) exchangeType = "direct";
|
||||
if (type == ExchangeType::topic) exchangeType = "topic";
|
||||
if (type == ExchangeType::headers)exchangeType = "headers";
|
||||
|
||||
// done
|
||||
return true;
|
||||
// send declare exchange frame
|
||||
return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -240,14 +204,8 @@ bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, in
|
|||
*/
|
||||
bool ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// send exchange bind frame
|
||||
send(ExchangeBindFrame(_id, target, source, routingkey, (flags & nowait) != 0, arguments));
|
||||
|
||||
//done
|
||||
return true;
|
||||
return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -261,14 +219,8 @@ bool ChannelImpl::bindExchange(const std::string &source, const std::string &tar
|
|||
*/
|
||||
bool ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send exchange unbind frame
|
||||
send(ExchangeUnbindFrame(_id, target, source, routingkey, (flags & nowait) != 0, arguments));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -279,14 +231,8 @@ bool ChannelImpl::unbindExchange(const std::string &source, const std::string &t
|
|||
*/
|
||||
bool ChannelImpl::removeExchange(const std::string &name, int flags)
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send delete exchange frame
|
||||
send(ExchangeDeleteFrame(_id, name, (flags & ifunused) != 0, (flags & nowait) != 0));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -298,14 +244,8 @@ bool ChannelImpl::removeExchange(const std::string &name, int flags)
|
|||
*/
|
||||
bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if (!connected()) return false;
|
||||
|
||||
// send the queuedeclareframe
|
||||
send(QueueDeclareFrame(_id, name, (flags & passive) != 0, (flags & durable) != 0, (flags & durable) != 0, (flags & autodelete) != 0, (flags & nowait) != 0, arguments));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & durable, flags & autodelete, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -319,14 +259,8 @@ bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table &
|
|||
*/
|
||||
bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// send the bind queue frame
|
||||
send(QueueBindFrame(_id, queueName, exchangeName, routingkey, (flags & nowait) != 0, arguments));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -339,14 +273,8 @@ bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &
|
|||
*/
|
||||
bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// send the unbind queue frame
|
||||
send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -357,14 +285,8 @@ bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &qu
|
|||
*/
|
||||
bool ChannelImpl::purgeQueue(const std::string &name, int flags)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// send the queue purge frame
|
||||
send(QueuePurgeFrame(_id, name, (flags & nowait) != 0));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(QueuePurgeFrame(_id, name, flags & nowait));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -375,14 +297,8 @@ bool ChannelImpl::purgeQueue(const std::string &name, int flags)
|
|||
*/
|
||||
bool ChannelImpl::removeQueue(const std::string &name, int flags)
|
||||
{
|
||||
// must be connected
|
||||
if(!connected()) return false;
|
||||
|
||||
// send the remove queue frame
|
||||
send(QueueDeleteFrame(_id, name, (flags & ifunused) != 0,(flags & ifempty) != 0,(flags & nowait) != 0));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -393,8 +309,6 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags)
|
|||
* - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method
|
||||
* - immediate if set, a message that could not immediately be consumed is returned to the onReturned method
|
||||
*
|
||||
* @todo implement to onReturned() method
|
||||
*
|
||||
* @param exchange the exchange to publish to
|
||||
* @param routingkey the routing key
|
||||
* @param flags optional flags (see above)
|
||||
|
|
@ -404,18 +318,26 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags)
|
|||
*/
|
||||
bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope)
|
||||
{
|
||||
// @todo prevent crash when connection is destructed
|
||||
// we are going to send out multiple frames, each one will trigger a call to the handler,
|
||||
// which in turn could destruct the channel object, we need to monitor that
|
||||
Monitor monitor(this);
|
||||
|
||||
// @todo do not copy the entire buffer to individual frames
|
||||
|
||||
// send the publish frame
|
||||
send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate));
|
||||
if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false;
|
||||
|
||||
// channel still valid?
|
||||
if (!monitor.valid()) return false;
|
||||
|
||||
// send header
|
||||
send(BasicHeaderFrame(_id, envelope));
|
||||
if (!send(BasicHeaderFrame(_id, envelope))) return false;
|
||||
|
||||
// channel and connection still valid?
|
||||
if (!monitor.valid() || !_monitor.valid()) return false;
|
||||
|
||||
// the max payload size is the max frame size minus the bytes for headers and trailer
|
||||
uint32_t maxpayload = _connection->_implementation.maxPayload();
|
||||
uint32_t maxpayload = _connection->maxPayload();
|
||||
uint32_t bytessent = 0;
|
||||
|
||||
// the buffer
|
||||
|
|
@ -429,7 +351,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
|
|||
uint32_t chunksize = std::min(maxpayload, bytesleft);
|
||||
|
||||
// send out a body frame
|
||||
send(BodyFrame(_id, data + bytessent, chunksize));
|
||||
if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false;
|
||||
|
||||
// channel still valid?
|
||||
if (!monitor.valid()) return false;
|
||||
|
||||
// update counters
|
||||
bytessent += chunksize;
|
||||
|
|
@ -448,10 +373,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
|
|||
bool ChannelImpl::setQos(uint16_t prefetchCount)
|
||||
{
|
||||
// send a qos frame
|
||||
send(BasicQosFrame(_id, prefetchCount, false));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicQosFrame(_id, prefetchCount, false));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -465,10 +387,7 @@ bool ChannelImpl::setQos(uint16_t prefetchCount)
|
|||
bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
|
||||
{
|
||||
// send a consume frame
|
||||
send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -479,10 +398,7 @@ bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int
|
|||
bool ChannelImpl::cancel(const std::string &tag, int flags)
|
||||
{
|
||||
// send a cancel frame
|
||||
send(BasicCancelFrame(_id, tag, flags & nowait));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicCancelFrame(_id, tag, flags & nowait));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -494,10 +410,7 @@ bool ChannelImpl::cancel(const std::string &tag, int flags)
|
|||
bool ChannelImpl::ack(uint64_t deliveryTag, int flags)
|
||||
{
|
||||
// send an ack frame
|
||||
send(BasicAckFrame(_id, deliveryTag, flags & multiple));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicAckFrame(_id, deliveryTag, flags & multiple));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -509,10 +422,7 @@ bool ChannelImpl::ack(uint64_t deliveryTag, int flags)
|
|||
bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
|
||||
{
|
||||
// send a nack frame
|
||||
send(BasicNackFrame(_id, deliveryTag, flags & multiple, flags & requeue));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicNackFrame(_id, deliveryTag, flags & multiple, flags & requeue));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -523,21 +433,21 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags)
|
|||
bool ChannelImpl::recover(int flags)
|
||||
{
|
||||
// send a nack frame
|
||||
send(BasicRecoverFrame(_id, flags & requeue));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return send(BasicRecoverFrame(_id, flags & requeue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a frame over the channel
|
||||
* @param frame frame to send
|
||||
* @return size_t number of bytes sent
|
||||
* @return bool was the frame sent?
|
||||
*/
|
||||
size_t ChannelImpl::send(const Frame &frame)
|
||||
bool ChannelImpl::send(const Frame &frame)
|
||||
{
|
||||
// send to tcp connection
|
||||
return _connection->_implementation.send(frame);
|
||||
// skip if channel is not connected
|
||||
if (_state != state_connected) return false;
|
||||
|
||||
// send to tcp connection (first check if connection object was not destructed)
|
||||
return _monitor.valid() && _connection->send(frame);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ public:
|
|||
* Method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 50;
|
||||
}
|
||||
|
|
@ -146,11 +146,15 @@ public:
|
|||
*/
|
||||
virtual bool process(ConnectionImpl *connection) override
|
||||
{
|
||||
// @todo connection could be destructed after frame was sent
|
||||
// we need the monitor because the connection could be destructed in the meantime
|
||||
Monitor monitor(connection);
|
||||
|
||||
// send back the ok frame
|
||||
connection->send(ConnectionCloseOKFrame());
|
||||
|
||||
// check if connection still exists
|
||||
if (!monitor.valid()) return false;
|
||||
|
||||
// no need to check for a channel, the error is connection wide
|
||||
// report the error on the connection
|
||||
connection->reportError(text());
|
||||
|
|
|
|||
|
|
@ -54,6 +54,19 @@ public:
|
|||
{
|
||||
return 51;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the frame
|
||||
* @param connection
|
||||
*/
|
||||
virtual bool process(ConnectionImpl *connection) override
|
||||
{
|
||||
// report that it is closed
|
||||
connection->reportClosed();
|
||||
|
||||
// done
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@
|
|||
*/
|
||||
#include "includes.h"
|
||||
#include "protocolheaderframe.h"
|
||||
#include "connectioncloseokframe.h"
|
||||
#include "connectioncloseframe.h"
|
||||
|
||||
/**
|
||||
* set namespace
|
||||
|
|
@ -29,11 +31,8 @@ namespace AMQP {
|
|||
ConnectionImpl::ConnectionImpl(Connection *parent, ConnectionHandler *handler, const Login &login, const std::string &vhost) :
|
||||
_parent(parent), _handler(handler), _login(login), _vhost(vhost)
|
||||
{
|
||||
// we need a protocol header
|
||||
ProtocolHeaderFrame header;
|
||||
|
||||
// send out the protocol header
|
||||
send(header);
|
||||
// we need to send a protocol header
|
||||
send(ProtocolHeaderFrame());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -41,10 +40,7 @@ ConnectionImpl::ConnectionImpl(Connection *parent, ConnectionHandler *handler, c
|
|||
*/
|
||||
ConnectionImpl::~ConnectionImpl()
|
||||
{
|
||||
// still connected
|
||||
if (_state == state_invalid) return;
|
||||
|
||||
// still in a connected state - should we send the close frame?
|
||||
// close the connection in a nice fashion
|
||||
close();
|
||||
}
|
||||
|
||||
|
|
@ -107,7 +103,8 @@ void ConnectionImpl::remove(ChannelImpl *channel)
|
|||
*/
|
||||
size_t ConnectionImpl::parse(char *buffer, size_t size)
|
||||
{
|
||||
// @todo do not parse if already in an error state
|
||||
// do not parse if already in an error state
|
||||
if (_state == state_closed) return 0;
|
||||
|
||||
// number of bytes processed
|
||||
size_t processed = 0;
|
||||
|
|
@ -156,18 +153,30 @@ size_t ConnectionImpl::parse(char *buffer, size_t size)
|
|||
*/
|
||||
bool ConnectionImpl::close()
|
||||
{
|
||||
// leap out if not yet connected
|
||||
if (_state != state_connected) return false;
|
||||
// leap out if already closed or closing
|
||||
if (_state == state_closed || _state == state_closing) return false;
|
||||
|
||||
// after the send operation the object could be dead
|
||||
Monitor monitor(this);
|
||||
|
||||
// loop over all channels
|
||||
for (auto iter = _channels.begin(); iter != _channels.end(); iter++)
|
||||
{
|
||||
// close the channel
|
||||
iter->second->close();
|
||||
|
||||
// we could be dead now
|
||||
if (!monitor.valid()) return true;
|
||||
}
|
||||
|
||||
// send the close frame
|
||||
if (!send(ConnectionCloseFrame(0, "shutdown"))) return false;
|
||||
|
||||
// leap out if object no longer is alive
|
||||
if (!monitor.valid()) return true;
|
||||
|
||||
// we're in a new state
|
||||
_state = state_invalid;
|
||||
_state = state_closing;
|
||||
|
||||
// done
|
||||
return true;
|
||||
|
|
@ -212,10 +221,13 @@ void ConnectionImpl::setConnected()
|
|||
/**
|
||||
* Send a frame over the connection
|
||||
* @param frame The frame to send
|
||||
* @return size_t Number of bytes sent
|
||||
* @return bool Was the frame succesfully sent
|
||||
*/
|
||||
size_t ConnectionImpl::send(const Frame &frame)
|
||||
bool ConnectionImpl::send(const Frame &frame)
|
||||
{
|
||||
// its not possible to send anything if closed or closing down
|
||||
if (_state == state_closing || _state == state_closed) return false;
|
||||
|
||||
// we need an output buffer
|
||||
OutBuffer buffer(frame.totalSize());
|
||||
|
||||
|
|
@ -226,19 +238,19 @@ size_t ConnectionImpl::send(const Frame &frame)
|
|||
if (frame.needsSeparator()) buffer.add((uint8_t)206);
|
||||
|
||||
// are we still setting up the connection?
|
||||
if ((_state == state_protocol || _state == state_handshake) && !frame.partOfHandshake())
|
||||
{
|
||||
// the connection is still being set up, so we need to delay the message sending
|
||||
_queue.push(std::move(buffer));
|
||||
}
|
||||
else
|
||||
if (_state == state_connected || frame.partOfHandshake())
|
||||
{
|
||||
// send the buffer
|
||||
_handler->onData(_parent, buffer.data(), buffer.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
// the connection is still being set up, so we need to delay the message sending
|
||||
_queue.push(std::move(buffer));
|
||||
}
|
||||
|
||||
// done
|
||||
return buffer.size();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -85,8 +85,9 @@ public:
|
|||
|
||||
/**
|
||||
* Method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 40;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -173,18 +173,15 @@ public:
|
|||
*/
|
||||
virtual bool process(ConnectionImpl *connection) override
|
||||
{
|
||||
// @todo we must still be in protocol handshake mode
|
||||
|
||||
|
||||
// the peer properties
|
||||
Table properties;
|
||||
|
||||
// fill the peer properties
|
||||
properties["product"] = "Copernica AMQP library";
|
||||
properties["version"] = "0.1";
|
||||
properties["platform"] = "Ubuntu";
|
||||
properties["version"] = "Unknown";
|
||||
properties["platform"] = "Unknown";
|
||||
properties["copyright"] = "Copyright 2014 Copernica BV";
|
||||
properties["information"] = "";
|
||||
properties["information"] = "http://www.copernica.com";
|
||||
|
||||
// move connection to handshake mode
|
||||
connection->setProtocolOk();
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ public:
|
|||
/**
|
||||
* Method id
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 11;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -131,9 +131,6 @@ public:
|
|||
*/
|
||||
virtual bool process(ConnectionImpl *connection) override
|
||||
{
|
||||
// @todo this is only allowed when the connection is set up
|
||||
|
||||
|
||||
// remember this in the connection
|
||||
connection->setCapacity(channelMax(), frameMax());
|
||||
|
||||
|
|
@ -147,10 +144,7 @@ public:
|
|||
if (!monitor.valid()) return true;
|
||||
|
||||
// and finally we start to open the frame
|
||||
connection->send(ConnectionOpenFrame(connection->vhost()));
|
||||
|
||||
// done
|
||||
return true;
|
||||
return connection->send(ConnectionOpenFrame(connection->vhost()));
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public:
|
|||
* Method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 31;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ public:
|
|||
* Method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 10;
|
||||
}
|
||||
|
|
@ -120,7 +120,7 @@ public:
|
|||
* The exchange name
|
||||
* @return string
|
||||
*/
|
||||
const std::string& name()
|
||||
const std::string& name() const
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
|
|
@ -129,7 +129,7 @@ public:
|
|||
* The exchange type
|
||||
* @return string
|
||||
*/
|
||||
const std::string& type()
|
||||
const std::string &exchangeType() const
|
||||
{
|
||||
return _type;
|
||||
}
|
||||
|
|
@ -138,7 +138,7 @@ public:
|
|||
* Passive declaration, do not create exchange if it does not exist
|
||||
* @return bool
|
||||
*/
|
||||
bool passive()
|
||||
bool passive() const
|
||||
{
|
||||
return _bools.get(0);
|
||||
}
|
||||
|
|
@ -147,7 +147,7 @@ public:
|
|||
* Durable exchange
|
||||
* @return bool
|
||||
*/
|
||||
bool durable()
|
||||
bool durable() const
|
||||
{
|
||||
return _bools.get(1);
|
||||
}
|
||||
|
|
@ -156,7 +156,7 @@ public:
|
|||
* Do not wait for a response
|
||||
* @return bool
|
||||
*/
|
||||
bool noWait()
|
||||
bool noWait() const
|
||||
{
|
||||
return _bools.get(4);
|
||||
}
|
||||
|
|
@ -165,7 +165,7 @@ public:
|
|||
* Additional arguments. Implementation dependent.
|
||||
* @return Table
|
||||
*/
|
||||
const Table& arguments()
|
||||
const Table& arguments() const
|
||||
{
|
||||
return _arguments;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ public:
|
|||
* returns the method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 20;
|
||||
}
|
||||
|
|
@ -97,7 +97,7 @@ public:
|
|||
* returns the exchange name
|
||||
* @return string
|
||||
*/
|
||||
const std::string& name()
|
||||
const std::string& name() const
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
|
|
@ -106,7 +106,7 @@ public:
|
|||
* returns whether to delete if unused
|
||||
* @return bool
|
||||
*/
|
||||
bool ifUnused()
|
||||
bool ifUnused() const
|
||||
{
|
||||
return _bools.get(0);
|
||||
}
|
||||
|
|
@ -115,7 +115,7 @@ public:
|
|||
* returns whether to wait for a response
|
||||
* @return bool
|
||||
*/
|
||||
bool noWait()
|
||||
bool noWait() const
|
||||
{
|
||||
return _bools.get(1);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ public:
|
|||
* Returns the method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 20;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ public:
|
|||
* returns the method id
|
||||
* @return string
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 10;
|
||||
}
|
||||
|
|
@ -112,7 +112,7 @@ public:
|
|||
* return the queue name
|
||||
* @return string
|
||||
*/
|
||||
const std::string& name()
|
||||
const std::string& name() const
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
|
|
@ -121,7 +121,7 @@ public:
|
|||
* returns value of passive declaration, do not create queue if it does not exist
|
||||
* @return bool
|
||||
*/
|
||||
bool passive()
|
||||
bool passive() const
|
||||
{
|
||||
return _bools.get(0);
|
||||
}
|
||||
|
|
@ -130,7 +130,7 @@ public:
|
|||
* returns whether the queue is durable
|
||||
* @return bool
|
||||
*/
|
||||
bool durable()
|
||||
bool durable() const
|
||||
{
|
||||
return _bools.get(1);
|
||||
}
|
||||
|
|
@ -139,7 +139,7 @@ public:
|
|||
* returns whether the queue is exclusive
|
||||
* @return bool
|
||||
*/
|
||||
bool exclusive()
|
||||
bool exclusive() const
|
||||
{
|
||||
return _bools.get(2);
|
||||
}
|
||||
|
|
@ -148,7 +148,7 @@ public:
|
|||
* returns whether the queue is deleted if unused
|
||||
* @return bool
|
||||
*/
|
||||
bool autoDelete()
|
||||
bool autoDelete() const
|
||||
{
|
||||
return _bools.get(3);
|
||||
}
|
||||
|
|
@ -166,7 +166,7 @@ public:
|
|||
* returns additional arguments. Implementation dependant.
|
||||
* @return Table
|
||||
*/
|
||||
const Table& arguments()
|
||||
const Table& arguments() const
|
||||
{
|
||||
return _arguments;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ public:
|
|||
* returns the method id
|
||||
* @returns uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 40;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ public:
|
|||
* The method ID
|
||||
* @return method id
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 30;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ public:
|
|||
* returns the method id
|
||||
* @returns uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 50;
|
||||
}
|
||||
|
|
@ -115,7 +115,7 @@ public:
|
|||
* returns the queue name
|
||||
* @returns string
|
||||
*/
|
||||
const std::string& name()
|
||||
const std::string& name() const
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
|
|
@ -124,7 +124,7 @@ public:
|
|||
* returns the exchange name
|
||||
* @returns string
|
||||
*/
|
||||
const std::string& exchange()
|
||||
const std::string& exchange() const
|
||||
{
|
||||
return _exchange;
|
||||
}
|
||||
|
|
@ -133,7 +133,7 @@ public:
|
|||
* returns the routingKey
|
||||
* @returns string
|
||||
*/
|
||||
const std::string& routingKey()
|
||||
const std::string& routingKey() const
|
||||
{
|
||||
return _routingKey;
|
||||
}
|
||||
|
|
@ -142,7 +142,7 @@ public:
|
|||
* returns the additional arguments
|
||||
* @returns Table
|
||||
*/
|
||||
const Table& arguments()
|
||||
const Table& arguments() const
|
||||
{
|
||||
return _arguments;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,18 +14,6 @@ namespace AMQP {
|
|||
*/
|
||||
class TransactionCommitFrame : public TransactionFrame
|
||||
{
|
||||
protected:
|
||||
/**
|
||||
* Encode a frame on a string buffer
|
||||
*
|
||||
* @param buffer buffer to write frame to
|
||||
*/
|
||||
virtual void fill(OutBuffer& buffer) const override
|
||||
{
|
||||
// call base
|
||||
TransactionFrame::fill(buffer);
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Destructor
|
||||
|
|
@ -54,7 +42,7 @@ public:
|
|||
* return the method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 20;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,18 +14,6 @@ namespace AMQP {
|
|||
*/
|
||||
class TransactionRollbackFrame : public TransactionFrame
|
||||
{
|
||||
protected:
|
||||
/**
|
||||
* Encode a frame on a string buffer
|
||||
*
|
||||
* @param buffer buffer to write frame to
|
||||
*/
|
||||
virtual void fill(OutBuffer& buffer) const override
|
||||
{
|
||||
// call base
|
||||
TransactionFrame::fill(buffer);
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Destructor
|
||||
|
|
@ -55,7 +43,7 @@ public:
|
|||
* return the method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 30;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,18 +14,6 @@ namespace AMQP {
|
|||
*/
|
||||
class TransactionSelectFrame : public TransactionFrame
|
||||
{
|
||||
protected:
|
||||
/**
|
||||
* Encode a frame on a string buffer
|
||||
*
|
||||
* @param buffer buffer to write frame to
|
||||
*/
|
||||
virtual void fill(OutBuffer& buffer) const override
|
||||
{
|
||||
// call base
|
||||
TransactionFrame::fill(buffer);
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Decode a transaction select frame from a received frame
|
||||
|
|
@ -55,7 +43,7 @@ public:
|
|||
* return the method id
|
||||
* @return uint16_t
|
||||
*/
|
||||
uint16_t methodID() const
|
||||
virtual uint16_t methodID() const override
|
||||
{
|
||||
return 10;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue