diff --git a/examples/libev.cpp b/examples/libev.cpp index d299f37..3922363 100644 --- a/examples/libev.cpp +++ b/examples/libev.cpp @@ -68,7 +68,6 @@ int main() AMQP::Address address("amqps://guest:guest@localhost/"); AMQP::TcpConnection connection(&handler, address); - // we need a channel too AMQP::TcpChannel channel(&connection); diff --git a/include/amqpcpp/address.h b/include/amqpcpp/address.h index f77255e..efccf8e 100644 --- a/include/amqpcpp/address.h +++ b/include/amqpcpp/address.h @@ -4,7 +4,7 @@ * An AMQP address in the "amqp://user:password@hostname:port/vhost" notation * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -66,10 +66,11 @@ public: // position of the last byte const char *last = data + size; - // must start with amqp:// or ampqs:// - if (strncmp(data, "amqps://", 8) == 0) _secure = true; - else if (strncmp(data, "amqp://", 7) == 0) _secure = false; - else throw std::runtime_error("AMQP address should start with \"amqp://\" or \"amqps://\""); + // must start with ampqs:// to have a secure connection (and we also assign a different default port) + if ((_secure = strncmp(data, "amqps://", 8) == 0)) _port = 5671; + + // otherwise protocol must be amqp:// + else if (strncmp(data, "amqp://", 7) != 0) throw std::runtime_error("AMQP address should start with \"amqp://\" or \"amqps://\""); // begin of the string was parsed data += _secure ? 8 : 7; diff --git a/src/linux_tcp/tcpconnected.h b/src/linux_tcp/tcpconnected.h index dd9a71e..c523183 100644 --- a/src/linux_tcp/tcpconnected.h +++ b/src/linux_tcp/tcpconnected.h @@ -5,7 +5,7 @@ * the hostname was resolved into an IP address * * @author Emiel Bruijntjes - * @copyright 2015 - 2016 Copernica BV + * @copyright 2015 - 2018 Copernica BV */ /** @@ -18,6 +18,7 @@ */ #include "tcpoutbuffer.h" #include "tcpinbuffer.h" +#include "wait.h" /** * Set up namespace @@ -83,30 +84,6 @@ private: return monitor.valid() ? new TcpClosed(this) : nullptr; } - /** - * Wait until the socket is writable - * @return bool - */ - bool wait4writable() - { - // we need the fd-sets - fd_set readables, writables, exceptions; - - // initialize all the sets - FD_ZERO(&readables); - FD_ZERO(&writables); - FD_ZERO(&exceptions); - - // add the one socket - FD_SET(_socket, &writables); - - // wait for the socket - auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr); - - // check for success - return result == 0; - } - public: /** * Constructor @@ -242,11 +219,14 @@ public: */ virtual TcpState *flush() override { + // create an object to wait for the filedescriptor to becomes active + Wait wait(_socket); + // keep running until the out buffer is empty while (_out) { // poll the socket, is it already writable? - if (!wait4writable()) return this; + if (!wait.writable()) return this; // socket is writable, send as much data as possible auto *newstate = process(_socket, writable); diff --git a/src/linux_tcp/tcpinbuffer.h b/src/linux_tcp/tcpinbuffer.h index 65971ec..d7ea09c 100644 --- a/src/linux_tcp/tcpinbuffer.h +++ b/src/linux_tcp/tcpinbuffer.h @@ -107,6 +107,20 @@ public: // done return result; } + + /** + * Receive data from a socket + * @param ssl ssl wrapped socket to read from + * @param expected number of bytes that the library expects + * @return ssize_t + */ + /* + ssize_t receivefrom(SSL *ssl, uint32_t expected) + { + // @todo implementation + return 0; + } + */ /** * Shrink the buffer (in practice this is always called with the full buffer size) diff --git a/src/linux_tcp/tcpoutbuffer.h b/src/linux_tcp/tcpoutbuffer.h index 186a9b4..4364c34 100644 --- a/src/linux_tcp/tcpoutbuffer.h +++ b/src/linux_tcp/tcpoutbuffer.h @@ -18,7 +18,7 @@ */ #include #include - + /** * FIONREAD on Solaris is defined elsewhere */ @@ -195,10 +195,36 @@ public: } } + /** + * Fill an iovec buffer + * @param buffers the buffers to be filled + * @param count number of buffers available + * @return size_t the number of buffers that were filled + */ + size_t fill(struct iovec buffers[], size_t count) const + { + // index counter + size_t index = 0; + + // iterate over the buffers + for (const auto &str : _buffers) + { + // fill buffer + buffers[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data()); + buffers[index].iov_len = index == 0 ? str.size() - _skip : str.size(); + + // update counter for next iteration + if (++index >= count) return count; + } + + // done + return index; + } + /** * Send the buffer to a socket - * @param socket - * @return ssize_t + * @param socket the socket to send data to + * @return ssize_t number of bytes sent (or the same result as sendmsg() in case of an error) */ ssize_t sendto(int socket) { @@ -211,20 +237,6 @@ public: // we're going to fill a lot of buffers (64 should normally be enough) struct iovec buffer[64]; - // index counter - size_t index = 0; - - // iterate over the buffers - for (const auto &str : _buffers) - { - // fill buffer - buffer[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data()); - buffer[index].iov_len = index == 0 ? str.size() - _skip : str.size(); - - // update counter for next iteration - if (++index >= 64) break; - } - // create the message header struct msghdr header; @@ -233,7 +245,10 @@ public: // save the buffers in the message header header.msg_iov = buffer; - header.msg_iovlen = index; + header.msg_iovlen = fill(buffer, 64); + + // do nothing if no buffers were filled + if (header.msg_iovlen == 0) break; // send the data auto result = sendmsg(socket, &header, AMQP_CPP_MSG_NOSIGNAL); @@ -251,6 +266,37 @@ public: // done return total; } + + /** + * Send the buffer to an SSL connection + * @param ssl the ssl context to send data to + * @return ssize_t number of bytes sent, or the return value of ssl_write + */ + /* + ssize_t sendto(SSL *ssl) + { + // we're going to fill a lot of buffers (for ssl only one buffer at a time can be sent) + struct iovec buffer[1]; + + // fill the buffers, and leap out if there is no data + auto buffers = fill(buffer, 1); + + std::cout << "buffercount = " << buffers << std::endl; + + if (buffers == 0) return 0; + + // send the data + auto result = SSL_write(ssl, buffer[0].iov_base, buffer[0].iov_len); + + // @todo do we have to move to the next buffer to prevent that this buffer is further filled? + + // on success we shrink the buffer + if (result > 0) shrink(result); + + // done + return result; + } + */ }; /** diff --git a/src/linux_tcp/tcpresolver.h b/src/linux_tcp/tcpresolver.h index 4a4a852..008bbc6 100644 --- a/src/linux_tcp/tcpresolver.h +++ b/src/linux_tcp/tcpresolver.h @@ -20,7 +20,6 @@ #include "tcpstate.h" #include "tcpclosed.h" #include "tcpconnected.h" -#include "tcpsslhandshake.h" #include /** @@ -195,7 +194,7 @@ public: if (_socket >= 0) { // if we need a secure connection, we move to the tls handshake - if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler); + //if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler); // otherwise we have a valid regular tcp connection return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); @@ -221,7 +220,7 @@ public: if (_socket >= 0) { // if we need a secure connection, we move to the tls handshake - if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler); + //if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler); // otherwise we have a valid regular tcp connection return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); diff --git a/src/linux_tcp/tcpsslconnected.h b/src/linux_tcp/tcpsslconnected.h new file mode 100644 index 0000000..232fb0b --- /dev/null +++ b/src/linux_tcp/tcpsslconnected.h @@ -0,0 +1,290 @@ +/** + * TcpSslConnected.h + * + * The actual tcp connection over SSL + * + * @copyright 2018 copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include "tcpoutbuffer.h" +#include "tcpinbuffer.h" +#include "wait.h" +#include + +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class TcpSslConnected: public TcpState, private Watchable +{ +private: + /** + * The SSL context + * @var SSL* + */ + SSL *_ssl; + + /** + * Socket file descriptor + * @var int + */ + int _socket; + + /** + * The outgoing buffer + * @var TcpBuffer + */ + TcpOutBuffer _out; + + /** + * The incoming buffer + * @var TcpInBuffer + */ + TcpInBuffer _in; + + /** + * Are we now busy with sending or receiving? + * @var enum + */ + enum { + state_idle, + state_sending, + state_receiving + } _state; + + + /** + * Helper method to report an error + * @return bool Was an error reported? + */ + bool reportError() + { + // some errors are ok and do not (necessarily) mean that we're disconnected + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false; + + // we have an error - report this to the user + _handler->onError(_connection, strerror(errno)); + + // done + return true; + } + + /** + * Construct the next state + * @param monitor Object that monitors whether connection still exists + * @return TcpState* + */ + TcpState *nextState(const Monitor &monitor) + { + // if the object is still in a valid state, we can move to the close-state, + // otherwise there is no point in moving to a next state + return monitor.valid() ? new TcpClosed(this) : nullptr; + } + + /** + * Proceed with the previous operation, possibly changing the monitor + * @return TcpState* + */ + TcpState *proceed() + { + // if we still have an outgoing buffer we want to send out data + if (_out) + { + // we still have a buffer with outgoing data + _state = state_sending; + + // let's wait until the socket becomes writable + _handler->monitor(_connection, _socket, readable); + } + else + { + // outgoing buffer is empty, we're idle again waiting for further input + _state = state_idle; + + // let's wait until the socket becomes readable + _handler->monitor(_connection, _socket, readable); + } + + // done + return this; + } + + /** + * Method to repeat the previous call + * @param result result of an earlier openssl operation + * @return TcpState* + */ + TcpState *repeat(int result) + { + // error was returned, so we must investigate what is going on + auto error = SSL_get_error(_ssl, result); + + std::cout << "error = " << error << std::endl; + + // check the error + switch (error) { + case SSL_ERROR_WANT_READ: + // the operation must be repeated when readable + std::cout << "want read" << std::endl; + + _handler->monitor(_connection, _socket, readable); + return this; + + case SSL_ERROR_WANT_WRITE: + // wait until socket becomes writable again + std::cout << "want write" << std::endl; + + _handler->monitor(_connection, _socket, writable); + return this; + + default: + std::cout << "something else" << std::endl; + + // @todo check how to handle this + return this; + } + } + + +public: + /** + * Constructor + * @param connection Parent TCP connection object + * @param socket The socket filedescriptor + * @param ssl The SSL structure + * @param buffer The buffer that was already built + * @param handler User-supplied handler object + */ + TcpSslConnected(TcpConnection *connection, int socket, SSL *ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : + TcpState(connection, handler), + _ssl(ssl), + _socket(socket), + _out(std::move(buffer)), + _in(4096), + _state(_out ? state_sending : state_idle) + { + std::cout << "ssl-connected" << std::endl; + + // tell the handler to monitor the socket if there is an out + _handler->monitor(_connection, _socket, _state == state_sending ? writable : readable); + } + + /** + * Destructor + */ + virtual ~TcpSslConnected() noexcept + { + // skip if handler is already forgotten + if (_handler == nullptr) return; + + // we no longer have to monitor the socket + _handler->monitor(_connection, _socket, 0); + + // close the socket + close(_socket); + } + + /** + * The filedescriptor of this connection + * @return int + */ + virtual int fileno() const override { return _socket; } + + /** + * Process the filedescriptor in the object + * @param fd The filedescriptor that is active + * @param flags AMQP::readable and/or AMQP::writable + * @return New implementation object + */ + virtual TcpState *process(int fd, int flags) + { + std::cout << "process call in ssl-connected" << std::endl; + + std::cout << fd << " - " << _socket << std::endl; + + + // the socket must be the one this connection writes to + if (fd != _socket) return this; + + // because the object might soon be destructed, we create a monitor to check this + Monitor monitor(this); + + // are we busy with sending or receiving data? + if (_state == state_sending) + { + std::cout << "busy sending" << std::endl; + + // try to send more data from the outgoing buffer + auto result = _out.sendto(_ssl); + + std::cout << "result = " << result << std::endl; + + // if this is a success, we may have to update the monitor + if (result > 0) return proceed(); + + // the operation failed, we may have to repeat our call + else return repeat(result); + } + else + { + // read data from ssl into the buffer + auto result = _in.receivefrom(_ssl, _connection->expected()); + + // if this is a success, we may have to update the monitor + // @todo also parse the buffer + if (result > 0) return proceed(); + + // the operation failed, we may have to repeat our call + else return repeat(result); + + + + // we're busy with receiving data + // @todo check this + + std::cout << "receive data" << std::endl; + + } + + // keep same object + return this; + } + + /** + * Send data over the connection + * @param buffer buffer to send + * @param size size of the buffer + */ + virtual void send(const char *buffer, size_t size) + { + // put the data in the outgoing buffer + _out.add(buffer, size); + + // if we're already busy with sending or receiving, we first have to wait + // for that operation to complete before we can move on + if (_state != state_idle) return; + + // let's wait until the socket becomes writable + _handler->monitor(_connection, _socket, writable); + } +}; + +/** + * End of namespace + */ +} + diff --git a/src/linux_tcp/tcpsslhandshake.h b/src/linux_tcp/tcpsslhandshake.h index 2070c27..eccb860 100644 --- a/src/linux_tcp/tcpsslhandshake.h +++ b/src/linux_tcp/tcpsslhandshake.h @@ -16,10 +16,12 @@ * Dependencies */ #include "tcpoutbuffer.h" +#include "tcpsslconnected.h" +#include "wait.h" +#include #include #include - #include /** @@ -33,7 +35,12 @@ namespace AMQP { class TcpSslHandshake : public TcpState, private Watchable { private: - + /** + * SSL context + * @var SSL_CTX + */ + SSL_CTX *ctx; + /** * SSL structure * @var SSL @@ -53,7 +60,6 @@ private: TcpOutBuffer _out; - /** * Helper method to report an error * @return TcpState* @@ -67,7 +73,7 @@ private: _handler->onError(_connection, "failed to setup ssl connection"); // close the socket - close(_socket); + close(_socket); // done, go to the closed state return new TcpClosed(_connection, _handler); @@ -85,30 +91,6 @@ private: return monitor.valid() ? new TcpClosed(this) : nullptr; } - /** - * Wait until the socket is writable - * @return bool - */ - bool wait4writable() - { - // we need the fd-sets - fd_set readables, writables, exceptions; - - // initialize all the sets - FD_ZERO(&readables); - FD_ZERO(&writables); - FD_ZERO(&exceptions); - - // add the one socket - FD_SET(_socket, &writables); - - // wait for the socket - auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr); - - // check for success - return result == 0; - } - public: /** * Constructor @@ -126,22 +108,29 @@ public: TcpState(connection, handler), _socket(socket), _out(std::move(buffer)) - { + { + // init the SSL library SSL_library_init(); + // create ssl context + ctx = SSL_CTX_new(TLS_client_method()); + // create ssl object - _ssl = SSL_new(SSL_CTX_new(TLS_client_method())); - + _ssl = SSL_new(ctx); + // leap out on error if (_ssl == nullptr) throw std::runtime_error("ERROR: SSL structure is null"); - + // we will be using the ssl context as a client - // @todo check return value SSL_set_connect_state(_ssl); + // associate the ssl context with the socket filedescriptor - // @todo check return value - SSL_set_fd(_ssl, socket); + int set_fd_ret = SSL_set_fd(_ssl, socket); + if (set_fd_ret == 0) { + reportError(); + std::cout << "error while setting file descriptor" << std::endl; + } // we are going to wait until the socket becomes writable before we start the handshake _handler->monitor(_connection, _socket, writable); @@ -153,7 +142,8 @@ public: virtual ~TcpSslHandshake() noexcept { // close the socket - close(_socket); + // @todo only if really closed + //close(_socket); } /** @@ -175,38 +165,57 @@ public: // start the ssl handshake int result = SSL_do_handshake(_ssl); - + // if the connection succeeds, we can move to the ssl-connected state - // @todo we need the sslconnected state object - if (result == 1) return this; // new TcpSslConnected(connection, socket, _ssl, std::move(_out), _handler); + if (result == 1) return new TcpSslConnected(_connection, _socket, _ssl, std::move(_out), _handler); // if there is a failure, we must close down the connection - if (result == 0) return reportError(); - - // -1 was returned, so we must investigate what is going on - auto error = SSL_get_error(_ssl, result); + if (result <= 0) + { + // error was returned, so we must investigate what is going on + auto error = SSL_get_error(_ssl, result); + + // check the error + switch (error) { + case SSL_ERROR_WANT_READ: + // the handshake must be repeated when socket is readable, wait for that + std::cout << "wait for readability" << std::endl; + _handler->monitor(_connection, _socket, readable); + break; - // check the error - switch (error) { - case SSL_ERROR_WANT_READ: - // the handshake must be repeated when socket is readable, wait for that - std::cout << "wait for readability" << std::endl; - _handler->monitor(_connection, _socket, readable); - break; - - case SSL_ERROR_WANT_WRITE: - // the handshake must be repeated when socket is readable, wait for that - std::cout << "wait for writability" << std::endl; - _handler->monitor(_connection, _socket, writable); - break; - - default: - // @todo implement handling other error states - std::cout << "unknown error state " << error << std::endl; - // @todo we have to close the connection - return reportError(); + case SSL_ERROR_WANT_WRITE: + // the handshake must be repeated when socket is readable, wait for that + std::cout << "wait for writability" << std::endl; + _handler->monitor(_connection, _socket, writable); + break; + + case SSL_ERROR_WANT_ACCEPT: + // the BIO was not connected yet, the SSL function should be called again + std::cout << "wait for acceptability" << ERR_error_string(ERR_get_error(), nullptr) << std::endl; + _handler->monitor(_connection, _socket, writable); + + break; + case SSL_ERROR_WANT_X509_LOOKUP: + std::cout << "SSL_ERROR_WANT_X509_LOOKUP" << ERR_error_string(ERR_get_error(), nullptr) << std::endl; + _handler->monitor(_connection, _socket, writable); + + break; + case SSL_ERROR_SYSCALL: + std::cout << "SSL_ERROR_SYSCALL: " << ERR_error_string(ERR_get_error(), nullptr) << std::endl; + _handler->monitor(_connection, _socket, writable); + + break; + case SSL_ERROR_SSL: + std::cout << "SSL_ERROR_SSL" << ERR_error_string(ERR_get_error(), nullptr) << std::endl; + _handler->monitor(_connection, _socket, writable); + + break; + default: + std::cout << "unknown error state " << error << std::endl; + return reportError(); + } } - + // keep same object return this; } @@ -218,9 +227,8 @@ public: */ virtual void send(const char *buffer, size_t size) override { - - // @todo because the handshake is still busy, outgoing data must be cached - + // the handshake is still busy, outgoing data must be cached + _out.add(buffer, size); } /** @@ -229,37 +237,48 @@ public: */ virtual TcpState *flush() override { - // @todo implementation? - return nullptr; - } - - /** - * Report that heartbeat negotiation is going on - * @param heartbeat suggested heartbeat - * @return uint16_t accepted heartbeat - */ - virtual uint16_t reportNegotiate(uint16_t heartbeat) override - { - /* - * @todo what should we do here? + // create an object to wait for the filedescriptor to becomes active + Wait wait(_socket); - // remember that we have to reallocated (_in member can not be accessed because it is moved away) - _reallocate = _connection->maxFrame(); - - // pass to base - return TcpState::reportNegotiate(heartbeat); - */ - - return 0; - } + // keep looping + while (true) + { + // start the ssl handshake + int result = SSL_do_handshake(_ssl); + + // if the connection succeeds, we can move to the ssl-connected state + if (result == 1) return new TcpSslConnected(_connection, _socket, _ssl, std::move(_out), _handler); + + // error was returned, so we must investigate what is going on + auto error = SSL_get_error(_ssl, result); + + // check the error + switch (error) { + case SSL_ERROR_WANT_READ: + // wait for the socket to become readable + if (!wait.readable()) return reportError(); + break; + + case SSL_ERROR_WANT_WRITE: + // wait for the socket to become writable + if (!wait.writable()) return reportError(); + break; + + default: + // report an error + return reportError(); + } + } + + // keep same object (we never reach this code) + return this; + } /** * Report to the handler that the connection was nicely closed */ virtual void reportClosed() override { - /* - // we no longer have to monitor the socket _handler->monitor(_connection, _socket, 0); @@ -277,7 +296,6 @@ public: // notify to handler handler->onClosed(_connection); - */ } }; diff --git a/src/linux_tcp/wait.h b/src/linux_tcp/wait.h new file mode 100644 index 0000000..ecd3817 --- /dev/null +++ b/src/linux_tcp/wait.h @@ -0,0 +1,93 @@ +/** + * Wait.h + * + * Class to wait for a socket to become readable and/or writable + * + * @copyright 2018 Copernica BV + */ + + /** + * Include guard + */ +#pragma once + +/** + * Begin of namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class Wait +{ +private: + /** + * Set with just one filedescriptor + * @var fd_set + */ + fd_set _set; + + /** + * The current socket // @todo what is it exactly? + * @var int + */ + int _socket; + +public: + /** + * Constructor + * @param fd the filedescriptor that we're waiting on + */ + Wait(int fd) + { + _socket = fd; + + // initialize the set + FD_ZERO(&_set); + + // add the one socket + FD_SET(_socket, &_set); + } + + /** + * Destructor + */ + virtual ~Wait() = default; + + /** + * Wait until the filedescriptor becomes readable + * @return bool + */ + bool readable() + { + // wait for the socket + return select(_socket + 1, &_set, nullptr, nullptr, nullptr) > 0; + } + + /** + * Wait until the filedescriptor becomes writable + * @return bool + */ + bool writable() + { + // wait for the socket + return select(_socket + 1, nullptr, &_set, nullptr, nullptr) > 0; + } + + /** + * Wait until a filedescriptor becomes active (readable or writable) + * @return bool + */ + bool active() + { + // wait for the socket + return select(_socket + 1, &_set, &_set, nullptr, nullptr) > 0; + } +}; + +/** + * End of namespace + */ +} +