fixed ssl handling: the sslconnected class incorrectly cached the readability/writability state of a socket, even after it already did a read- or write-operation, which could lead to the system getting to read mode when no data was expected
This commit is contained in:
parent
9aafe15620
commit
80ce6327bb
|
|
@ -0,0 +1,127 @@
|
||||||
|
/**
|
||||||
|
* Poll.h
|
||||||
|
*
|
||||||
|
* Class to check or wait for a socket to become readable and/or writable
|
||||||
|
*
|
||||||
|
* @copyright 2018 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include guard
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin of namespace
|
||||||
|
*/
|
||||||
|
namespace AMQP {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class Poll
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* Set with just one filedescriptor
|
||||||
|
* @var fd_set
|
||||||
|
*/
|
||||||
|
fd_set _set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current socket // @todo what is it exactly?
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
int _socket;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param fd the filedescriptor that we're waiting on
|
||||||
|
*/
|
||||||
|
Poll(int fd) : _socket(fd)
|
||||||
|
{
|
||||||
|
// initialize the set
|
||||||
|
FD_ZERO(&_set);
|
||||||
|
|
||||||
|
// add the one socket
|
||||||
|
FD_SET(_socket, &_set);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* No copying
|
||||||
|
* @param that
|
||||||
|
*/
|
||||||
|
Poll(const Poll &that) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~Poll() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the filedescriptor becomes readable
|
||||||
|
* @param block block until readable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool readable(bool block)
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
if (block) return select(_socket + 1, &_set, nullptr, nullptr, nullptr) > 0;
|
||||||
|
|
||||||
|
// we do not want to block, so we use a small timeout
|
||||||
|
struct timeval timeout;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
timeout.tv_sec = timeout.tv_usec = 0;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
return select(_socket + 1, &_set, nullptr, nullptr, &timeout) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the filedescriptor becomes writable
|
||||||
|
* @param block block until readable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool writable(bool block)
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
if (block) return select(_socket + 1, nullptr, &_set, nullptr, nullptr) > 0;
|
||||||
|
|
||||||
|
// we do not want to block, so we use a small timeout
|
||||||
|
struct timeval timeout;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
timeout.tv_sec = timeout.tv_usec = 0;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
return select(_socket + 1, nullptr, &_set, nullptr, &timeout) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until a filedescriptor becomes active (readable or writable)
|
||||||
|
* @param block block until readable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool active(bool block)
|
||||||
|
{
|
||||||
|
// wait for the socket
|
||||||
|
if (block) return select(_socket + 1, &_set, &_set, nullptr, nullptr) > 0;
|
||||||
|
|
||||||
|
// we do not want to block, so we use a small timeout
|
||||||
|
struct timeval timeout;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
timeout.tv_sec = timeout.tv_usec = 0;
|
||||||
|
|
||||||
|
// no timeout at all
|
||||||
|
return select(_socket + 1, &_set, &_set, nullptr, &timeout) > 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* End of namespace
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tcpoutbuffer.h"
|
#include "tcpoutbuffer.h"
|
||||||
#include "tcpinbuffer.h"
|
#include "tcpinbuffer.h"
|
||||||
#include "wait.h"
|
#include "poll.h"
|
||||||
#include "sslwrapper.h"
|
#include "sslwrapper.h"
|
||||||
#include "sslshutdown.h"
|
#include "sslshutdown.h"
|
||||||
|
|
||||||
|
|
@ -248,13 +248,38 @@ private:
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the socket is readable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool isReadable() const
|
||||||
|
{
|
||||||
|
// object to poll a socket
|
||||||
|
Poll poll(_socket);
|
||||||
|
|
||||||
|
// wait until socket is readable, but do not block
|
||||||
|
return poll.readable(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the socket is writable
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool isWritable() const
|
||||||
|
{
|
||||||
|
// object to poll a socket
|
||||||
|
Poll poll(_socket);
|
||||||
|
|
||||||
|
// wait until socket is writable, but do not block
|
||||||
|
return poll.writable(false);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform a write operation
|
* Perform a write operation
|
||||||
* @param monitor object to check the existance of the connection object
|
* @param monitor object to check the existance of the connection object
|
||||||
* @param readable is the connection also readable and should we call a read operation afterwards?
|
|
||||||
* @return TcpState*
|
* @return TcpState*
|
||||||
*/
|
*/
|
||||||
TcpState *write(const Monitor &monitor, bool readable)
|
TcpState *write(const Monitor &monitor)
|
||||||
{
|
{
|
||||||
// assume default state
|
// assume default state
|
||||||
_state = state_idle;
|
_state = state_idle;
|
||||||
|
|
@ -280,19 +305,18 @@ private:
|
||||||
// the operation failed, we may have to repeat our call
|
// the operation failed, we may have to repeat our call
|
||||||
return repeat(monitor, state_sending, error);
|
return repeat(monitor, state_sending, error);
|
||||||
}
|
}
|
||||||
while (_out && !readable);
|
while (_out && !isReadable());
|
||||||
|
|
||||||
// proceed with the read operation or the event loop
|
// proceed with the read operation or the event loop
|
||||||
return readable ? receive(monitor, false) : proceed();
|
return isReadable() ? receive(monitor) : proceed();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform a receive operation
|
* Perform a receive operation
|
||||||
* @param monitor object to check the existance of the connection object
|
* @param monitor object to check the existance of the connection object
|
||||||
* @param writable is the socket writable, and should we start a write operation after this operation?
|
|
||||||
* @return TcpState
|
* @return TcpState
|
||||||
*/
|
*/
|
||||||
TcpState *receive(const Monitor &monitor, bool writable)
|
TcpState *receive(const Monitor &monitor)
|
||||||
{
|
{
|
||||||
// we are going to check for errors after the openssl operations, so we make
|
// we are going to check for errors after the openssl operations, so we make
|
||||||
// sure that the error queue is currently completely empty
|
// sure that the error queue is currently completely empty
|
||||||
|
|
@ -319,7 +343,7 @@ private:
|
||||||
while (OpenSSL::SSL_pending(_ssl) > 0);
|
while (OpenSSL::SSL_pending(_ssl) > 0);
|
||||||
|
|
||||||
// proceed with the write operation or the event loop
|
// proceed with the write operation or the event loop
|
||||||
return writable && _out ? write(monitor, false) : proceed();
|
return _out && isWritable() ? write(monitor) : proceed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -378,16 +402,16 @@ public:
|
||||||
if (fd != _socket) return this;
|
if (fd != _socket) return this;
|
||||||
|
|
||||||
// if we were busy with a write operation, we have to repeat that
|
// if we were busy with a write operation, we have to repeat that
|
||||||
if (_state == state_sending) return write(monitor, flags & readable);
|
if (_state == state_sending) return write(monitor);
|
||||||
|
|
||||||
// same is true for read operations, they should also be repeated
|
// same is true for read operations, they should also be repeated
|
||||||
if (_state == state_receiving) return receive(monitor, flags & writable);
|
if (_state == state_receiving) return receive(monitor);
|
||||||
|
|
||||||
// if the socket is readable, we are going to receive data
|
// if the socket is readable, we are going to receive data
|
||||||
if (flags & readable) return receive(monitor, flags & writable);
|
if (flags & readable) return receive(monitor);
|
||||||
|
|
||||||
// socket is not readable (so it must be writable), do we have data to write?
|
// socket is not readable (so it must be writable), do we have data to write?
|
||||||
if (_out) return write(monitor, false);
|
if (_out) return write(monitor);
|
||||||
|
|
||||||
// the only scenario in which we can end up here is the socket should be
|
// the only scenario in which we can end up here is the socket should be
|
||||||
// closed, but instead of moving to the shutdown-state right, we call proceed()
|
// closed, but instead of moving to the shutdown-state right, we call proceed()
|
||||||
|
|
@ -406,7 +430,7 @@ public:
|
||||||
if (_state == state_receiving) return this;
|
if (_state == state_receiving) return this;
|
||||||
|
|
||||||
// create an object to wait for the filedescriptor to becomes active
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
Wait wait(_socket);
|
Poll poll(_socket);
|
||||||
|
|
||||||
// we are going to check for errors after the openssl operations, so we make
|
// we are going to check for errors after the openssl operations, so we make
|
||||||
// sure that the error queue is currently completely empty
|
// sure that the error queue is currently completely empty
|
||||||
|
|
@ -443,8 +467,8 @@ public:
|
||||||
|
|
||||||
// check the type of error, and wait now
|
// check the type of error, and wait now
|
||||||
switch (error) {
|
switch (error) {
|
||||||
case SSL_ERROR_WANT_READ: wait.readable(); break;
|
case SSL_ERROR_WANT_READ: poll.readable(true); break;
|
||||||
case SSL_ERROR_WANT_WRITE: wait.active(); break;
|
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tcpoutbuffer.h"
|
#include "tcpoutbuffer.h"
|
||||||
#include "sslconnected.h"
|
#include "sslconnected.h"
|
||||||
#include "wait.h"
|
#include "poll.h"
|
||||||
#include "sslwrapper.h"
|
#include "sslwrapper.h"
|
||||||
#include "sslcontext.h"
|
#include "sslcontext.h"
|
||||||
|
|
||||||
|
|
@ -230,7 +230,7 @@ public:
|
||||||
virtual TcpState *flush(const Monitor &monitor) override
|
virtual TcpState *flush(const Monitor &monitor) override
|
||||||
{
|
{
|
||||||
// create an object to wait for the filedescriptor to becomes active
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
Wait wait(_socket);
|
Poll poll(_socket);
|
||||||
|
|
||||||
// keep looping
|
// keep looping
|
||||||
while (true)
|
while (true)
|
||||||
|
|
@ -249,8 +249,8 @@ public:
|
||||||
|
|
||||||
// if openssl reports that socket readability or writability is needed,
|
// if openssl reports that socket readability or writability is needed,
|
||||||
// we wait for that until this situation is reached
|
// we wait for that until this situation is reached
|
||||||
case SSL_ERROR_WANT_READ: wait.readable(); break;
|
case SSL_ERROR_WANT_READ: poll.readable(true); break;
|
||||||
case SSL_ERROR_WANT_WRITE: wait.active(); break;
|
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
|
||||||
|
|
||||||
// something is wrong, we proceed to the next state
|
// something is wrong, we proceed to the next state
|
||||||
default: return reportError(monitor);
|
default: return reportError(monitor);
|
||||||
|
|
|
||||||
|
|
@ -212,7 +212,7 @@ public:
|
||||||
virtual TcpState *flush(const Monitor &monitor) override
|
virtual TcpState *flush(const Monitor &monitor) override
|
||||||
{
|
{
|
||||||
// create an object to wait for the filedescriptor to becomes active
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
Wait wait(_socket);
|
Poll poll(_socket);
|
||||||
|
|
||||||
// keep looping
|
// keep looping
|
||||||
while (true)
|
while (true)
|
||||||
|
|
@ -234,8 +234,8 @@ public:
|
||||||
|
|
||||||
// if openssl reports that socket readability or writability is needed,
|
// if openssl reports that socket readability or writability is needed,
|
||||||
// we wait for that until this situation is reached
|
// we wait for that until this situation is reached
|
||||||
case SSL_ERROR_WANT_READ: wait.readable(); break;
|
case SSL_ERROR_WANT_READ: poll.readable(true); break;
|
||||||
case SSL_ERROR_WANT_WRITE: wait.active(); break;
|
case SSL_ERROR_WANT_WRITE: poll.active(true); break;
|
||||||
|
|
||||||
// something is wrong, we proceed to the next state
|
// something is wrong, we proceed to the next state
|
||||||
default: return reporterror(monitor);
|
default: return reporterror(monitor);
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tcpoutbuffer.h"
|
#include "tcpoutbuffer.h"
|
||||||
#include "tcpinbuffer.h"
|
#include "tcpinbuffer.h"
|
||||||
#include "wait.h"
|
#include "poll.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up namespace
|
* Set up namespace
|
||||||
|
|
@ -260,13 +260,13 @@ public:
|
||||||
virtual TcpState *flush(const Monitor &monitor) override
|
virtual TcpState *flush(const Monitor &monitor) override
|
||||||
{
|
{
|
||||||
// create an object to wait for the filedescriptor to becomes active
|
// create an object to wait for the filedescriptor to becomes active
|
||||||
Wait wait(_socket);
|
Poll poll(_socket);
|
||||||
|
|
||||||
// keep running until the out buffer is not empty
|
// keep running until the out buffer is not empty
|
||||||
while (_out)
|
while (_out)
|
||||||
{
|
{
|
||||||
// poll the socket, is it already writable?
|
// poll the socket, is it already writable?
|
||||||
if (!wait.writable()) return this;
|
if (!poll.writable(true)) return this;
|
||||||
|
|
||||||
// socket is writable, send as much data as possible
|
// socket is writable, send as much data as possible
|
||||||
auto *newstate = process(monitor, _socket, writable);
|
auto *newstate = process(monitor, _socket, writable);
|
||||||
|
|
|
||||||
|
|
@ -1,97 +0,0 @@
|
||||||
/**
|
|
||||||
* Wait.h
|
|
||||||
*
|
|
||||||
* Class to wait for a socket to become readable and/or writable
|
|
||||||
*
|
|
||||||
* @copyright 2018 Copernica BV
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Include guard
|
|
||||||
*/
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Begin of namespace
|
|
||||||
*/
|
|
||||||
namespace AMQP {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Class definition
|
|
||||||
*/
|
|
||||||
class Wait
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
/**
|
|
||||||
* Set with just one filedescriptor
|
|
||||||
* @var fd_set
|
|
||||||
*/
|
|
||||||
fd_set _set;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The current socket // @todo what is it exactly?
|
|
||||||
* @var int
|
|
||||||
*/
|
|
||||||
int _socket;
|
|
||||||
|
|
||||||
public:
|
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
* @param fd the filedescriptor that we're waiting on
|
|
||||||
*/
|
|
||||||
Wait(int fd) : _socket(fd)
|
|
||||||
{
|
|
||||||
// initialize the set
|
|
||||||
FD_ZERO(&_set);
|
|
||||||
|
|
||||||
// add the one socket
|
|
||||||
FD_SET(_socket, &_set);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* No copying
|
|
||||||
* @param that
|
|
||||||
*/
|
|
||||||
Wait(const Wait &that) = delete;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Destructor
|
|
||||||
*/
|
|
||||||
virtual ~Wait() = default;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until the filedescriptor becomes readable
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool readable()
|
|
||||||
{
|
|
||||||
// wait for the socket
|
|
||||||
return select(_socket + 1, &_set, nullptr, nullptr, nullptr) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until the filedescriptor becomes writable
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool writable()
|
|
||||||
{
|
|
||||||
// wait for the socket
|
|
||||||
return select(_socket + 1, nullptr, &_set, nullptr, nullptr) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until a filedescriptor becomes active (readable or writable)
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool active()
|
|
||||||
{
|
|
||||||
// wait for the socket
|
|
||||||
return select(_socket + 1, &_set, &_set, nullptr, nullptr) > 0;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End of namespace
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
Loading…
Reference in New Issue