simplified and improved the close procedure
This commit is contained in:
parent
64c876e65a
commit
428219ad83
|
|
@ -52,14 +52,6 @@ private:
|
||||||
Connection _connection;
|
Connection _connection;
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assign a new state
|
|
||||||
* @param monitor
|
|
||||||
* @param state
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool assign(const Monitor &monitor, TcpState *state);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method that is called when the heartbeat frequency is negotiated.
|
* Method that is called when the heartbeat frequency is negotiated.
|
||||||
* @param connection The connection that suggested a heartbeat interval
|
* @param connection The connection that suggested a heartbeat interval
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ private:
|
||||||
_parent->onIdle(this, _socket, readable);
|
_parent->onIdle(this, _socket, readable);
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return monitor.valid() ? this : nullptr;
|
return this;
|
||||||
|
|
||||||
case SSL_ERROR_WANT_WRITE:
|
case SSL_ERROR_WANT_WRITE:
|
||||||
// remember state
|
// remember state
|
||||||
|
|
@ -129,17 +129,14 @@ private:
|
||||||
_parent->onIdle(this, _socket, readable | writable);
|
_parent->onIdle(this, _socket, readable | writable);
|
||||||
|
|
||||||
// allow chaining
|
// allow chaining
|
||||||
return monitor.valid() ? this : nullptr;
|
return this;
|
||||||
|
|
||||||
case SSL_ERROR_NONE:
|
case SSL_ERROR_NONE:
|
||||||
// we're ready for the next instruction from userspace
|
// we're ready for the next instruction from userspace
|
||||||
_state = state_idle;
|
_state = state_idle;
|
||||||
|
|
||||||
// turns out no error occured, an no action has to be rescheduled
|
// if already closed, proceed to next state
|
||||||
_parent->onIdle(this, _socket, _out || _closed ? readable | writable : readable);
|
return proceed();
|
||||||
|
|
||||||
// allow chaining
|
|
||||||
return monitor.valid() ? this : nullptr;
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// report an error to user-space
|
// report an error to user-space
|
||||||
|
|
@ -352,6 +349,9 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual void send(const char *buffer, size_t size) override
|
virtual void send(const char *buffer, size_t size) override
|
||||||
{
|
{
|
||||||
|
// do nothing if already busy closing
|
||||||
|
if (_closed) return;
|
||||||
|
|
||||||
// put the data in the outgoing buffer
|
// put the data in the outgoing buffer
|
||||||
_out.add(buffer, size);
|
_out.add(buffer, size);
|
||||||
|
|
||||||
|
|
@ -365,18 +365,17 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gracefully close the connection
|
* Gracefully close the connection
|
||||||
* @return TcpState The next state
|
|
||||||
*/
|
*/
|
||||||
virtual TcpState *close() override
|
virtual void close() override
|
||||||
{
|
{
|
||||||
// remember that the object is going to be closed
|
// remember that the object is going to be closed
|
||||||
_closed = true;
|
_closed = true;
|
||||||
|
|
||||||
// if the previous operation is still in progress we can wait for that
|
// if the previous operation is still in progress we can wait for that
|
||||||
if (_state != state_idle) return this;
|
if (_state != state_idle) return;
|
||||||
|
|
||||||
// the connection can be closed right now, move to the next state
|
// let's wait until the socket becomes writable (because then we can start the shutdown)
|
||||||
return new SslShutdown(this, std::move(_ssl));
|
_parent->onIdle(this, _socket, readable | writable);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -135,9 +135,14 @@ public:
|
||||||
// are we in an error state?
|
// are we in an error state?
|
||||||
if (result < 0 && reportError()) return finalState(monitor);
|
if (result < 0 && reportError()) return finalState(monitor);
|
||||||
|
|
||||||
// if buffer is empty by now, we no longer have to check for
|
// if we still have a buffer, we keep on monitoring
|
||||||
// writability, but only for readability
|
if (_out) return this;
|
||||||
if (!_out) _parent->onIdle(this, _socket, readable);
|
|
||||||
|
// if we do not expect to send more data, we can close the connection for writing
|
||||||
|
if (_closed) shutdown(_socket, SHUT_WR);
|
||||||
|
|
||||||
|
// check for readability (to find more data, or to be notified that connection is gone)
|
||||||
|
_parent->onIdle(this, _socket, readable);
|
||||||
}
|
}
|
||||||
|
|
||||||
// should we check for readability too?
|
// should we check for readability too?
|
||||||
|
|
@ -185,6 +190,9 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual void send(const char *buffer, size_t size) override
|
virtual void send(const char *buffer, size_t size) override
|
||||||
{
|
{
|
||||||
|
// we stop sending when connection is closed
|
||||||
|
if (_closed) return;
|
||||||
|
|
||||||
// is there already a buffer of data that can not be sent?
|
// is there already a buffer of data that can not be sent?
|
||||||
if (_out) return _out.add(buffer, size);
|
if (_out) return _out.add(buffer, size);
|
||||||
|
|
||||||
|
|
@ -206,16 +214,18 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gracefully close the connection
|
* Gracefully close the connection
|
||||||
* @return TcpState The next state
|
|
||||||
*/
|
*/
|
||||||
virtual TcpState *close() override
|
virtual void close() override
|
||||||
{
|
{
|
||||||
// do nothing if already closed
|
// do nothing if already closed
|
||||||
if (_closed) return this;
|
if (_closed) return;
|
||||||
|
|
||||||
// remember that the connection is closed
|
// remember that the connection is closed
|
||||||
_closed = true;
|
_closed = true;
|
||||||
|
|
||||||
|
// wait until the outgoing buffer is all gone
|
||||||
|
if (_out) return;
|
||||||
|
|
||||||
// we will shutdown the socket in a very elegant way, we notify the peer
|
// we will shutdown the socket in a very elegant way, we notify the peer
|
||||||
// that we will not be sending out more write operations
|
// that we will not be sending out more write operations
|
||||||
shutdown(_socket, SHUT_WR);
|
shutdown(_socket, SHUT_WR);
|
||||||
|
|
@ -223,9 +233,6 @@ public:
|
||||||
// we still monitor the socket for readability to see if our close call was
|
// we still monitor the socket for readability to see if our close call was
|
||||||
// confirmed by the peer
|
// confirmed by the peer
|
||||||
_parent->onIdle(this, _socket, readable);
|
_parent->onIdle(this, _socket, readable);
|
||||||
|
|
||||||
// start the tcp shutdown
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -57,30 +57,6 @@ std::size_t TcpConnection::queued() const
|
||||||
return _state->queued();
|
return _state->queued();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Assign a new state
|
|
||||||
* @param monitor
|
|
||||||
* @param state
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
bool TcpConnection::assign(const Monitor &monitor, TcpState *state)
|
|
||||||
{
|
|
||||||
// not possible if object is already destructed
|
|
||||||
if (!monitor.valid()) return false;
|
|
||||||
|
|
||||||
// destruct the old state first (this could destruct "this")
|
|
||||||
_state.reset(nullptr);
|
|
||||||
|
|
||||||
// leap out if object was destructed
|
|
||||||
if (!monitor.valid()) return false;
|
|
||||||
|
|
||||||
// assign the new state
|
|
||||||
_state.reset(state);
|
|
||||||
|
|
||||||
// done
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the TCP connection
|
* Process the TCP connection
|
||||||
* This method should be called when the filedescriptor that is registered
|
* This method should be called when the filedescriptor that is registered
|
||||||
|
|
@ -95,20 +71,23 @@ void TcpConnection::process(int fd, int flags)
|
||||||
// monitor the object for destruction, because you never know what the user
|
// monitor the object for destruction, because you never know what the user
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
// store the old state
|
|
||||||
auto *oldstate = _state.get();
|
|
||||||
|
|
||||||
// pass on the the state, that returns a new impl
|
// pass on the the state, that returns a new impl
|
||||||
auto *newstate = _state->process(monitor, fd, flags);
|
auto *newstate = _state->process(monitor, fd, flags);
|
||||||
|
|
||||||
// if the state did not change, we do not have to update a member,
|
// if the state did not change, we do not have to update a member,
|
||||||
// when the newstate is nullptr, the object is (being) destructed
|
// when the newstate is nullptr, the object is (being) destructed
|
||||||
// and we do not have to do anything else either
|
// and we do not have to do anything else either
|
||||||
if (oldstate == newstate || newstate == nullptr) return;
|
if (newstate == nullptr || newstate == _state.get()) return;
|
||||||
|
|
||||||
|
// wrap the new state in a unique-ptr so that so that the old state
|
||||||
|
// is not destructed before the new one is assigned
|
||||||
|
std::unique_ptr<TcpState> ptr(newstate);
|
||||||
|
|
||||||
|
// swap the two pointers (this ensures that the last operation of this
|
||||||
|
// method is to destruct the old state, which possible results in calls
|
||||||
|
// to user-space and the destruction of "this"
|
||||||
|
_state.swap(ptr);
|
||||||
|
|
||||||
// in a bizarre set of circumstances, the user may have implemented the
|
|
||||||
// handler in such a way that the connection object was destructed
|
|
||||||
if (!assign(monitor, newstate)) delete newstate;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -125,15 +104,6 @@ bool TcpConnection::close(bool immediate)
|
||||||
// fail the connection / report the error to user-space
|
// fail the connection / report the error to user-space
|
||||||
_connection.fail("connection prematurely closed by client");
|
_connection.fail("connection prematurely closed by client");
|
||||||
|
|
||||||
// construct a monitor to check if object is still alive
|
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// get rid of the old state
|
|
||||||
_state.reset(nullptr);
|
|
||||||
|
|
||||||
// leap out if object was destructed
|
|
||||||
if (!monitor.valid()) return true;
|
|
||||||
|
|
||||||
// change the state
|
// change the state
|
||||||
_state.reset(new TcpClosed(this));
|
_state.reset(new TcpClosed(this));
|
||||||
|
|
||||||
|
|
@ -181,17 +151,11 @@ void TcpConnection::onError(Connection *connection, const char *message)
|
||||||
// tell this to the user
|
// tell this to the user
|
||||||
_handler->onError(this, message);
|
_handler->onError(this, message);
|
||||||
|
|
||||||
// remember the old state (this is necessary because _state may be modified by user-code)
|
// object could be destructed by user-space
|
||||||
auto *oldstate = _state.get();
|
if (!monitor.valid()) return;
|
||||||
|
|
||||||
// tell the state that the connection should be closed asap
|
// tell the state that the connection should be closed asap
|
||||||
auto *newstate = _state->close();
|
_state->close();
|
||||||
|
|
||||||
// leap out if nothing changes
|
|
||||||
if (newstate == nullptr || newstate == oldstate) return;
|
|
||||||
|
|
||||||
// assign the new state
|
|
||||||
_state.reset(newstate);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -200,20 +164,8 @@ void TcpConnection::onError(Connection *connection, const char *message)
|
||||||
*/
|
*/
|
||||||
void TcpConnection::onClosed(Connection *connection)
|
void TcpConnection::onClosed(Connection *connection)
|
||||||
{
|
{
|
||||||
// monitor to check if "this" is destructed
|
|
||||||
Monitor monitor(this);
|
|
||||||
|
|
||||||
// remember the old state (this is necessary because _state may be modified by user-code)
|
|
||||||
auto *oldstate = _state.get();
|
|
||||||
|
|
||||||
// tell the state that the connection should be closed asap
|
// tell the state that the connection should be closed asap
|
||||||
auto *newstate = _state->close();
|
_state->close();
|
||||||
|
|
||||||
// leap out if nothing changes
|
|
||||||
if (newstate == nullptr || newstate == oldstate) return;
|
|
||||||
|
|
||||||
// assign the new state
|
|
||||||
_state.reset(newstate);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -228,7 +180,8 @@ void TcpConnection::onError(TcpState *state, const char *message, bool connected
|
||||||
// we wait for the subsequent call to the onClosed() method
|
// we wait for the subsequent call to the onClosed() method
|
||||||
if (connected) return _handler->onError(this, message);
|
if (connected) return _handler->onError(this, message);
|
||||||
|
|
||||||
// monitor to check if "this" is destructed
|
// no extra onClosed() call is expected, so we have to report multiple things
|
||||||
|
// to user-space, we use a monitor to check if "this" is destructed in the middle
|
||||||
Monitor monitor(this);
|
Monitor monitor(this);
|
||||||
|
|
||||||
// tell the handler
|
// tell the handler
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,18 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the buffer
|
||||||
|
*/
|
||||||
|
void clear()
|
||||||
|
{
|
||||||
|
// clear all buffers
|
||||||
|
_buffers.clear();
|
||||||
|
|
||||||
|
// reset members
|
||||||
|
_skip = _size = 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fill an iovec buffer
|
* Fill an iovec buffer
|
||||||
* @param buffers the buffers to be filled
|
* @param buffers the buffers to be filled
|
||||||
|
|
|
||||||
|
|
@ -93,10 +93,9 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gracefully close the connection
|
* Gracefully start closing the connection
|
||||||
* @return TcpState The next state
|
|
||||||
*/
|
*/
|
||||||
virtual TcpState *close() { return this; }
|
virtual void close() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Install max-frame size
|
* Install max-frame size
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue