#pragma once /** * Connection implementation * * This is the implementation of the connection - a class that can only be * constructed by the connection class itselves and that has all sorts of * methods that are only useful inside the library * * @copyright 2014 Copernica BV */ /** * Set up namespace */ namespace AMQP { /** * Class definition */ class ConnectionImpl : public Watchable { protected: /** * The parent connection object * @var Connection */ Connection *_parent; /** * The connection handler * @var ConnectionHandler */ ConnectionHandler *_handler; /** * State of the connection * The current state is the last frame sent to the server * @var enum */ enum { state_protocol, // protocol headers are being passed state_handshake, // busy with the handshake to open the connection state_connected, // connection is set up and ready for communication state_closing, // connection is busy closing (we have sent the close frame) state_closed // connection is closed } _state = state_protocol; /** * Has the close() method been called? * @var bool */ bool _closed = false; /** * All channels that are active * @var std::unordered_map> */ std::unordered_map> _channels; /** * The last unused channel ID * @var uint16_t */ uint16_t _nextFreeChannel = 1; /** * Max number of channels (0 for unlimited) * @var uint16_t */ uint16_t _maxChannels = 0; /** * Max frame size * @var uint32_t */ uint32_t _maxFrame = 10000; /** * The login for the server (login, password) * @var Login */ Login _login; /** * Vhost to connect to * @var string */ std::string _vhost; /** * Queued messages that should be sent after the connection has been established * @var queue */ std::queue _queue; /** * Helper method to send the close frame * Return value tells if the connection is still valid * @return bool */ bool sendClose(); /** * Is any channel waiting for an answer on a synchronous call? * @return bool */ bool waitingChannels() const; /** * Is the channel waiting for a response from the peer (server) * @return bool */ bool waiting() const; private: /** * Construct an AMQP object based on full login data * * The first parameter is a handler object. This handler class is * an interface that should be implemented by the caller. * * Note that the constructor is private to ensure that nobody can construct * this class, only the real Connection class via a friend construct * * @param parent Parent connection object * @param handler Connection handler * @param login Login data */ ConnectionImpl(Connection *parent, ConnectionHandler *handler, const Login &login, const std::string &vhost); public: /** * Copy'ing connections is impossible * @param connection */ ConnectionImpl(const ConnectionImpl &connection) = delete; /** * Destructor */ virtual ~ConnectionImpl(); /** * No assignments of other connections * @param connection * @return ConnectionImpl */ ConnectionImpl &operator=(const ConnectionImpl &connection) = delete; /** * What is the state of the connection - is the protocol handshake completed? * @return bool */ bool protocolOk() const { // must be busy doing the connection handshake, or already connected return _state == state_handshake || _state == state_connected; } /** * Mark the protocol as being ok */ void setProtocolOk() { // move on to handshake state if (_state == state_protocol) _state = state_handshake; } /** * Are we fully connected? * @return bool */ bool connected() const { // state must be connected return _state == state_connected; } /** * Mark the connection as connected */ void setConnected(); /** * Retrieve the login data * @return Login */ const Login &login() const { return _login; } /** * Retrieve the vhost * @return string */ const std::string &vhost() const { return _vhost; } /** * Store the max number of channels and max number of frames * @param channels max number of channels * @param size max frame size */ void setCapacity(uint16_t channels, uint32_t size) { _maxChannels = channels; _maxFrame = size; } /** * The max frame size * @return uint32_t */ uint32_t maxFrame() const { return _maxFrame; } /** * The max payload size for body frames * @return uint32_t */ uint32_t maxPayload() const { // 8 bytes for header and end-of-frame byte return _maxFrame - 8; } /** * Add a channel to the connection, and return the channel ID that it * is allowed to use, or 0 when no more ID's are available * @param channel * @return uint16_t */ uint16_t add(const std::shared_ptr &channel); /** * Remove a channel * @param channel */ void remove(const ChannelImpl *channel); /** * Parse the buffer into a recognized frame * * Every time that data comes in on the connection, you should call this method to parse * the incoming data, and let it handle by the AMQP library. This method returns the number * of bytes that were processed. * * If not all bytes could be processed because it only contained a partial frame, you should * call this same method later on when more data is available. The AMQP library does not do * any buffering, so it is up to the caller to ensure that the old data is also passed in that * later call. * * @param buffer buffer to decode * @return number of bytes that were processed */ uint64_t parse(const Buffer &buffer); /** * Close the connection * This will close all channels * @return bool */ bool close(); /** * Send a frame over the connection * * This is an internal method that you normally do not have to call yourself * * @param frame the frame to send * @return bool */ bool send(const Frame &frame); /** * Send buffered data over the connection * * @param buffer the buffer with data to send */ bool send(OutBuffer &&buffer); /** * Get a channel by its identifier * * This method only works if you had already created the channel before. * This is an internal method that you will not need if you cache the channel * object. * * @param number channel identifier * @return channel the channel object, or nullptr if not yet created */ std::shared_ptr channel(int number) { auto iter = _channels.find(number); return iter == _channels.end() ? nullptr : iter->second; } /** * Report an error message * @param message */ void reportError(const char *message) { // set connection state to closed _state = state_closed; // monitor because every callback could invalidate the connection Monitor monitor(this); // all deferred result objects in the channels should report this error too while (!_channels.empty()) { // report the errors _channels.begin()->second->reportError(message, false); // leap out if no longer valid if (!monitor.valid()) return; } // inform handler _handler->onError(_parent, message); } /** * Report that the connection is closed */ void reportClosed() { // change state _state = state_closed; // inform the handler _handler->onClosed(_parent); } /** * Retrieve the amount of channels this connection has * @return std::size_t */ std::size_t channels() const { return _channels.size(); } /** * The actual connection is a friend and can construct this class */ friend class Connection; friend class ChannelImpl; }; /** * End of namespace */ }