diff --git a/examples/libev.cpp b/examples/libev.cpp index 2188637..7bfd1af 100644 --- a/examples/libev.cpp +++ b/examples/libev.cpp @@ -75,6 +75,7 @@ int main() // make a connection AMQP::Address address("amqp://guest:guest@localhost/"); +// AMQP::Address address("amqps://guest:guest@localhost/"); AMQP::TcpConnection connection(&handler, address); // we need a channel too diff --git a/src/connectionimpl.cpp b/src/connectionimpl.cpp index 332e106..ab9f781 100644 --- a/src/connectionimpl.cpp +++ b/src/connectionimpl.cpp @@ -3,7 +3,7 @@ * * Implementation of an AMQP connection * - * @copyright 2014 - 2017 Copernica BV + * @copyright 2014 - 2018 Copernica BV */ #include "includes.h" #include "protocolheaderframe.h" @@ -147,7 +147,7 @@ uint64_t ConnectionImpl::parse(const Buffer &buffer) // data we need for the next frame, otherwise we need at least 7 // bytes for processing the header of the next frame _expected = receivedFrame.header() ? (uint32_t)receivedFrame.totalSize() : 7; - + // we're ready for now return processed; } diff --git a/src/linux_tcp/openssl.cpp b/src/linux_tcp/openssl.cpp index 655e17c..f038284 100644 --- a/src/linux_tcp/openssl.cpp +++ b/src/linux_tcp/openssl.cpp @@ -110,6 +110,21 @@ int SSL_set_fd(SSL *ssl, int fd) return func(ssl, fd); } +/** + * The number of bytes availabe in the ssl struct that have been read + * from the socket, but that have not been returned the SSL_read() + * @param ssl SSL object + * @return int number of bytes + */ +int SSL_pending(const SSL *ssl) +{ + // create a function + static Function func("SSL_pending"); + + // call the openssl function + return func(ssl); +} + /** * Free an allocated ssl context * @param ctx diff --git a/src/linux_tcp/openssl.h b/src/linux_tcp/openssl.h index 9878ca1..e3461e2 100644 --- a/src/linux_tcp/openssl.h +++ b/src/linux_tcp/openssl.h @@ -40,6 +40,7 @@ int SSL_do_handshake(SSL *ssl); int SSL_read(SSL *ssl, void *buf, int num); int SSL_write(SSL *ssl, const void *buf, int num); int SSL_shutdown(SSL *ssl); +int SSL_pending(const SSL *ssl); int SSL_set_fd(SSL *ssl, int fd); int SSL_get_shutdown(const SSL *ssl); int SSL_get_error(const SSL *ssl, int ret); diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 8f34f76..7d4b90e 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -59,7 +59,7 @@ private: * Are we now busy with sending or receiving? * @var enum */ - enum { + enum State { state_idle, state_sending, state_receiving @@ -131,9 +131,6 @@ private: // if we still have an outgoing buffer we want to send out data if (_out) { - // we still have a buffer with outgoing data - _state = state_sending; - // let's wait until the socket becomes writable _handler->monitor(_connection, _socket, readable | writable); } @@ -150,9 +147,6 @@ private: } else { - // outgoing buffer is empty, we're idle again waiting for further input - _state = state_idle; - // let's wait until the socket becomes readable _handler->monitor(_connection, _socket, readable); } @@ -164,14 +158,18 @@ private: /** * Method to repeat the previous call\ * @param monitor monitor to check if connection object still exists + * @param state the state that we were in * @param result result of an earlier SSL_get_error call * @return TcpState* */ - TcpState *repeat(const Monitor &monitor, int error) + TcpState *repeat(const Monitor &monitor, enum State state, int error) { // check the error switch (error) { case SSL_ERROR_WANT_READ: + // remember state + _state = state; + // the operation must be repeated when readable _handler->monitor(_connection, _socket, readable); @@ -179,6 +177,9 @@ private: return monitor.valid() ? this : nullptr; case SSL_ERROR_WANT_WRITE: + // remember state + _state = state; + // wait until socket becomes writable again _handler->monitor(_connection, _socket, readable | writable); @@ -188,7 +189,7 @@ private: case SSL_ERROR_NONE: // we're ready for the next instruction from userspace _state = state_idle; - + // turns out no error occured, an no action has to be rescheduled _handler->monitor(_connection, _socket, _out || _closed ? readable | writable : readable); @@ -224,7 +225,7 @@ private: // parse the buffer auto processed = _connection->parse(buffer); - + // "this" could be removed by now, check this if (!monitor.valid()) return nullptr; @@ -235,7 +236,7 @@ private: _in = std::move(buffer); // do we have to reallocate? - if (!_reallocate) return proceed(); + if (!_reallocate) return this; // reallocate the buffer _in.reallocate(_reallocate); @@ -244,8 +245,60 @@ private: _reallocate = 0; // done + return this; + } + + /** + * Perform a write operation + * @param monitor + * @return TcpState* + */ + TcpState *write(const Monitor &monitor) + { + // assume default state + _state = state_idle; + + // try to send more data from the outgoing buffer + auto result = _out.sendto(_ssl); + + // if this is a success, we can proceed with the event loop + if (result > 0) return proceed(); + + // the operation failed, we may have to repeat our call + return repeat(monitor, state_sending, OpenSSL::SSL_get_error(_ssl, result)); + } + + /** + * Perform a receive operation + * @param monitor + * @return TcpState + */ + TcpState *receive(const Monitor &monitor) + { + // start a loop + do + { + // assume default state + _state = state_idle; + + // read data from ssl into the buffer + auto result = _in.receivefrom(_ssl, _connection->expected()); + + // if this is a failure, we are going to repeat the operation + if (result <= 0) return repeat(monitor, state_receiving, OpenSSL::SSL_get_error(_ssl, result)); + + // go process the received data + auto *nextstate = parse(monitor, result); + + // leap out if we moved to a different state + if (nextstate != this) return nextstate; + } + while (OpenSSL::SSL_pending(_ssl) > 0); + + // go to the next state return proceed(); } + public: /** @@ -295,29 +348,22 @@ public: // the socket must be the one this connection writes to if (fd != _socket) return this; - // are we busy with sending or receiving data? - if (_state == state_sending) - { - // try to send more data from the outgoing buffer - auto result = _out.sendto(_ssl); - - // if this is a success, we can proceed with the event loop - if (result > 0) return proceed(); - - // the operation failed, we may have to repeat our call - else return repeat(monitor, OpenSSL::SSL_get_error(_ssl, result)); - } - else - { - // read data from ssl into the buffer - auto result = _in.receivefrom(_ssl, _connection->expected()); - - // if this is a success, we may have to update the monitor - if (result > 0) return parse(monitor, result); - - // the operation failed, we may have to repeat our call - else return repeat(monitor, OpenSSL::SSL_get_error(_ssl, result)); - } + // if we were busy with a write operation, we have to repeat that + if (_state == state_sending) return write(monitor); + + // same is true for read operations, they should also be repeated + if (_state == state_receiving) return receive(monitor); + + // if the socket is readable, we are going to receive data + if (flags & readable) return receive(monitor); + + // socket is not readable (so it must be writable), do we have data to write? + if (_out) return write(monitor); + + // the only scenario in which we can end up here is the socket should be + // closed, but instead of moving to the shutdown-state right, we call proceed() + // because that function is a little more careful + return proceed(); } /** @@ -336,14 +382,17 @@ public: // keep looping while we have an outgoing buffer while (_out) { + // move to the idle-state + _state = state_idle; + // try to send more data from the outgoing buffer auto result = _out.sendto(_ssl); // was this a success? if (result > 0) { - // parse the buffer - auto *nextstate = parse(monitor, result); + // proceed to the next state + auto *nextstate = proceed(); // leap out if we move to a different state if (nextstate != this) return nextstate; @@ -353,8 +402,8 @@ public: // error was returned, so we must investigate what is going on auto error = OpenSSL::SSL_get_error(_ssl, result); - // get the next state given this error - auto *nextstate = repeat(monitor, error); + // get the next state given the error + auto *nextstate = repeat(monitor, state_sending, error); // leap out if we move to a different state if (nextstate != this) return nextstate; @@ -385,9 +434,6 @@ public: // for that operation to complete before we can move on if (_state != state_idle) return; - // object is now busy sending - _state = state_sending; - // let's wait until the socket becomes writable _handler->monitor(_connection, _socket, readable | writable); } diff --git a/src/linux_tcp/sslshutdown.h b/src/linux_tcp/sslshutdown.h index 7083ea1..64791cf 100644 --- a/src/linux_tcp/sslshutdown.h +++ b/src/linux_tcp/sslshutdown.h @@ -190,6 +190,9 @@ public: // close the connection auto result = OpenSSL::SSL_shutdown(_ssl); + // on result==0 we need an additional call + while (result == 0) result = OpenSSL::SSL_shutdown(_ssl); + // if this is a success, we can proceed with the event loop if (result > 0) return proceed(monitor); @@ -212,6 +215,9 @@ public: { // close the connection auto result = OpenSSL::SSL_shutdown(_ssl); + + // on result==0 we need an additional call + while (result == 0) result = OpenSSL::SSL_shutdown(_ssl); // if this is a success, we can proceed with the event loop if (result > 0) return proceed(monitor);