From f3955bcd5195b1ba35dffe38856dd99bf8dca9e8 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sat, 31 Oct 2015 18:26:04 +0100 Subject: [PATCH] implemented TcpConnection, so that users of the library do not have to setup their own tcp connections --- include/flags.h | 2 + include/tcpconnection.h | 99 +++++++++++ include/tcphandler.h | 74 ++++++++ src/addressinfo.h | 102 +++++++++++ src/flags.cpp | 6 + src/includes.h | 9 +- src/pipe.h | 85 +++++++++ src/tcpbuffer.h | 373 ++++++++++++++++++++++++++++++++++++++++ src/tcpclosed.h | 51 ++++++ src/tcpconnected.h | 167 ++++++++++++++++++ src/tcpconnection.cpp | 122 +++++++++++++ src/tcpresolver.h | 197 +++++++++++++++++++++ src/tcpstate.h | 115 +++++++++++++ 13 files changed, 1401 insertions(+), 1 deletion(-) create mode 100644 include/tcpconnection.h create mode 100644 include/tcphandler.h create mode 100644 src/addressinfo.h create mode 100644 src/pipe.h create mode 100644 src/tcpbuffer.h create mode 100644 src/tcpclosed.h create mode 100644 src/tcpconnected.h create mode 100644 src/tcpconnection.cpp create mode 100644 src/tcpresolver.h create mode 100644 src/tcpstate.h diff --git a/include/flags.h b/include/flags.h index c2de72f..644fbc8 100644 --- a/include/flags.h +++ b/include/flags.h @@ -31,6 +31,8 @@ extern const int immediate; extern const int redelivered; extern const int multiple; extern const int requeue; +extern const int readable; +extern const int writable; /** * End of namespace diff --git a/include/tcpconnection.h b/include/tcpconnection.h new file mode 100644 index 0000000..bd063b5 --- /dev/null +++ b/include/tcpconnection.h @@ -0,0 +1,99 @@ +/** + * TcpConnection.h + * + * Extended Connection object that creates a TCP connection for the + * IO between the client application and the RabbitMQ server. + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Forward declarations + */ +class TcpState; + +/** + * Class definition + */ +class TcpConnection : public Connection, private ConnectionHandler +{ +private: + /** + * The state of the TCP connection - this state objecs changes based on + * the state of the connection (resolving, connected or closed) + * + * @var TcpState + */ + TcpState *_state = nullptr; + + + /** + * Method that is called by the connection when data needs to be sent over the network + * @param connection The connection that created this output + * @param buffer Data to send + * @param size Size of the buffer + */ + virtual void onData(Connection *connection, const char *buffer, size_t size) override; + + /** + * Method called when the connection ends up in an error state + * @param connection The connection that entered the error state + * @param message Error message + */ + virtual void onError(Connection *connection, const char *message) override; + + /** + * Method that is called when the connection is established + * @param connection The connection that can now be used + */ + virtual void onConnected(Connection *connection) override; + + /** + * Method that is called when the connection was closed. + * @param connection The connection that was closed and that is now unusable + */ + virtual void onClosed(Connection *connection) override; + + +public: + /** + * Constructor + * @param handler User implemented handler object + * @param hostname The address to connect to + */ + TcpConnection(TcpHandler *handler, const Address &address); + + /** + * Destructor + */ + virtual ~TcpConnection(); + + /** + * Process the TCP connection + * + * This method should be called when the filedescriptor that is registered + * in the event loop becomes active. You should pass in a flag holding the + * flags AMQP::readable or AMQP::writable to indicate whether the descriptor + * was readable or writable, or bitwise-or if it was both + * + * @param fd The filedescriptor that became readable or writable + * @param events What sort of events occured? + */ + void process(int fd, int flags); +}; + +/** + * End of namespace + */ +} diff --git a/include/tcphandler.h b/include/tcphandler.h new file mode 100644 index 0000000..7d31379 --- /dev/null +++ b/include/tcphandler.h @@ -0,0 +1,74 @@ +/** + * TcpHandler.h + * + * Interface to be implemented by the caller of the AMQP library in case + * the "Tcp" class is being used as alternative for the ConnectionHandler + * class. + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Forward declarations + */ +class TcpConnection; + +/** + * Class definition + */ +class TcpHandler +{ +public: + /** + * Method that is called when the TCP connection ends up in a connected state + * @param connection The TCP connection + */ + virtual void onConnected(TcpConnection *connection) {} + + /** + * Method that is called when the TCP connection ends up in an error state + * @param connection The TCP connection + * @param message Error message + */ + virtual void onError(TcpConnection *connection, const char *message) {} + + /** + * Method that is called when the TCP connection is closed + * @param connection The TCP connection + */ + virtual void onClosed(TcpConnection *connection) {} + + /** + * Monitor a filedescriptor for readability or writability + * + * When a TCP connection is opened, it creates a non-blocking socket + * connection. This method is called to inform you about this socket, + * so that you can include it in the event loop. When the socket becomes + * active, you should call the "process()" method in the Tcp class. + * + * The flags is AMQP::readable if the filedescriptor should be monitored + * for readability, AMQP::writable if it is to be monitored for writability, + * or AMQP::readable | AMQP::writable if it has to be checked for both. + * If flags has value 0, the filedescriptor should be removed from the + * event loop. + * + * @param connection The TCP connection object that is reporting + * @param fd The filedescriptor to be monitored + * @param flags Should the object be monitored for readability or writability? + */ + virtual void monitor(TcpConnection *connection, int fd, int flags) = 0; + + +}; + +/** + * End of namespace + */ +} + diff --git a/src/addressinfo.h b/src/addressinfo.h new file mode 100644 index 0000000..983bb99 --- /dev/null +++ b/src/addressinfo.h @@ -0,0 +1,102 @@ +/** + * AddressInfo.h + * + * Utility wrapper arround "getAddressInfo()" + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +namespace AMQP { + +/** + * Class definition + */ +class AddressInfo +{ +private: + /** + * The addresses + * @var struct AddressInfo + */ + struct addrinfo *_info = nullptr; + + /** + * Vector of addrinfo pointers + * @var std::vector + */ + std::vector _v; + +public: + /** + * Constructor + * @param hostname + * @param port + */ + AddressInfo(const char *hostname, uint16_t port = 5672) + { + // store portnumber in buffer + auto portnumber = std::to_string(port); + + // info about the lookup + struct addrinfo hints; + + // set everything to zero + memset(&hints, 0, sizeof(struct AddressInfo)); + + // set hints + hints.ai_family = AF_UNSPEC; // allow IPv4 or IPv6 + hints.ai_socktype = SOCK_STREAM; // datagram socket/ + + // get address of the server + auto code = getaddrinfo(hostname, portnumber.data(), &hints, &_info); + + // was there an error + if (code != 0) throw std::runtime_error(gai_strerror(code)); + + // keep looping + for (auto *current = _info; current; current = current->ai_next) + { + // store in vector + _v.push_back(current); + } + } + + /** + * Destructor + */ + virtual ~AddressInfo() + { + // free address info + freeaddrinfo(_info); + } + + /** + * Size of the array + * @return size_t + */ + size_t size() const + { + return _v.size(); + } + + /** + * Get reference to struct + * @param index + * @return struct addrinfo* + */ + const struct addrinfo *operator[](int index) const + { + // expose vector + return _v[index]; + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/flags.cpp b/src/flags.cpp index cce926d..946b51c 100644 --- a/src/flags.cpp +++ b/src/flags.cpp @@ -33,6 +33,12 @@ const int redelivered = 0x2000; const int multiple = 0x4000; const int requeue = 0x8000; +/** + * Flags for event loops + */ +const int readable = 0x1; +const int writable = 0x2; + /** * End of namespace */ diff --git a/src/includes.h b/src/includes.h index 866eaba..5f3f53d 100644 --- a/src/includes.h +++ b/src/includes.h @@ -21,6 +21,10 @@ #include #include #include +#include +#include +#include +#include // forward declarations #include "../include/classes.h" @@ -62,9 +66,12 @@ #include "../include/channelimpl.h" #include "../include/channel.h" #include "../include/login.h" +#include "../include/address.h" #include "../include/connectionhandler.h" #include "../include/connectionimpl.h" #include "../include/connection.h" +#include "../include/tcphandler.h" +#include "../include/tcpconnection.h" // classes that are very commonly used #include "exception.h" @@ -79,6 +86,6 @@ #include "queueframe.h" #include "basicframe.h" #include "transactionframe.h" - +#include "addressinfo.h" diff --git a/src/pipe.h b/src/pipe.h new file mode 100644 index 0000000..11168d9 --- /dev/null +++ b/src/pipe.h @@ -0,0 +1,85 @@ +/** + * Pipe.h + * + * Pipe of two filedescriptors, used to communicate between master and child thread + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class Pipe +{ +private: + /** + * The two filedescriptors that make up the pipe + * @var int[] + */ + int _fds[2]; + +public: + /** + * Constructor + */ + Pipe() + { + // construct the pipe + if (pipe2(_fds, O_CLOEXEC) == 0) return; + + // something went wrong + throw std::runtime_error(strerror(errno)); + } + + /** + * Destructor + */ + virtual ~Pipe() + { + // close the two filedescriptors + close(_fds[0]); + close(_fds[1]); + } + + /** + * Expose the filedescriptors + * @return int + */ + int in() const { return _fds[0]; } + int out() const { return _fds[1]; } + + /** + * Notify the pipe, so that the other thread wakes up + */ + void notify() + { + // one byte to send + char byte = 0; + + // send one byte over the pipe - this will wake up the other thread + write(_fds[1], &byte, 1); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/tcpbuffer.h b/src/tcpbuffer.h new file mode 100644 index 0000000..48cf70b --- /dev/null +++ b/src/tcpbuffer.h @@ -0,0 +1,373 @@ +/** + * TcpBuffer.h + * + * When data could not be sent out immediately, it is buffered in a temporary + * output buffer. This is the implementation of that buffer + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpBuffer : public Buffer +{ +private: + /** + * All output buffers + * @var std::deque + */ + mutable std::deque> _buffers; + + /** + * Number of bytes in first buffer that is no longer in use + * @var size_t + */ + size_t _skip = 0; + + /** + * Total number of bytes in the buffer + * @var size_t + */ + size_t _size = 0; + +public: + /** + * Regular constructor + */ + TcpBuffer() {} + + /** + * No copy'ing allowed + * @param that + */ + TcpBuffer(const TcpBuffer &that) = delete; + + /** + * Move operator + * @param that + */ + TcpBuffer(TcpBuffer &&that) : + _buffers(std::move(that._buffers)), + _skip(that._skip), + _size(that._size) + { + // reset other object + that._skip = 0; + that._size = 0; + } + + /** + * Does the buffer exist (is it non-empty) + * @return bool + */ + operator bool () const + { + // there must be a size + return _size > 0; + } + + /** + * Is the buffer empty + * @return bool + */ + bool operator!() const + { + // size should be zero + return _size == 0; + } + + /** + * Total size of the buffer + * @return size_t + */ + virtual size_t size() const override + { + // this simply is a member + return _size; + } + + /** + * Get access to a single byte + * + * No safety checks are necessary: this method will only be called + * for bytes that actually exist + * + * @param pos position in the buffer + * @return char value of the byte in the buffer + */ + virtual char byte(size_t pos) const override + { + // incorporate the skipped bytes + pos += _skip; + + // iterate over the parts + for (const auto &buffer : _buffers) + { + // is the byte within this buffer? + if (buffer.size() > pos) return buffer[pos]; + + // prepare for next iteration + pos -= buffer.size(); + } + + // normally unreachable + return 0; + } + + /** + * Get access to the raw data + * @param pos position in the buffer + * @param size number of continuous bytes + * @return char* + */ + virtual const char *data(size_t pos, size_t size) const override + { + // incorporate the skipped bytes + pos += _skip; + + // the buffer into which all data is going to be merged + std::vector *result = nullptr; + + // amount of data that we still have to process + size_t togo = _size + _skip; + + // number of trailing empty buffers + size_t empty = 0; + + // iterate over the parts + for (auto &buffer : _buffers) + { + // are we already merging? + if (result) + { + // merge buffer + result->insert(result->end(), buffer.begin(), buffer.end()); + + // one more empty buffer + ++empty; + } + + // does the data start within this buffer? + else if (buffer.size() > pos) + { + // remember that this is buffer into which all data is going to be merged + result = &buffer; + + // reserve enough space + result->reserve(togo); + } + + // data does not start in this part + else + { + // prepare for next iteration + pos -= buffer.size(); + } + } + + // remove empty buffers + if (empty > 0) _buffers.resize(_buffers.size() - empty); + + // done + return result->data(); + } + + /** + * Copy bytes to a buffer + * + * No safety checks are necessary: this method will only be called + * for bytes that actually exist + * + * @param pos position in the buffer + * @param size number of bytes to copy + * @param output buffer to copy into + * @return void* pointer to buffer + */ + virtual void *copy(size_t pos, size_t size, void *output) const override + { + // incorporate the skipped bytes + pos += _skip; + + // number of bytes already copied + size_t copied = 0; + + // iterate over the parts + for (const auto &buffer : _buffers) + { + // is the byte within this buffer? + if (buffer.size() > pos) + { + // number of bytes to copy + size_t tocopy = std::min(buffer.size() - pos, size); + + // copy data to the buffer + memcpy((char *)output + copied, buffer.data() + pos, tocopy); + + // update counters + copied += tocopy; + + // for next iteration we can start on position zero of the next buffer + pos = 0; size -= tocopy; + + // are we alread done? + if (size == 0) return output; + } + else + { + // prepare for next iteration + pos -= buffer.size(); + } + } + + // normally unreachable + return output; + } + + /** + * Add data to the buffer + * @param buffer + * @param size + */ + void add(const char *buffer, size_t size) + { + // add element + _buffers.emplace_back(buffer, buffer + size); + + // update total size + _size += size; + } + + /** + * Shrink the buffer with a number of bytes + * @param toremove + */ + void shrink(size_t toremove) + { + // are we removing everything? + if (toremove >= _size) + { + // reset all + _buffers.clear(); + _skip = _size = 0; + } + else + { + // keep looping + while (toremove > 0) + { + // access to the first buffer + const auto &first = _buffers.front(); + + // actual used bytes in first buffer + size_t bytes = first.size() - _skip; + + // can we remove the first buffer completely? + if (toremove >= bytes) + { + // we're going to remove the first item, update sizes + _size -= bytes; + _skip = 0; + + // number of bytes that still have to be removed + toremove -= bytes; + + // remove first buffer + _buffers.pop_front(); + } + else + { + // we should remove the first buffer partially + _skip += toremove; + _size -= toremove; + + // done + toremove = 0; + } + } + } + } + + /** + * Send the buffer to a socket + * @param socket + */ + void sendto(int socket) + { + // keep looping + while (_size > 0) + { + // we're going to fill a lot of buffers (64 should normally be enough) + struct iovec buffer[64]; + + // index counter + size_t index = 0; + + // iterate over the buffers + for (const auto &str : _buffers) + { + // fill buffer + buffer[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data()); + buffer[index].iov_len = index == 0 ? str.size() - _skip : str.size(); + + // update counter for next iteration + if (++index >= 64) break; + } + + // send the data + auto result = writev(socket, (const struct iovec *)&buffer, index); + + // skip on error + if (result <= 0) return; + + // shrink the buffer + if (result > 0) shrink(result); + } + } + + /** + * Receive data from a socket + * @param socket + * @return ssize_t + */ + ssize_t receivefrom(int socket) + { + // find out how many bytes are available + int available = 0; + + // check the number of bytes that are available - in case of an error or + // when the buffer is very small, we use a lower limit of 512 bytes + if (ioctl(socket, FIONREAD, &available) != 0 || available < 512) return available = 512; + + // add a new buffer + _buffers.emplace_back(available); + + // read the actual buffer + auto &buffer = _buffers.back(); + + // read data into the buffer + return read(socket, buffer.data(), available); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/tcpclosed.h b/src/tcpclosed.h new file mode 100644 index 0000000..c762014 --- /dev/null +++ b/src/tcpclosed.h @@ -0,0 +1,51 @@ +/** + * TcpClosed.h + * + * Class that is used when the TCP connection ends up in a closed state + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpClosed : public TcpState +{ +public: + /** + * Constructor + * @param connection The parent TcpConnection object + * @param handler User supplied handler + */ + TcpClosed(TcpConnection *connection, TcpHandler *handler) : + TcpState(connection, handler) {} + + /** + * Constructor + * @param state The to-be-copied state + */ + TcpClosed(const TcpState *state) : + TcpState(state) {} + + /** + * Destructor + */ + virtual ~TcpClosed() = default; +}; + +/** + * End of namespace + */ +} + diff --git a/src/tcpconnected.h b/src/tcpconnected.h new file mode 100644 index 0000000..41ce26c --- /dev/null +++ b/src/tcpconnected.h @@ -0,0 +1,167 @@ +/** + * TcpConnected.h + * + * The actual tcp connection - this is the "_impl" of a tcp-connection after + * the hostname was resolved into an IP address + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "tcpbuffer.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpConnected : public TcpState +{ +private: + /** + * The socket file descriptor + * @var int + */ + int _socket; + + /** + * The outgoing buffer + * @var TcpBuffer + */ + TcpBuffer _out; + + /** + * An incoming buffer + * @var TcpBuffer + */ + TcpBuffer _in; + + +public: + /** + * Constructor + * @param connection Parent TCP connection object + * @param socket The socket filedescriptor + * @param buffer The buffer that was already built + * @param handler User-supplied handler object + */ + TcpConnected(TcpConnection *connection, int socket, TcpBuffer &&buffer, TcpHandler *handler) : + TcpState(connection, handler), + _socket(socket), + _out(std::move(buffer)) + { + // if there is already an output buffer, we have to send out that first + if (_out) _out.sendto(_socket); + + // tell the handler to monitor the socket, if there is an out + _handler->monitor(_connection, _socket, _out ? readable | writable : readable); + } + + /** + * Destructor + */ + virtual ~TcpConnected() + { + // we no longer have to monitor the socket + _handler->monitor(_connection, _socket, 0); + + // close the socket + close(_socket); + } + + /** + * Process the filedescriptor in the object + * @param fd Filedescriptor that is active + * @param flags AMQP::readable and/or AMQP::writable + * @return New state object + */ + virtual TcpState *process(int fd, int flags) override + { + // must be the socket + if (fd != _socket) return this; + + // can we write more data to the socket? + if (flags & writable) + { + // send out the buffered data + _out.sendto(_socket); + + // if buffer is empty by now, we no longer have to check for + // writability, but only for readability + if (!_out) _handler->monitor(_connection, _socket, readable); + } + + // should we check for readability too? + if (flags & readable) + { + // read data from buffer + auto result = _in.receivefrom(_socket); + + // is the connection in an error state? + if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) + { + // we have an error - report this to the user + _handler->onError(_connection, strerror(errno)); + + // @todo object might no longer exist after this error + + // we have a new state + return new TcpClosed(this); + } + else + { + // parse the buffer + auto processed = _connection->parse(_in); + + // shrink buffer + _in.shrink(processed); + } + } + + // keep same object + return this; + } + + /** + * Send data over the connection + * @param buffer buffer to send + * @param size size of the buffer + */ + virtual void send(const char *buffer, size_t size) + { + // is there already a buffer of data that can not be sent? + if (_out) return _out.add(buffer, size); + + // there is no buffer, send the data right away + auto result = write(_socket, buffer, size); + + // number of bytes sent + size_t bytes = result < 0 ? 0 : result; + + // ok if all data was sent + if (bytes >= size) return; + + // add the data to the buffer + _out.add(buffer + bytes, size - bytes); + + // start monitoring the socket to find out when it is writable + _handler->monitor(_connection, _socket, readable | writable); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/tcpconnection.cpp b/src/tcpconnection.cpp new file mode 100644 index 0000000..f90f0b6 --- /dev/null +++ b/src/tcpconnection.cpp @@ -0,0 +1,122 @@ +/** + * TcpConnection.cpp + * + * Implementation file for the TCP connection + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Dependencies + */ +#include "includes.h" +#include "tcpresolver.h" + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Constructor + * @param handler User implemented handler object + * @param address AMQP address object + */ +TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) : + Connection(this, address.login(), address.vhost()), + _state(new TcpResolver(this, address.hostname(), address.port(), handler)) {} + +/** + * Destructor + */ +TcpConnection::~TcpConnection() +{ + // remove the state object + delete _state; +} + +/** + * Process the TCP connection + * This method should be called when the filedescriptor that is registered + * in the event loop becomes active. You should pass in a flag holding the + * flags AMQP::readable or AMQP::writable to indicate whether the descriptor + * was readable or writable, or bitwise-or if it was both + * @param fd The filedescriptor that became readable or writable + * @param events What sort of events occured? + */ +void TcpConnection::process(int fd, int flags) +{ + // pass on the the state, that returns a new impl + auto *result = _state->process(fd, flags); + + // skip if the same state is continued to be used + if (result == _state) return; + + // remove old state + delete _state; + + // replace it with the new implementation + _state = result; +} + +/** + * Method that is called by the connection when data needs to be sent over the network + * @param connection The connection that created this output + * @param buffer Data to send + * @param size Size of the buffer + */ +void TcpConnection::onData(Connection *connection, const char *buffer, size_t size) +{ + // send the data over the connecction + _state->send(buffer, size); +} + +/** + * Method called when the connection ends up in an error state + * @param connection The connection that entered the error state + * @param message Error message + */ +void TcpConnection::onError(Connection *connection, const char *message) +{ + // current object is going to be removed, wrap it in a unique pointer to enforce that + std::unique_ptr ptr(_state); + + // object is now in a closed state + _state = new TcpClosed(_state); + + // tell the implementation to report the error + ptr->reportError(message); +} + +/** + * Method that is called when the connection is established + * @param connection The connection that can now be used + */ +void TcpConnection::onConnected(Connection *connection) +{ + // tell the implementation to report the status + _state->reportConnected(); +} + +/** + * Method that is called when the connection was closed. + * @param connection The connection that was closed and that is now unusable + */ +void TcpConnection::onClosed(Connection *connection) +{ + // current object is going to be removed, wrap it in a unique pointer to enforce that + std::unique_ptr ptr(_state); + + // object is now in a closed state + _state = new TcpClosed(_state); + + // tell the implementation to report the error + ptr->reportClosed(); +} + +/** + * End of namespace + */ +} + diff --git a/src/tcpresolver.h b/src/tcpresolver.h new file mode 100644 index 0000000..53489f1 --- /dev/null +++ b/src/tcpresolver.h @@ -0,0 +1,197 @@ +/** + * TcpResolver.h + * + * Class that is used for the DNS lookup of the hostname of the RabbitMQ + * server, and to make the initial connection + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "pipe.h" +#include "tcpstate.h" +#include "tcpclosed.h" +#include "tcpconnected.h" +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpResolver : public TcpState +{ +private: + /** + * The hostname that we're trying to resolve + * @var std::string + */ + std::string _hostname; + + /** + * The portnumber to connect to + * @var uint16_t + */ + uint16_t _port; + + /** + * A pipe that is used to send back the socket that is connected to RabbitMQ + * @var Pipe + */ + Pipe _pipe; + + /** + * Non-blocking socket that is connected to RabbitMQ + * @var int + */ + int _socket = -1; + + /** + * Possible error that occured + * @var std::string + */ + std::string _error; + + /** + * Data that was sent to the connection, while busy resolving the hostname + * @var TcpBuffer + */ + TcpBuffer _buffer; + + /** + * Thread in which the DNS lookup occurs + * @var std::thread + */ + std::thread _thread; + + + /** + * Run the thread + */ + void run() + { + // prevent exceptions + try + { + // get address info + AddressInfo addresses(_hostname.data(), _port); + + // iterate over the addresses + for (size_t i = 0; i < addresses.size(); ++i) + { + // create the socket + _socket = socket(addresses[i]->ai_family, addresses[i]->ai_socktype, addresses[i]->ai_protocol); + + // move on on failure + if (_socket < -1) continue; + + // connect to the socket + if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break; + + // log the error for the time being + _error = strerror(errno); + + // close socket because connect failed + close(_socket); + + // socket no longer is valid + _socket = -1; + } + + // connection succeeded, mark socket as non-blocking + if (_socket >= 0) fcntl(_socket, F_SETFL, O_NONBLOCK); + } + catch (const std::runtime_error &error) + { + // address could not be resolved, we ignore this for now, but store the error + _error = error.what(); + } + + // notify the master thread by sending a byte over the pipe + _pipe.notify(); + } + +public: + /** + * Constructor + * @param connection Parent connection object + * @param hostname The hostname for the lookup + * @param portnumber The portnumber for the lookup + * @param handler User implemented handler object + */ + TcpResolver(TcpConnection *connection, const std::string &hostname, uint16_t port, TcpHandler *handler) : + TcpState(connection, handler), + _hostname(hostname), + _port(port) + { + // tell the event loop to monitor the filedescriptor of the pipe + handler->monitor(connection, _pipe.in(), readable); + + // we can now start the thread (must be started after filedescriptor is monitored!) + std::thread thread(std::bind(&TcpResolver::run, this)); + + // store thread in member + _thread.swap(thread); + } + + /** + * Destructor + */ + virtual ~TcpResolver() + { + // stop monitoring the pipe filedescriptor + _handler->monitor(_connection, _pipe.in(), 0); + } + + /** + * Wait for the resolver to be ready + * @param fd The filedescriptor that is active + * @param flags Flags to indicate that fd is readable and/or writable + * @return New implementation object + */ + virtual TcpState *process(int fd, int flags) override + { + // only works if the incoming pipe is readable + if (fd != _pipe.in() || !(flags & readable)) return this; + + // wait for the thread to be ready + _thread.join(); + + // do we have a valid socket? + if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); + + // report error + _handler->onError(_connection, _error.data()); + + // create dummy implementation + return new TcpClosed(_connection, _handler); + } + + /** + * Send data over the connection + * @param buffer buffer to send + * @param size size of the buffer + */ + virtual void send(const char *buffer, size_t size) + { + // add data to buffer + _buffer.add(buffer, size); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/tcpstate.h b/src/tcpstate.h new file mode 100644 index 0000000..7cf4de0 --- /dev/null +++ b/src/tcpstate.h @@ -0,0 +1,115 @@ +/** + * TcpState.h + * + * Base class / interface of the various states of the TCP connection + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpState +{ +protected: + /** + * Parent TcpConnection object as is seen by the user + * @var TcpConnection + */ + TcpConnection *_connection; + + /** + * User-supplied handler + * @var TcpHandler + */ + TcpHandler *_handler; + +protected: + /** + * Protected constructor + * @param connection Original TCP connection object + * @param handler User-supplied handler class + */ + TcpState(TcpConnection *connection, TcpHandler *handler) : + _connection(connection), _handler(handler) {} + + /** + * Protected "copy" constructor + * @param state Original TcpState object + */ + TcpState(const TcpState *state) : + _connection(state->_connection), _handler(state->_handler) {} + +public: + /** + * Virtual destructor + */ + virtual ~TcpState() = default; + + /** + * Process the filedescriptor in the object + * @param fd The filedescriptor that is active + * @param flags AMQP::readable and/or AMQP::writable + * @return New implementation object + */ + virtual TcpState *process(int fd, int flags) + { + // default implementation does nothing and preserves same implementation + return this; + } + + /** + * Send data over the connection + * @param buffer buffer to send + * @param size size of the buffer + */ + virtual void send(const char *buffer, size_t size) + { + // default does nothing + } + + /** + * Report to the handler that the object is in an error state + * @param error + */ + void reportError(const char *error) + { + // pass to handler + _handler->onError(_connection, error); + } + + /** + * Report to the handler that the connection is ready for use + */ + void reportConnected() + { + // pass to handler + _handler->onConnected(_connection); + } + + /** + * Report to the handler that the connection was nicely closed + */ + void reportClosed() + { + // pass to handler + _handler->onClosed(_connection); + } +}; + +/** + * End of namespace + */ +} +