work in progress on ssl support for the AMQP-CPP library
This commit is contained in:
parent
9c1a09a711
commit
e46dfcf3b8
|
|
@ -68,7 +68,6 @@ int main()
|
||||||
AMQP::Address address("amqps://guest:guest@localhost/");
|
AMQP::Address address("amqps://guest:guest@localhost/");
|
||||||
AMQP::TcpConnection connection(&handler, address);
|
AMQP::TcpConnection connection(&handler, address);
|
||||||
|
|
||||||
|
|
||||||
// we need a channel too
|
// we need a channel too
|
||||||
AMQP::TcpChannel channel(&connection);
|
AMQP::TcpChannel channel(&connection);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
* An AMQP address in the "amqp://user:password@hostname:port/vhost" notation
|
* An AMQP address in the "amqp://user:password@hostname:port/vhost" notation
|
||||||
*
|
*
|
||||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
* @copyright 2015 Copernica BV
|
* @copyright 2015 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -66,10 +66,11 @@ public:
|
||||||
// position of the last byte
|
// position of the last byte
|
||||||
const char *last = data + size;
|
const char *last = data + size;
|
||||||
|
|
||||||
// must start with amqp:// or ampqs://
|
// must start with ampqs:// to have a secure connection (and we also assign a different default port)
|
||||||
if (strncmp(data, "amqps://", 8) == 0) _secure = true;
|
if ((_secure = strncmp(data, "amqps://", 8) == 0)) _port = 5671;
|
||||||
else if (strncmp(data, "amqp://", 7) == 0) _secure = false;
|
|
||||||
else throw std::runtime_error("AMQP address should start with \"amqp://\" or \"amqps://\"");
|
// otherwise protocol must be amqp://
|
||||||
|
else if (strncmp(data, "amqp://", 7) != 0) throw std::runtime_error("AMQP address should start with \"amqp://\" or \"amqps://\"");
|
||||||
|
|
||||||
// begin of the string was parsed
|
// begin of the string was parsed
|
||||||
data += _secure ? 8 : 7;
|
data += _secure ? 8 : 7;
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
* the hostname was resolved into an IP address
|
* the hostname was resolved into an IP address
|
||||||
*
|
*
|
||||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||||
* @copyright 2015 - 2016 Copernica BV
|
* @copyright 2015 - 2018 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tcpoutbuffer.h"
|
#include "tcpoutbuffer.h"
|
||||||
#include "tcpinbuffer.h"
|
#include "tcpinbuffer.h"
|
||||||
|
#include "wait.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -83,30 +84,6 @@ private:
|
||||||
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until the socket is writable
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool wait4writable()
|
|
||||||
{
|
|
||||||
// we need the fd-sets
|
|
||||||
fd_set readables, writables, exceptions;
|
|
||||||
|
|
||||||
// initialize all the sets
|
|
||||||
FD_ZERO(&readables);
|
|
||||||
FD_ZERO(&writables);
|
|
||||||
FD_ZERO(&exceptions);
|
|
||||||
|
|
||||||
// add the one socket
|
|
||||||
FD_SET(_socket, &writables);
|
|
||||||
|
|
||||||
// wait for the socket
|
|
||||||
auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr);
|
|
||||||
|
|
||||||
// check for success
|
|
||||||
return result == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
|
@ -242,11 +219,14 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual TcpState *flush() override
|
virtual TcpState *flush() override
|
||||||
{
|
{
|
||||||
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
|
Wait wait(_socket);
|
||||||
|
|
||||||
// keep running until the out buffer is empty
|
// keep running until the out buffer is empty
|
||||||
while (_out)
|
while (_out)
|
||||||
{
|
{
|
||||||
// poll the socket, is it already writable?
|
// poll the socket, is it already writable?
|
||||||
if (!wait4writable()) return this;
|
if (!wait.writable()) return this;
|
||||||
|
|
||||||
// socket is writable, send as much data as possible
|
// socket is writable, send as much data as possible
|
||||||
auto *newstate = process(_socket, writable);
|
auto *newstate = process(_socket, writable);
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,20 @@ public:
|
||||||
// done
|
// done
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive data from a socket
|
||||||
|
* @param ssl ssl wrapped socket to read from
|
||||||
|
* @param expected number of bytes that the library expects
|
||||||
|
* @return ssize_t
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
ssize_t receivefrom(SSL *ssl, uint32_t expected)
|
||||||
|
{
|
||||||
|
// @todo implementation
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shrink the buffer (in practice this is always called with the full buffer size)
|
* Shrink the buffer (in practice this is always called with the full buffer size)
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@
|
||||||
*/
|
*/
|
||||||
#include <sys/ioctl.h>
|
#include <sys/ioctl.h>
|
||||||
#include <sys/uio.h>
|
#include <sys/uio.h>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* FIONREAD on Solaris is defined elsewhere
|
* FIONREAD on Solaris is defined elsewhere
|
||||||
*/
|
*/
|
||||||
|
|
@ -195,10 +195,36 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fill an iovec buffer
|
||||||
|
* @param buffers the buffers to be filled
|
||||||
|
* @param count number of buffers available
|
||||||
|
* @return size_t the number of buffers that were filled
|
||||||
|
*/
|
||||||
|
size_t fill(struct iovec buffers[], size_t count) const
|
||||||
|
{
|
||||||
|
// index counter
|
||||||
|
size_t index = 0;
|
||||||
|
|
||||||
|
// iterate over the buffers
|
||||||
|
for (const auto &str : _buffers)
|
||||||
|
{
|
||||||
|
// fill buffer
|
||||||
|
buffers[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data());
|
||||||
|
buffers[index].iov_len = index == 0 ? str.size() - _skip : str.size();
|
||||||
|
|
||||||
|
// update counter for next iteration
|
||||||
|
if (++index >= count) return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
// done
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send the buffer to a socket
|
* Send the buffer to a socket
|
||||||
* @param socket
|
* @param socket the socket to send data to
|
||||||
* @return ssize_t
|
* @return ssize_t number of bytes sent (or the same result as sendmsg() in case of an error)
|
||||||
*/
|
*/
|
||||||
ssize_t sendto(int socket)
|
ssize_t sendto(int socket)
|
||||||
{
|
{
|
||||||
|
|
@ -211,20 +237,6 @@ public:
|
||||||
// we're going to fill a lot of buffers (64 should normally be enough)
|
// we're going to fill a lot of buffers (64 should normally be enough)
|
||||||
struct iovec buffer[64];
|
struct iovec buffer[64];
|
||||||
|
|
||||||
// index counter
|
|
||||||
size_t index = 0;
|
|
||||||
|
|
||||||
// iterate over the buffers
|
|
||||||
for (const auto &str : _buffers)
|
|
||||||
{
|
|
||||||
// fill buffer
|
|
||||||
buffer[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data());
|
|
||||||
buffer[index].iov_len = index == 0 ? str.size() - _skip : str.size();
|
|
||||||
|
|
||||||
// update counter for next iteration
|
|
||||||
if (++index >= 64) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the message header
|
// create the message header
|
||||||
struct msghdr header;
|
struct msghdr header;
|
||||||
|
|
||||||
|
|
@ -233,7 +245,10 @@ public:
|
||||||
|
|
||||||
// save the buffers in the message header
|
// save the buffers in the message header
|
||||||
header.msg_iov = buffer;
|
header.msg_iov = buffer;
|
||||||
header.msg_iovlen = index;
|
header.msg_iovlen = fill(buffer, 64);
|
||||||
|
|
||||||
|
// do nothing if no buffers were filled
|
||||||
|
if (header.msg_iovlen == 0) break;
|
||||||
|
|
||||||
// send the data
|
// send the data
|
||||||
auto result = sendmsg(socket, &header, AMQP_CPP_MSG_NOSIGNAL);
|
auto result = sendmsg(socket, &header, AMQP_CPP_MSG_NOSIGNAL);
|
||||||
|
|
@ -251,6 +266,37 @@ public:
|
||||||
// done
|
// done
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send the buffer to an SSL connection
|
||||||
|
* @param ssl the ssl context to send data to
|
||||||
|
* @return ssize_t number of bytes sent, or the return value of ssl_write
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
ssize_t sendto(SSL *ssl)
|
||||||
|
{
|
||||||
|
// we're going to fill a lot of buffers (for ssl only one buffer at a time can be sent)
|
||||||
|
struct iovec buffer[1];
|
||||||
|
|
||||||
|
// fill the buffers, and leap out if there is no data
|
||||||
|
auto buffers = fill(buffer, 1);
|
||||||
|
|
||||||
|
std::cout << "buffercount = " << buffers << std::endl;
|
||||||
|
|
||||||
|
if (buffers == 0) return 0;
|
||||||
|
|
||||||
|
// send the data
|
||||||
|
auto result = SSL_write(ssl, buffer[0].iov_base, buffer[0].iov_len);
|
||||||
|
|
||||||
|
// @todo do we have to move to the next buffer to prevent that this buffer is further filled?
|
||||||
|
|
||||||
|
// on success we shrink the buffer
|
||||||
|
if (result > 0) shrink(result);
|
||||||
|
|
||||||
|
// done
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
*/
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@
|
||||||
#include "tcpstate.h"
|
#include "tcpstate.h"
|
||||||
#include "tcpclosed.h"
|
#include "tcpclosed.h"
|
||||||
#include "tcpconnected.h"
|
#include "tcpconnected.h"
|
||||||
#include "tcpsslhandshake.h"
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -195,7 +194,7 @@ public:
|
||||||
if (_socket >= 0)
|
if (_socket >= 0)
|
||||||
{
|
{
|
||||||
// if we need a secure connection, we move to the tls handshake
|
// if we need a secure connection, we move to the tls handshake
|
||||||
if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler);
|
//if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler);
|
||||||
|
|
||||||
// otherwise we have a valid regular tcp connection
|
// otherwise we have a valid regular tcp connection
|
||||||
return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
|
return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
|
||||||
|
|
@ -221,7 +220,7 @@ public:
|
||||||
if (_socket >= 0)
|
if (_socket >= 0)
|
||||||
{
|
{
|
||||||
// if we need a secure connection, we move to the tls handshake
|
// if we need a secure connection, we move to the tls handshake
|
||||||
if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler);
|
//if (_secure) return new TcpSslHandshake(_connection, _socket, std::move(_buffer), _handler);
|
||||||
|
|
||||||
// otherwise we have a valid regular tcp connection
|
// otherwise we have a valid regular tcp connection
|
||||||
return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
|
return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,290 @@
|
||||||
|
/**
|
||||||
|
* TcpSslConnected.h
|
||||||
|
*
|
||||||
|
* The actual tcp connection over SSL
|
||||||
|
*
|
||||||
|
* @copyright 2018 copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include "tcpoutbuffer.h"
|
||||||
|
#include "tcpinbuffer.h"
|
||||||
|
#include "wait.h"
|
||||||
|
#include <openssl/ssl.h>
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class TcpSslConnected: public TcpState, private Watchable
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* The SSL context
|
||||||
|
* @var SSL*
|
||||||
|
*/
|
||||||
|
SSL *_ssl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Socket file descriptor
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
int _socket;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The outgoing buffer
|
||||||
|
* @var TcpBuffer
|
||||||
|
*/
|
||||||
|
TcpOutBuffer _out;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The incoming buffer
|
||||||
|
* @var TcpInBuffer
|
||||||
|
*/
|
||||||
|
TcpInBuffer _in;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are we now busy with sending or receiving?
|
||||||
|
* @var enum
|
||||||
|
*/
|
||||||
|
enum {
|
||||||
|
state_idle,
|
||||||
|
state_sending,
|
||||||
|
state_receiving
|
||||||
|
} _state;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to report an error
|
||||||
|
* @return bool Was an error reported?
|
||||||
|
*/
|
||||||
|
bool reportError()
|
||||||
|
{
|
||||||
|
// some errors are ok and do not (necessarily) mean that we're disconnected
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return false;
|
||||||
|
|
||||||
|
// we have an error - report this to the user
|
||||||
|
_handler->onError(_connection, strerror(errno));
|
||||||
|
|
||||||
|
// done
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct the next state
|
||||||
|
* @param monitor Object that monitors whether connection still exists
|
||||||
|
* @return TcpState*
|
||||||
|
*/
|
||||||
|
TcpState *nextState(const Monitor &monitor)
|
||||||
|
{
|
||||||
|
// if the object is still in a valid state, we can move to the close-state,
|
||||||
|
// otherwise there is no point in moving to a next state
|
||||||
|
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Proceed with the previous operation, possibly changing the monitor
|
||||||
|
* @return TcpState*
|
||||||
|
*/
|
||||||
|
TcpState *proceed()
|
||||||
|
{
|
||||||
|
// if we still have an outgoing buffer we want to send out data
|
||||||
|
if (_out)
|
||||||
|
{
|
||||||
|
// we still have a buffer with outgoing data
|
||||||
|
_state = state_sending;
|
||||||
|
|
||||||
|
// let's wait until the socket becomes writable
|
||||||
|
_handler->monitor(_connection, _socket, readable);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// outgoing buffer is empty, we're idle again waiting for further input
|
||||||
|
_state = state_idle;
|
||||||
|
|
||||||
|
// let's wait until the socket becomes readable
|
||||||
|
_handler->monitor(_connection, _socket, readable);
|
||||||
|
}
|
||||||
|
|
||||||
|
// done
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to repeat the previous call
|
||||||
|
* @param result result of an earlier openssl operation
|
||||||
|
* @return TcpState*
|
||||||
|
*/
|
||||||
|
TcpState *repeat(int result)
|
||||||
|
{
|
||||||
|
// error was returned, so we must investigate what is going on
|
||||||
|
auto error = SSL_get_error(_ssl, result);
|
||||||
|
|
||||||
|
std::cout << "error = " << error << std::endl;
|
||||||
|
|
||||||
|
// check the error
|
||||||
|
switch (error) {
|
||||||
|
case SSL_ERROR_WANT_READ:
|
||||||
|
// the operation must be repeated when readable
|
||||||
|
std::cout << "want read" << std::endl;
|
||||||
|
|
||||||
|
_handler->monitor(_connection, _socket, readable);
|
||||||
|
return this;
|
||||||
|
|
||||||
|
case SSL_ERROR_WANT_WRITE:
|
||||||
|
// wait until socket becomes writable again
|
||||||
|
std::cout << "want write" << std::endl;
|
||||||
|
|
||||||
|
_handler->monitor(_connection, _socket, writable);
|
||||||
|
return this;
|
||||||
|
|
||||||
|
default:
|
||||||
|
std::cout << "something else" << std::endl;
|
||||||
|
|
||||||
|
// @todo check how to handle this
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param connection Parent TCP connection object
|
||||||
|
* @param socket The socket filedescriptor
|
||||||
|
* @param ssl The SSL structure
|
||||||
|
* @param buffer The buffer that was already built
|
||||||
|
* @param handler User-supplied handler object
|
||||||
|
*/
|
||||||
|
TcpSslConnected(TcpConnection *connection, int socket, SSL *ssl, TcpOutBuffer &&buffer, TcpHandler *handler) :
|
||||||
|
TcpState(connection, handler),
|
||||||
|
_ssl(ssl),
|
||||||
|
_socket(socket),
|
||||||
|
_out(std::move(buffer)),
|
||||||
|
_in(4096),
|
||||||
|
_state(_out ? state_sending : state_idle)
|
||||||
|
{
|
||||||
|
std::cout << "ssl-connected" << std::endl;
|
||||||
|
|
||||||
|
// tell the handler to monitor the socket if there is an out
|
||||||
|
_handler->monitor(_connection, _socket, _state == state_sending ? writable : readable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~TcpSslConnected() noexcept
|
||||||
|
{
|
||||||
|
// skip if handler is already forgotten
|
||||||
|
if (_handler == nullptr) return;
|
||||||
|
|
||||||
|
// we no longer have to monitor the socket
|
||||||
|
_handler->monitor(_connection, _socket, 0);
|
||||||
|
|
||||||
|
// close the socket
|
||||||
|
close(_socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The filedescriptor of this connection
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
virtual int fileno() const override { return _socket; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the filedescriptor in the object
|
||||||
|
* @param fd The filedescriptor that is active
|
||||||
|
* @param flags AMQP::readable and/or AMQP::writable
|
||||||
|
* @return New implementation object
|
||||||
|
*/
|
||||||
|
virtual TcpState *process(int fd, int flags)
|
||||||
|
{
|
||||||
|
std::cout << "process call in ssl-connected" << std::endl;
|
||||||
|
|
||||||
|
std::cout << fd << " - " << _socket << std::endl;
|
||||||
|
|
||||||
|
|
||||||
|
// the socket must be the one this connection writes to
|
||||||
|
if (fd != _socket) return this;
|
||||||
|
|
||||||
|
// because the object might soon be destructed, we create a monitor to check this
|
||||||
|
Monitor monitor(this);
|
||||||
|
|
||||||
|
// are we busy with sending or receiving data?
|
||||||
|
if (_state == state_sending)
|
||||||
|
{
|
||||||
|
std::cout << "busy sending" << std::endl;
|
||||||
|
|
||||||
|
// try to send more data from the outgoing buffer
|
||||||
|
auto result = _out.sendto(_ssl);
|
||||||
|
|
||||||
|
std::cout << "result = " << result << std::endl;
|
||||||
|
|
||||||
|
// if this is a success, we may have to update the monitor
|
||||||
|
if (result > 0) return proceed();
|
||||||
|
|
||||||
|
// the operation failed, we may have to repeat our call
|
||||||
|
else return repeat(result);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// read data from ssl into the buffer
|
||||||
|
auto result = _in.receivefrom(_ssl, _connection->expected());
|
||||||
|
|
||||||
|
// if this is a success, we may have to update the monitor
|
||||||
|
// @todo also parse the buffer
|
||||||
|
if (result > 0) return proceed();
|
||||||
|
|
||||||
|
// the operation failed, we may have to repeat our call
|
||||||
|
else return repeat(result);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// we're busy with receiving data
|
||||||
|
// @todo check this
|
||||||
|
|
||||||
|
std::cout << "receive data" << std::endl;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep same object
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send data over the connection
|
||||||
|
* @param buffer buffer to send
|
||||||
|
* @param size size of the buffer
|
||||||
|
*/
|
||||||
|
virtual void send(const char *buffer, size_t size)
|
||||||
|
{
|
||||||
|
// put the data in the outgoing buffer
|
||||||
|
_out.add(buffer, size);
|
||||||
|
|
||||||
|
// if we're already busy with sending or receiving, we first have to wait
|
||||||
|
// for that operation to complete before we can move on
|
||||||
|
if (_state != state_idle) return;
|
||||||
|
|
||||||
|
// let's wait until the socket becomes writable
|
||||||
|
_handler->monitor(_connection, _socket, writable);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -16,10 +16,12 @@
|
||||||
* Dependencies
|
* Dependencies
|
||||||
*/
|
*/
|
||||||
#include "tcpoutbuffer.h"
|
#include "tcpoutbuffer.h"
|
||||||
|
#include "tcpsslconnected.h"
|
||||||
|
#include "wait.h"
|
||||||
|
|
||||||
|
#include <openssl/err.h>
|
||||||
#include <openssl/ssl.h>
|
#include <openssl/ssl.h>
|
||||||
#include <copernica/dynamic.h>
|
#include <copernica/dynamic.h>
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -33,7 +35,12 @@ namespace AMQP {
|
||||||
class TcpSslHandshake : public TcpState, private Watchable
|
class TcpSslHandshake : public TcpState, private Watchable
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
|
/**
|
||||||
|
* SSL context
|
||||||
|
* @var SSL_CTX
|
||||||
|
*/
|
||||||
|
SSL_CTX *ctx;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SSL structure
|
* SSL structure
|
||||||
* @var SSL
|
* @var SSL
|
||||||
|
|
@ -53,7 +60,6 @@ private:
|
||||||
TcpOutBuffer _out;
|
TcpOutBuffer _out;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to report an error
|
* Helper method to report an error
|
||||||
* @return TcpState*
|
* @return TcpState*
|
||||||
|
|
@ -67,7 +73,7 @@ private:
|
||||||
_handler->onError(_connection, "failed to setup ssl connection");
|
_handler->onError(_connection, "failed to setup ssl connection");
|
||||||
|
|
||||||
// close the socket
|
// close the socket
|
||||||
close(_socket);
|
close(_socket);
|
||||||
|
|
||||||
// done, go to the closed state
|
// done, go to the closed state
|
||||||
return new TcpClosed(_connection, _handler);
|
return new TcpClosed(_connection, _handler);
|
||||||
|
|
@ -85,30 +91,6 @@ private:
|
||||||
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
return monitor.valid() ? new TcpClosed(this) : nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until the socket is writable
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool wait4writable()
|
|
||||||
{
|
|
||||||
// we need the fd-sets
|
|
||||||
fd_set readables, writables, exceptions;
|
|
||||||
|
|
||||||
// initialize all the sets
|
|
||||||
FD_ZERO(&readables);
|
|
||||||
FD_ZERO(&writables);
|
|
||||||
FD_ZERO(&exceptions);
|
|
||||||
|
|
||||||
// add the one socket
|
|
||||||
FD_SET(_socket, &writables);
|
|
||||||
|
|
||||||
// wait for the socket
|
|
||||||
auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr);
|
|
||||||
|
|
||||||
// check for success
|
|
||||||
return result == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
|
|
@ -126,22 +108,29 @@ public:
|
||||||
TcpState(connection, handler),
|
TcpState(connection, handler),
|
||||||
_socket(socket),
|
_socket(socket),
|
||||||
_out(std::move(buffer))
|
_out(std::move(buffer))
|
||||||
{
|
{
|
||||||
|
// init the SSL library
|
||||||
SSL_library_init();
|
SSL_library_init();
|
||||||
|
|
||||||
|
// create ssl context
|
||||||
|
ctx = SSL_CTX_new(TLS_client_method());
|
||||||
|
|
||||||
// create ssl object
|
// create ssl object
|
||||||
_ssl = SSL_new(SSL_CTX_new(TLS_client_method()));
|
_ssl = SSL_new(ctx);
|
||||||
|
|
||||||
// leap out on error
|
// leap out on error
|
||||||
if (_ssl == nullptr) throw std::runtime_error("ERROR: SSL structure is null");
|
if (_ssl == nullptr) throw std::runtime_error("ERROR: SSL structure is null");
|
||||||
|
|
||||||
// we will be using the ssl context as a client
|
// we will be using the ssl context as a client
|
||||||
// @todo check return value
|
|
||||||
SSL_set_connect_state(_ssl);
|
SSL_set_connect_state(_ssl);
|
||||||
|
|
||||||
|
|
||||||
// associate the ssl context with the socket filedescriptor
|
// associate the ssl context with the socket filedescriptor
|
||||||
// @todo check return value
|
int set_fd_ret = SSL_set_fd(_ssl, socket);
|
||||||
SSL_set_fd(_ssl, socket);
|
if (set_fd_ret == 0) {
|
||||||
|
reportError();
|
||||||
|
std::cout << "error while setting file descriptor" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
// we are going to wait until the socket becomes writable before we start the handshake
|
// we are going to wait until the socket becomes writable before we start the handshake
|
||||||
_handler->monitor(_connection, _socket, writable);
|
_handler->monitor(_connection, _socket, writable);
|
||||||
|
|
@ -153,7 +142,8 @@ public:
|
||||||
virtual ~TcpSslHandshake() noexcept
|
virtual ~TcpSslHandshake() noexcept
|
||||||
{
|
{
|
||||||
// close the socket
|
// close the socket
|
||||||
close(_socket);
|
// @todo only if really closed
|
||||||
|
//close(_socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -175,38 +165,57 @@ public:
|
||||||
|
|
||||||
// start the ssl handshake
|
// start the ssl handshake
|
||||||
int result = SSL_do_handshake(_ssl);
|
int result = SSL_do_handshake(_ssl);
|
||||||
|
|
||||||
// if the connection succeeds, we can move to the ssl-connected state
|
// if the connection succeeds, we can move to the ssl-connected state
|
||||||
// @todo we need the sslconnected state object
|
if (result == 1) return new TcpSslConnected(_connection, _socket, _ssl, std::move(_out), _handler);
|
||||||
if (result == 1) return this; // new TcpSslConnected(connection, socket, _ssl, std::move(_out), _handler);
|
|
||||||
|
|
||||||
// if there is a failure, we must close down the connection
|
// if there is a failure, we must close down the connection
|
||||||
if (result == 0) return reportError();
|
if (result <= 0)
|
||||||
|
{
|
||||||
// -1 was returned, so we must investigate what is going on
|
// error was returned, so we must investigate what is going on
|
||||||
auto error = SSL_get_error(_ssl, result);
|
auto error = SSL_get_error(_ssl, result);
|
||||||
|
|
||||||
|
// check the error
|
||||||
|
switch (error) {
|
||||||
|
case SSL_ERROR_WANT_READ:
|
||||||
|
// the handshake must be repeated when socket is readable, wait for that
|
||||||
|
std::cout << "wait for readability" << std::endl;
|
||||||
|
_handler->monitor(_connection, _socket, readable);
|
||||||
|
break;
|
||||||
|
|
||||||
// check the error
|
case SSL_ERROR_WANT_WRITE:
|
||||||
switch (error) {
|
// the handshake must be repeated when socket is readable, wait for that
|
||||||
case SSL_ERROR_WANT_READ:
|
std::cout << "wait for writability" << std::endl;
|
||||||
// the handshake must be repeated when socket is readable, wait for that
|
_handler->monitor(_connection, _socket, writable);
|
||||||
std::cout << "wait for readability" << std::endl;
|
break;
|
||||||
_handler->monitor(_connection, _socket, readable);
|
|
||||||
break;
|
case SSL_ERROR_WANT_ACCEPT:
|
||||||
|
// the BIO was not connected yet, the SSL function should be called again
|
||||||
case SSL_ERROR_WANT_WRITE:
|
std::cout << "wait for acceptability" << ERR_error_string(ERR_get_error(), nullptr) << std::endl;
|
||||||
// the handshake must be repeated when socket is readable, wait for that
|
_handler->monitor(_connection, _socket, writable);
|
||||||
std::cout << "wait for writability" << std::endl;
|
|
||||||
_handler->monitor(_connection, _socket, writable);
|
break;
|
||||||
break;
|
case SSL_ERROR_WANT_X509_LOOKUP:
|
||||||
|
std::cout << "SSL_ERROR_WANT_X509_LOOKUP" << ERR_error_string(ERR_get_error(), nullptr) << std::endl;
|
||||||
default:
|
_handler->monitor(_connection, _socket, writable);
|
||||||
// @todo implement handling other error states
|
|
||||||
std::cout << "unknown error state " << error << std::endl;
|
break;
|
||||||
// @todo we have to close the connection
|
case SSL_ERROR_SYSCALL:
|
||||||
return reportError();
|
std::cout << "SSL_ERROR_SYSCALL: " << ERR_error_string(ERR_get_error(), nullptr) << std::endl;
|
||||||
|
_handler->monitor(_connection, _socket, writable);
|
||||||
|
|
||||||
|
break;
|
||||||
|
case SSL_ERROR_SSL:
|
||||||
|
std::cout << "SSL_ERROR_SSL" << ERR_error_string(ERR_get_error(), nullptr) << std::endl;
|
||||||
|
_handler->monitor(_connection, _socket, writable);
|
||||||
|
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
std::cout << "unknown error state " << error << std::endl;
|
||||||
|
return reportError();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// keep same object
|
// keep same object
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
@ -218,9 +227,8 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual void send(const char *buffer, size_t size) override
|
virtual void send(const char *buffer, size_t size) override
|
||||||
{
|
{
|
||||||
|
// the handshake is still busy, outgoing data must be cached
|
||||||
// @todo because the handshake is still busy, outgoing data must be cached
|
_out.add(buffer, size);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -229,37 +237,48 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual TcpState *flush() override
|
virtual TcpState *flush() override
|
||||||
{
|
{
|
||||||
// @todo implementation?
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
return nullptr;
|
Wait wait(_socket);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Report that heartbeat negotiation is going on
|
|
||||||
* @param heartbeat suggested heartbeat
|
|
||||||
* @return uint16_t accepted heartbeat
|
|
||||||
*/
|
|
||||||
virtual uint16_t reportNegotiate(uint16_t heartbeat) override
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* @todo what should we do here?
|
|
||||||
|
|
||||||
// remember that we have to reallocated (_in member can not be accessed because it is moved away)
|
// keep looping
|
||||||
_reallocate = _connection->maxFrame();
|
while (true)
|
||||||
|
{
|
||||||
// pass to base
|
// start the ssl handshake
|
||||||
return TcpState::reportNegotiate(heartbeat);
|
int result = SSL_do_handshake(_ssl);
|
||||||
*/
|
|
||||||
|
// if the connection succeeds, we can move to the ssl-connected state
|
||||||
return 0;
|
if (result == 1) return new TcpSslConnected(_connection, _socket, _ssl, std::move(_out), _handler);
|
||||||
}
|
|
||||||
|
// error was returned, so we must investigate what is going on
|
||||||
|
auto error = SSL_get_error(_ssl, result);
|
||||||
|
|
||||||
|
// check the error
|
||||||
|
switch (error) {
|
||||||
|
case SSL_ERROR_WANT_READ:
|
||||||
|
// wait for the socket to become readable
|
||||||
|
if (!wait.readable()) return reportError();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SSL_ERROR_WANT_WRITE:
|
||||||
|
// wait for the socket to become writable
|
||||||
|
if (!wait.writable()) return reportError();
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// report an error
|
||||||
|
return reportError();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep same object (we never reach this code)
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report to the handler that the connection was nicely closed
|
* Report to the handler that the connection was nicely closed
|
||||||
*/
|
*/
|
||||||
virtual void reportClosed() override
|
virtual void reportClosed() override
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
|
|
||||||
// we no longer have to monitor the socket
|
// we no longer have to monitor the socket
|
||||||
_handler->monitor(_connection, _socket, 0);
|
_handler->monitor(_connection, _socket, 0);
|
||||||
|
|
||||||
|
|
@ -277,7 +296,6 @@ public:
|
||||||
|
|
||||||
// notify to handler
|
// notify to handler
|
||||||
handler->onClosed(_connection);
|
handler->onClosed(_connection);
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,93 @@
|
||||||
|
/**
|
||||||
|
* Wait.h
|
||||||
|
*
|
||||||
|
* Class to wait for a socket to become readable and/or writable
|
||||||
|
*
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class Wait
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* Set with just one filedescriptor
|
||||||
|
* @var fd_set
|
||||||
|
*/
|
||||||
|
fd_set _set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current socket // @todo what is it exactly?
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
int _socket;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param fd the filedescriptor that we're waiting on
|
||||||
|
*/
|
||||||
|
Wait(int fd)
|
||||||
|
{
|
||||||
|
_socket = fd;
|
||||||
|
|
||||||
|
// initialize the set
|
||||||
|
FD_ZERO(&_set);
|
||||||
|
|
||||||
|
// add the one socket
|
||||||
|
FD_SET(_socket, &_set);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~Wait() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the filedescriptor becomes readable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool readable()
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
return select(_socket + 1, &_set, nullptr, nullptr, nullptr) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the filedescriptor becomes writable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool writable()
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
return select(_socket + 1, nullptr, &_set, nullptr, nullptr) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until a filedescriptor becomes active (readable or writable)
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool active()
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
return select(_socket + 1, &_set, &_set, nullptr, nullptr) > 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
Loading…
Reference in New Issue