diff --git a/include/channelhandler.h b/include/channelhandler.h index 480041b..ad0c001 100644 --- a/include/channelhandler.h +++ b/include/channelhandler.h @@ -29,7 +29,7 @@ public: /** * An error has occured on the channel - * The channel is no longer usable after an error has occured on it + * The channel is no longer usable after an error has occured on it. * @param channel the channel on which the error occured * @param message human readable error message */ diff --git a/include/channelimpl.h b/include/channelimpl.h index 13e7b47..9aaaf3f 100644 --- a/include/channelimpl.h +++ b/include/channelimpl.h @@ -27,9 +27,9 @@ private: /** * Pointer to the connection - * @var Connection + * @var ConnectionImpl */ - Connection *_connection; + ConnectionImpl *_connection; /** * The handler that is notified about events @@ -43,12 +43,19 @@ private: */ uint16_t _id; + /** + * Monitor to check if the connection is still alive + * @var Monitor + */ + Monitor _monitor; + /** * State of the channel object * @var enum */ enum { state_connected, + state_closing, state_closed } _state = state_connected; @@ -224,7 +231,9 @@ public: * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - * @todo implement to onReturned() method + * If the mandatory or immediate flag is set, and the message could not immediately + * be published, the message will be returned to the client, and will eventually + * end up in your ChannelHandler::onReturned() method. * * @param exchange the exchange to publish to * @param routingkey the routing key @@ -301,9 +310,9 @@ public: /** * Send a frame over the channel * @param frame frame to send - * @return size_t number of bytes sent + * @return bool was frame succesfully sent? */ - size_t send(const Frame &frame); + bool send(const Frame &frame); /** * Report to the handler that the channel is closed diff --git a/include/connectionhandler.h b/include/connectionhandler.h index ec3142a..5854149 100644 --- a/include/connectionhandler.h +++ b/include/connectionhandler.h @@ -40,18 +40,40 @@ public: * After this method is called, the connection no longer is in a valid * state and can no longer be used. * + * This method has an empty default implementation, although you are very + * much advised to do implement it. Because when an error occurs, the connection + * is no longer usable, so you probably want to know. + * * @param connection The connection that entered the error state * @param message Error message */ - virtual void onError(Connection *connection, const std::string &message) = 0; + virtual void onError(Connection *connection, const std::string &message) {} /** * Method that is called when the login attempt succeeded. After this method - * was called, the connection is ready to use + * was called, the connection is ready to use. This is the first method + * that is normally called after you've constructed the connection object. + * + * According to the AMQP protocol, you must wait for the connection to become + * ready (and this onConnected method to be called) before you can start + * using the Connection object. However, this AMQP library will cache all + * methods that you call before the connection is ready, so in reality there + * is no reason to wait. * * @param connection The connection that can now be used */ - virtual void onConnected(Connection *connection) = 0; + virtual void onConnected(Connection *connection) {} + + /** + * Method that is called when the connection was closed. + * + * This is the counter part of a call to Connection::close() and it confirms + * that the connection was correctly closed. + * + * @param connection The connection that was closed and that is now unusable + */ + virtual void onClosed(Connection *connection) {} + }; /** diff --git a/include/connectionimpl.h b/include/connectionimpl.h index 0d6e9a9..7d23f98 100644 --- a/include/connectionimpl.h +++ b/include/connectionimpl.h @@ -37,10 +37,10 @@ protected: * @var enum */ enum { - state_invalid, // object is in an invalid state state_protocol, // protocol headers are being passed state_handshake, // busy with the handshake to open the connection - state_connected, // connection is set up and ready for communication + state_connected, // connection is set up and ready for communication + state_closing, // connection is busy closing (we have sent the close frame) state_closed // connection is closed } _state = state_protocol; @@ -236,9 +236,9 @@ public: * This is an internal method that you normally do not have to call yourself * * @param frame the frame to send - * @return number of bytes sent + * @return bool */ - size_t send(const Frame &frame); + bool send(const Frame &frame); /** * Get a channel by its identifier @@ -262,17 +262,23 @@ public: */ void reportError(const std::string &message) { - // close everything - // @todo is this not duplicate? - close(); - // set connection state to closed _state = state_closed; // inform handler _handler->onError(_parent, message); - - // @Todo: notify all channels of closed connection + } + + /** + * Report that the connection is closed + */ + void reportClosed() + { + // change state + _state = state_closed; + + // inform the handler + _handler->onClosed(_parent); } /** @@ -280,7 +286,6 @@ public: */ friend class Connection; - }; /** diff --git a/include/field.h b/include/field.h index c03960a..41fbdca 100644 --- a/include/field.h +++ b/include/field.h @@ -55,8 +55,6 @@ public: * Get the type ID that is used to identify this type of * field in a field table * @return char - * - * @todo check if all derived classes use the 'override' keyword */ virtual char typeID() const = 0; diff --git a/include/stringfield.h b/include/stringfield.h index 44d987f..65cdf05 100644 --- a/include/stringfield.h +++ b/include/stringfield.h @@ -44,13 +44,8 @@ public: // get the size T size(frame); - // read data - const char *data = frame.nextData(size.value()); - - // @todo what if this fails? - // allocate string - _data = std::string((char*) data, (size_t) size.value()); + _data = std::string(frame.nextData(size.value()), (size_t) size.value()); } /** @@ -61,9 +56,6 @@ public: /** * Create a new instance of this object * @return Field* - * - * @todo can this be protected? - * @todo check if all clone methods have a override keyword */ virtual Field *clone() const override { diff --git a/src/basiccancelokframe.h b/src/basiccancelokframe.h index cbb99da..0841379 100644 --- a/src/basiccancelokframe.h +++ b/src/basiccancelokframe.h @@ -74,8 +74,6 @@ public: /** * Return the method ID * @return uint16_t - * - * @todo check if all other implementations use override keyword */ virtual uint16_t methodID() const override { diff --git a/src/basicqosframe.h b/src/basicqosframe.h index 6bf3049..8b9c924 100644 --- a/src/basicqosframe.h +++ b/src/basicqosframe.h @@ -82,17 +82,17 @@ public: virtual ~BasicQosFrame() {} /** - * Return the method ID - * @return uint16_t + * Return the method ID + * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 10; } /** - * Return the prefetch count - * @return int16_t + * Return the prefetch count + * @return int16_t */ int16_t prefetchCount() const { @@ -100,8 +100,8 @@ public: } /** - * returns the value of global - * @return boolean + * returns the value of global + * @return boolean */ bool global() const { diff --git a/src/channelcloseokframe.h b/src/channelcloseokframe.h index 8a17089..5eb5cda 100644 --- a/src/channelcloseokframe.h +++ b/src/channelcloseokframe.h @@ -51,6 +51,7 @@ public: /** * Method id + * @return uint16_t */ virtual uint16_t methodID() const override { @@ -68,7 +69,7 @@ public: ChannelImpl *channel = connection->channel(this->channel()); // channel does not exist - if(!channel) return false; + if (!channel) return false; // report that the channel is closed channel->reportClosed(); diff --git a/src/channelframe.h b/src/channelframe.h index cdd9d1b..436cf1c 100644 --- a/src/channelframe.h +++ b/src/channelframe.h @@ -38,8 +38,6 @@ public: /** * Class id * @return uint16_t - * - * @todo check if override keyword is used in all places */ virtual uint16_t classID() const override { diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 3b499f1..aaeee2c 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -50,11 +50,12 @@ namespace AMQP { */ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler *handler) : _parent(parent), - _connection(connection), - _handler(handler) + _connection(&connection->_implementation), + _handler(handler), + _monitor(_connection) { // add the channel to the connection - _id = connection->_implementation.add(this); + _id = _connection->add(this); // check if the id is valid if (_id == 0) @@ -84,17 +85,11 @@ ChannelImpl::~ChannelImpl() if (_message) delete _message; _message = nullptr; - // remove this channel from the connection - _connection->_implementation.remove(this); - - // leap out if already disconnected - if (!connected()) return; + // remove this channel from the connection (but not if the connection is already destructed) + if (_monitor.valid()) _connection->remove(this); // close the channel now - // @todo is this ok? close(); - - // do we have } /** @@ -110,14 +105,8 @@ ChannelImpl::~ChannelImpl() */ bool ChannelImpl::pause() { - // must be connected - if (!connected()) return false; - // send a flow frame - send(ChannelFlowFrame(_id, false)); - - // done - return true; + return send(ChannelFlowFrame(_id, false)); } /** @@ -127,14 +116,8 @@ bool ChannelImpl::pause() */ bool ChannelImpl::resume() { - // must be connected - if (!connected()) return false; - // send a flow frame - send(ChannelFlowFrame(_id, true)); - - // done - return true; + return send(ChannelFlowFrame(_id, true)); } /** @@ -143,14 +126,8 @@ bool ChannelImpl::resume() */ bool ChannelImpl::startTransaction() { - // must be connected - if (!connected()) return false; - // send a flow frame - send(TransactionSelectFrame(_id)); - - // done - return true; + return send(TransactionSelectFrame(_id)); } /** @@ -159,14 +136,8 @@ bool ChannelImpl::startTransaction() */ bool ChannelImpl::commitTransaction() { - // must be connected - if (!connected()) return false; - // send a flow frame - send(TransactionCommitFrame(_id)); - - // done - return true; + return send(TransactionCommitFrame(_id)); } /** @@ -175,14 +146,8 @@ bool ChannelImpl::commitTransaction() */ bool ChannelImpl::rollbackTransaction() { - // must be connected - if (!connected()) return false; - // send a flow frame - send(TransactionRollbackFrame(_id)); - - // done - return true; + return send(TransactionRollbackFrame(_id)); } /** @@ -191,14 +156,17 @@ bool ChannelImpl::rollbackTransaction() */ bool ChannelImpl::close() { - // must be connected - if (!connected()) return false; - - // send a flow frame - send(ChannelCloseFrame(_id)); + // channel could be dead after send operation, we need to monitor that + Monitor monitor(this); - // now it is closed - _state = state_closed; + // send a flow frame + if (!send(ChannelCloseFrame(_id))) return false; + + // leap out if channel was destructed + if (!monitor.valid()) return true; + + // now it is closing + _state = state_closing; // done return true; @@ -214,19 +182,15 @@ bool ChannelImpl::close() */ bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, int flags, const Table &arguments) { - // must be connected - if(!connected()) return false; - + // convert exchange type std::string exchangeType; - if(type == ExchangeType::fanout) exchangeType = "fanout"; - if(type == ExchangeType::direct) exchangeType = "direct"; - if(type == ExchangeType::topic) exchangeType = "topic"; - if(type == ExchangeType::headers)exchangeType = "headers"; + if (type == ExchangeType::fanout) exchangeType = "fanout"; + if (type == ExchangeType::direct) exchangeType = "direct"; + if (type == ExchangeType::topic) exchangeType = "topic"; + if (type == ExchangeType::headers)exchangeType = "headers"; + // send declare exchange frame - send(ExchangeDeclareFrame(_id, name, exchangeType, (flags & passive) != 0, (flags & durable) != 0, (flags & nowait) != 0, arguments)); - - // done - return true; + return send(ExchangeDeclareFrame(_id, name, exchangeType, flags & passive, flags & durable, flags & nowait, arguments)); } /** @@ -240,14 +204,8 @@ bool ChannelImpl::declareExchange(const std::string &name, ExchangeType type, in */ bool ChannelImpl::bindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { - // must be connected - if(!connected()) return false; - // send exchange bind frame - send(ExchangeBindFrame(_id, target, source, routingkey, (flags & nowait) != 0, arguments)); - - //done - return true; + return send(ExchangeBindFrame(_id, target, source, routingkey, flags & nowait, arguments)); } /** @@ -261,14 +219,8 @@ bool ChannelImpl::bindExchange(const std::string &source, const std::string &tar */ bool ChannelImpl::unbindExchange(const std::string &source, const std::string &target, const std::string &routingkey, int flags, const Table &arguments) { - // must be connected - if (!connected()) return false; - // send exchange unbind frame - send(ExchangeUnbindFrame(_id, target, source, routingkey, (flags & nowait) != 0, arguments)); - - // done - return true; + return send(ExchangeUnbindFrame(_id, target, source, routingkey, flags & nowait, arguments)); } /** @@ -279,14 +231,8 @@ bool ChannelImpl::unbindExchange(const std::string &source, const std::string &t */ bool ChannelImpl::removeExchange(const std::string &name, int flags) { - // must be connected - if (!connected()) return false; - // send delete exchange frame - send(ExchangeDeleteFrame(_id, name, (flags & ifunused) != 0, (flags & nowait) != 0)); - - // done - return true; + return send(ExchangeDeleteFrame(_id, name, flags & ifunused, flags & nowait)); } /** @@ -298,14 +244,8 @@ bool ChannelImpl::removeExchange(const std::string &name, int flags) */ bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table &arguments) { - // must be connected - if (!connected()) return false; - // send the queuedeclareframe - send(QueueDeclareFrame(_id, name, (flags & passive) != 0, (flags & durable) != 0, (flags & durable) != 0, (flags & autodelete) != 0, (flags & nowait) != 0, arguments)); - - // done - return true; + return send(QueueDeclareFrame(_id, name, flags & passive, flags & durable, flags & durable, flags & autodelete, flags & nowait, arguments)); } /** @@ -319,14 +259,8 @@ bool ChannelImpl::declareQueue(const std::string &name, int flags, const Table & */ bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string &queueName, const std::string &routingkey, int flags, const Table &arguments) { - // must be connected - if(!connected()) return false; - // send the bind queue frame - send(QueueBindFrame(_id, queueName, exchangeName, routingkey, (flags & nowait) != 0, arguments)); - - // done - return true; + return send(QueueBindFrame(_id, queueName, exchangeName, routingkey, flags & nowait, arguments)); } /** @@ -339,14 +273,8 @@ bool ChannelImpl::bindQueue(const std::string &exchangeName, const std::string & */ bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &queue, const std::string &routingkey, const Table &arguments) { - // must be connected - if(!connected()) return false; - // send the unbind queue frame - send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments)); - - // done - return true; + return send(QueueUnbindFrame(_id, queue, exchange, routingkey, arguments)); } /** @@ -357,14 +285,8 @@ bool ChannelImpl::unbindQueue(const std::string &exchange, const std::string &qu */ bool ChannelImpl::purgeQueue(const std::string &name, int flags) { - // must be connected - if(!connected()) return false; - // send the queue purge frame - send(QueuePurgeFrame(_id, name, (flags & nowait) != 0)); - - // done - return true; + return send(QueuePurgeFrame(_id, name, flags & nowait)); } /** @@ -375,14 +297,8 @@ bool ChannelImpl::purgeQueue(const std::string &name, int flags) */ bool ChannelImpl::removeQueue(const std::string &name, int flags) { - // must be connected - if(!connected()) return false; - // send the remove queue frame - send(QueueDeleteFrame(_id, name, (flags & ifunused) != 0,(flags & ifempty) != 0,(flags & nowait) != 0)); - - // done - return true; + return send(QueueDeleteFrame(_id, name, flags & ifunused, flags & ifempty, flags & nowait)); } /** @@ -393,8 +309,6 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags) * - mandatory if set, an unroutable message will be reported to the channel handler with the onReturned method * - immediate if set, a message that could not immediately be consumed is returned to the onReturned method * - * @todo implement to onReturned() method - * * @param exchange the exchange to publish to * @param routingkey the routing key * @param flags optional flags (see above) @@ -404,18 +318,26 @@ bool ChannelImpl::removeQueue(const std::string &name, int flags) */ bool ChannelImpl::publish(const std::string &exchange, const std::string &routingKey, int flags, const Envelope &envelope) { - // @todo prevent crash when connection is destructed + // we are going to send out multiple frames, each one will trigger a call to the handler, + // which in turn could destruct the channel object, we need to monitor that + Monitor monitor(this); // @todo do not copy the entire buffer to individual frames // send the publish frame - send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate)); + if (!send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate))) return false; + + // channel still valid? + if (!monitor.valid()) return false; // send header - send(BasicHeaderFrame(_id, envelope)); + if (!send(BasicHeaderFrame(_id, envelope))) return false; + + // channel and connection still valid? + if (!monitor.valid() || !_monitor.valid()) return false; // the max payload size is the max frame size minus the bytes for headers and trailer - uint32_t maxpayload = _connection->_implementation.maxPayload(); + uint32_t maxpayload = _connection->maxPayload(); uint32_t bytessent = 0; // the buffer @@ -429,7 +351,10 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin uint32_t chunksize = std::min(maxpayload, bytesleft); // send out a body frame - send(BodyFrame(_id, data + bytessent, chunksize)); + if (!send(BodyFrame(_id, data + bytessent, chunksize))) return false; + + // channel still valid? + if (!monitor.valid()) return false; // update counters bytessent += chunksize; @@ -448,10 +373,7 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin bool ChannelImpl::setQos(uint16_t prefetchCount) { // send a qos frame - send(BasicQosFrame(_id, prefetchCount, false)); - - // done - return true; + return send(BasicQosFrame(_id, prefetchCount, false)); } /** @@ -465,10 +387,7 @@ bool ChannelImpl::setQos(uint16_t prefetchCount) bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { // send a consume frame - send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)); - - // done - return true; + return send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments)); } /** @@ -479,10 +398,7 @@ bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int bool ChannelImpl::cancel(const std::string &tag, int flags) { // send a cancel frame - send(BasicCancelFrame(_id, tag, flags & nowait)); - - // done - return true; + return send(BasicCancelFrame(_id, tag, flags & nowait)); } /** @@ -494,10 +410,7 @@ bool ChannelImpl::cancel(const std::string &tag, int flags) bool ChannelImpl::ack(uint64_t deliveryTag, int flags) { // send an ack frame - send(BasicAckFrame(_id, deliveryTag, flags & multiple)); - - // done - return true; + return send(BasicAckFrame(_id, deliveryTag, flags & multiple)); } /** @@ -509,10 +422,7 @@ bool ChannelImpl::ack(uint64_t deliveryTag, int flags) bool ChannelImpl::reject(uint64_t deliveryTag, int flags) { // send a nack frame - send(BasicNackFrame(_id, deliveryTag, flags & multiple, flags & requeue)); - - // done - return true; + return send(BasicNackFrame(_id, deliveryTag, flags & multiple, flags & requeue)); } /** @@ -523,21 +433,21 @@ bool ChannelImpl::reject(uint64_t deliveryTag, int flags) bool ChannelImpl::recover(int flags) { // send a nack frame - send(BasicRecoverFrame(_id, flags & requeue)); - - // done - return true; + return send(BasicRecoverFrame(_id, flags & requeue)); } /** * Send a frame over the channel * @param frame frame to send - * @return size_t number of bytes sent + * @return bool was the frame sent? */ -size_t ChannelImpl::send(const Frame &frame) +bool ChannelImpl::send(const Frame &frame) { - // send to tcp connection - return _connection->_implementation.send(frame); + // skip if channel is not connected + if (_state != state_connected) return false; + + // send to tcp connection (first check if connection object was not destructed) + return _monitor.valid() && _connection->send(frame); } /** diff --git a/src/connectioncloseframe.h b/src/connectioncloseframe.h index 465a0c0..6bff24a 100644 --- a/src/connectioncloseframe.h +++ b/src/connectioncloseframe.h @@ -98,7 +98,7 @@ public: * Method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 50; } @@ -146,11 +146,15 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // @todo connection could be destructed after frame was sent + // we need the monitor because the connection could be destructed in the meantime + Monitor monitor(connection); // send back the ok frame connection->send(ConnectionCloseOKFrame()); + // check if connection still exists + if (!monitor.valid()) return false; + // no need to check for a channel, the error is connection wide // report the error on the connection connection->reportError(text()); diff --git a/src/connectioncloseokframe.h b/src/connectioncloseokframe.h index c05d470..1e6108b 100644 --- a/src/connectioncloseokframe.h +++ b/src/connectioncloseokframe.h @@ -54,6 +54,19 @@ public: { return 51; } + + /** + * Process the frame + * @param connection + */ + virtual bool process(ConnectionImpl *connection) override + { + // report that it is closed + connection->reportClosed(); + + // done + return true; + } }; /** diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index ad58663..8b99873 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -7,6 +7,8 @@ */ #include "includes.h" #include "protocolheaderframe.h" +#include "connectioncloseokframe.h" +#include "connectioncloseframe.h" /** * set namespace @@ -29,11 +31,8 @@ namespace AMQP { ConnectionImpl::ConnectionImpl(Connection *parent, ConnectionHandler *handler, const Login &login, const std::string &vhost) : _parent(parent), _handler(handler), _login(login), _vhost(vhost) { - // we need a protocol header - ProtocolHeaderFrame header; - - // send out the protocol header - send(header); + // we need to send a protocol header + send(ProtocolHeaderFrame()); } /** @@ -41,10 +40,7 @@ ConnectionImpl::ConnectionImpl(Connection *parent, ConnectionHandler *handler, c */ ConnectionImpl::~ConnectionImpl() { - // still connected - if (_state == state_invalid) return; - - // still in a connected state - should we send the close frame? + // close the connection in a nice fashion close(); } @@ -107,7 +103,8 @@ void ConnectionImpl::remove(ChannelImpl *channel) */ size_t ConnectionImpl::parse(char *buffer, size_t size) { - // @todo do not parse if already in an error state + // do not parse if already in an error state + if (_state == state_closed) return 0; // number of bytes processed size_t processed = 0; @@ -156,18 +153,30 @@ size_t ConnectionImpl::parse(char *buffer, size_t size) */ bool ConnectionImpl::close() { - // leap out if not yet connected - if (_state != state_connected) return false; + // leap out if already closed or closing + if (_state == state_closed || _state == state_closing) return false; + + // after the send operation the object could be dead + Monitor monitor(this); // loop over all channels for (auto iter = _channels.begin(); iter != _channels.end(); iter++) { // close the channel iter->second->close(); + + // we could be dead now + if (!monitor.valid()) return true; } + // send the close frame + if (!send(ConnectionCloseFrame(0, "shutdown"))) return false; + + // leap out if object no longer is alive + if (!monitor.valid()) return true; + // we're in a new state - _state = state_invalid; + _state = state_closing; // done return true; @@ -212,10 +221,13 @@ void ConnectionImpl::setConnected() /** * Send a frame over the connection * @param frame The frame to send - * @return size_t Number of bytes sent + * @return bool Was the frame succesfully sent */ -size_t ConnectionImpl::send(const Frame &frame) +bool ConnectionImpl::send(const Frame &frame) { + // its not possible to send anything if closed or closing down + if (_state == state_closing || _state == state_closed) return false; + // we need an output buffer OutBuffer buffer(frame.totalSize()); @@ -226,19 +238,19 @@ size_t ConnectionImpl::send(const Frame &frame) if (frame.needsSeparator()) buffer.add((uint8_t)206); // are we still setting up the connection? - if ((_state == state_protocol || _state == state_handshake) && !frame.partOfHandshake()) - { - // the connection is still being set up, so we need to delay the message sending - _queue.push(std::move(buffer)); - } - else + if (_state == state_connected || frame.partOfHandshake()) { // send the buffer _handler->onData(_parent, buffer.data(), buffer.size()); } + else + { + // the connection is still being set up, so we need to delay the message sending + _queue.push(std::move(buffer)); + } // done - return buffer.size(); + return true; } /** diff --git a/src/connectionopenframe.h b/src/connectionopenframe.h index 60f1708..ca78dd6 100644 --- a/src/connectionopenframe.h +++ b/src/connectionopenframe.h @@ -85,8 +85,9 @@ public: /** * Method id + * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 40; } diff --git a/src/connectionstartframe.h b/src/connectionstartframe.h index d462531..806bbf0 100644 --- a/src/connectionstartframe.h +++ b/src/connectionstartframe.h @@ -173,18 +173,15 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // @todo we must still be in protocol handshake mode - - // the peer properties Table properties; // fill the peer properties properties["product"] = "Copernica AMQP library"; - properties["version"] = "0.1"; - properties["platform"] = "Ubuntu"; + properties["version"] = "Unknown"; + properties["platform"] = "Unknown"; properties["copyright"] = "Copyright 2014 Copernica BV"; - properties["information"] = ""; + properties["information"] = "http://www.copernica.com"; // move connection to handshake mode connection->setProtocolOk(); diff --git a/src/connectionstartokframe.h b/src/connectionstartokframe.h index 54fb98f..5ca994e 100644 --- a/src/connectionstartokframe.h +++ b/src/connectionstartokframe.h @@ -97,7 +97,7 @@ public: /** * Method id */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 11; } diff --git a/src/connectiontuneframe.h b/src/connectiontuneframe.h index 4cdb535..eac4b72 100644 --- a/src/connectiontuneframe.h +++ b/src/connectiontuneframe.h @@ -131,9 +131,6 @@ public: */ virtual bool process(ConnectionImpl *connection) override { - // @todo this is only allowed when the connection is set up - - // remember this in the connection connection->setCapacity(channelMax(), frameMax()); @@ -147,10 +144,7 @@ public: if (!monitor.valid()) return true; // and finally we start to open the frame - connection->send(ConnectionOpenFrame(connection->vhost())); - - // done - return true; + return connection->send(ConnectionOpenFrame(connection->vhost())); } }; diff --git a/src/connectiontuneokframe.h b/src/connectiontuneokframe.h index 929f32d..8ba66ea 100644 --- a/src/connectiontuneokframe.h +++ b/src/connectiontuneokframe.h @@ -85,7 +85,7 @@ public: * Method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 31; } diff --git a/src/exchangedeclareframe.h b/src/exchangedeclareframe.h index 0926f29..7d6fc2e 100644 --- a/src/exchangedeclareframe.h +++ b/src/exchangedeclareframe.h @@ -111,7 +111,7 @@ public: * Method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 10; } @@ -120,7 +120,7 @@ public: * The exchange name * @return string */ - const std::string& name() + const std::string& name() const { return _name; } @@ -129,7 +129,7 @@ public: * The exchange type * @return string */ - const std::string& type() + const std::string &exchangeType() const { return _type; } @@ -138,7 +138,7 @@ public: * Passive declaration, do not create exchange if it does not exist * @return bool */ - bool passive() + bool passive() const { return _bools.get(0); } @@ -147,7 +147,7 @@ public: * Durable exchange * @return bool */ - bool durable() + bool durable() const { return _bools.get(1); } @@ -156,7 +156,7 @@ public: * Do not wait for a response * @return bool */ - bool noWait() + bool noWait() const { return _bools.get(4); } @@ -165,7 +165,7 @@ public: * Additional arguments. Implementation dependent. * @return Table */ - const Table& arguments() + const Table& arguments() const { return _arguments; } diff --git a/src/exchangedeleteframe.h b/src/exchangedeleteframe.h index dd765e7..3077f41 100644 --- a/src/exchangedeleteframe.h +++ b/src/exchangedeleteframe.h @@ -88,7 +88,7 @@ public: * returns the method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 20; } @@ -97,7 +97,7 @@ public: * returns the exchange name * @return string */ - const std::string& name() + const std::string& name() const { return _name; } @@ -106,7 +106,7 @@ public: * returns whether to delete if unused * @return bool */ - bool ifUnused() + bool ifUnused() const { return _bools.get(0); } @@ -115,7 +115,7 @@ public: * returns whether to wait for a response * @return bool */ - bool noWait() + bool noWait() const { return _bools.get(1); } diff --git a/src/queuebindframe.h b/src/queuebindframe.h index e7589c3..9a53897 100644 --- a/src/queuebindframe.h +++ b/src/queuebindframe.h @@ -115,7 +115,7 @@ public: * Returns the method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 20; } diff --git a/src/queuedeclareframe.h b/src/queuedeclareframe.h index fa7e1cc..bc85f71 100644 --- a/src/queuedeclareframe.h +++ b/src/queuedeclareframe.h @@ -103,7 +103,7 @@ public: * returns the method id * @return string */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 10; } @@ -112,7 +112,7 @@ public: * return the queue name * @return string */ - const std::string& name() + const std::string& name() const { return _name; } @@ -121,7 +121,7 @@ public: * returns value of passive declaration, do not create queue if it does not exist * @return bool */ - bool passive() + bool passive() const { return _bools.get(0); } @@ -130,7 +130,7 @@ public: * returns whether the queue is durable * @return bool */ - bool durable() + bool durable() const { return _bools.get(1); } @@ -139,7 +139,7 @@ public: * returns whether the queue is exclusive * @return bool */ - bool exclusive() + bool exclusive() const { return _bools.get(2); } @@ -148,7 +148,7 @@ public: * returns whether the queue is deleted if unused * @return bool */ - bool autoDelete() + bool autoDelete() const { return _bools.get(3); } @@ -166,7 +166,7 @@ public: * returns additional arguments. Implementation dependant. * @return Table */ - const Table& arguments() + const Table& arguments() const { return _arguments; } diff --git a/src/queuedeleteframe.h b/src/queuedeleteframe.h index 5e6843e..9f5a34e 100644 --- a/src/queuedeleteframe.h +++ b/src/queuedeleteframe.h @@ -90,7 +90,7 @@ public: * returns the method id * @returns uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 40; } diff --git a/src/queuepurgeframe.h b/src/queuepurgeframe.h index 69d2412..d9c92cf 100644 --- a/src/queuepurgeframe.h +++ b/src/queuepurgeframe.h @@ -86,7 +86,7 @@ public: * The method ID * @return method id */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 30; } diff --git a/src/queueunbindframe.h b/src/queueunbindframe.h index a21d66e..e969693 100644 --- a/src/queueunbindframe.h +++ b/src/queueunbindframe.h @@ -106,7 +106,7 @@ public: * returns the method id * @returns uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 50; } @@ -115,7 +115,7 @@ public: * returns the queue name * @returns string */ - const std::string& name() + const std::string& name() const { return _name; } @@ -124,7 +124,7 @@ public: * returns the exchange name * @returns string */ - const std::string& exchange() + const std::string& exchange() const { return _exchange; } @@ -133,7 +133,7 @@ public: * returns the routingKey * @returns string */ - const std::string& routingKey() + const std::string& routingKey() const { return _routingKey; } @@ -142,7 +142,7 @@ public: * returns the additional arguments * @returns Table */ - const Table& arguments() + const Table& arguments() const { return _arguments; } diff --git a/src/transactioncommitframe.h b/src/transactioncommitframe.h index 77569ff..81f11bc 100644 --- a/src/transactioncommitframe.h +++ b/src/transactioncommitframe.h @@ -14,18 +14,6 @@ namespace AMQP { */ class TransactionCommitFrame : public TransactionFrame { -protected: -/** - * Encode a frame on a string buffer - * - * @param buffer buffer to write frame to - */ -virtual void fill(OutBuffer& buffer) const override -{ - // call base - TransactionFrame::fill(buffer); -} - public: /** * Destructor @@ -51,10 +39,10 @@ public: {} /** - * return the method id - * @return uint16_t + * return the method id + * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 20; } diff --git a/src/transactionrollbackframe.h b/src/transactionrollbackframe.h index e41fb0b..ce3d0b0 100644 --- a/src/transactionrollbackframe.h +++ b/src/transactionrollbackframe.h @@ -14,18 +14,6 @@ namespace AMQP { */ class TransactionRollbackFrame : public TransactionFrame { -protected: - /** - * Encode a frame on a string buffer - * - * @param buffer buffer to write frame to - */ - virtual void fill(OutBuffer& buffer) const override - { - // call base - TransactionFrame::fill(buffer); - } - public: /** * Destructor @@ -33,29 +21,29 @@ public: virtual ~TransactionRollbackFrame() {} /** - * Decode a transaction rollback frame from a received frame + * Decode a transaction rollback frame from a received frame * - * @param frame received frame to decode + * @param frame received frame to decode */ TransactionRollbackFrame(ReceivedFrame& frame) : TransactionFrame(frame) {} /** - * Construct a transaction rollback frame + * Construct a transaction rollback frame * - * @param channel channel identifier - * @return newly created transaction rollback frame + * @param channel channel identifier + * @return newly created transaction rollback frame */ TransactionRollbackFrame(uint16_t channel) : TransactionFrame(channel, 0) {} /** - * return the method id - * @return uint16_t + * return the method id + * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 30; } diff --git a/src/transactionselectframe.h b/src/transactionselectframe.h index ef54894..d678313 100644 --- a/src/transactionselectframe.h +++ b/src/transactionselectframe.h @@ -14,33 +14,21 @@ namespace AMQP { */ class TransactionSelectFrame : public TransactionFrame { -protected: - /** - * Encode a frame on a string buffer - * - * @param buffer buffer to write frame to - */ - virtual void fill(OutBuffer& buffer) const override - { - // call base - TransactionFrame::fill(buffer); - } - public: /** - * Decode a transaction select frame from a received frame + * Decode a transaction select frame from a received frame * - * @param frame received frame to decode + * @param frame received frame to decode */ TransactionSelectFrame(ReceivedFrame& frame) : TransactionFrame(frame) {} /** - * Construct a transaction select frame + * Construct a transaction select frame * - * @param channel channel identifier - * @return newly created transaction select frame + * @param channel channel identifier + * @return newly created transaction select frame */ TransactionSelectFrame(uint16_t channel) : TransactionFrame(channel, 0) @@ -55,7 +43,7 @@ public: * return the method id * @return uint16_t */ - uint16_t methodID() const + virtual uint16_t methodID() const override { return 10; }