the TcpConnection::close() method now supports an "immediate" parameter to skip the official amqp handshake for closing down

This commit is contained in:
Emiel Bruijntjes 2018-10-28 18:13:13 +01:00
parent 8546f52cd0
commit 62a4262a58
9 changed files with 204 additions and 17 deletions

View File

@ -156,16 +156,15 @@ public:
void flush();
/**
* Close the connection
* This closes all channels and the TCP connection
* Close the connection in an elegant fashion. This closes all channels and the
* TCP connection. Note that the connection is not immediately closed: first all
* pending operations are completed, and then an AMQP closing-handshake is
* performed. If you pass a parameter "immediate=true" the connection is
* immediately closed, without waiting for earlier commands
* @return bool
*/
bool close()
{
// pass to the underlying connection
return _connection.close();
}
bool close(bool immediate = false);
/**
* The max frame size. Useful if you set up a buffer to parse incoming data: it does not have to exceed this size.
* @return uint32_t

View File

@ -476,6 +476,29 @@ public:
// done
return this;
}
/**
* Close the connection immediately
* @param monitor object to check if connection object is still active
* @return TcpState the new state
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "ssl connection terminated");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Send data over the connection

View File

@ -51,6 +51,28 @@ private:
TcpOutBuffer _out;
/**
* Close the socket
* @return bool
*/
bool close()
{
// do nothing if already closed
if (_socket < 0) return false;
// and stop monitoring it
_handler->monitor(_connection, _socket, 0);
// close the socket
::close(_socket);
// forget filedescriptor
_socket = -1;
// done
return true;
}
/**
* Report a new state
* @param monitor
@ -90,14 +112,8 @@ private:
*/
TcpState *reportError(const Monitor &monitor)
{
// we are no longer interested in any events for this socket
_handler->monitor(_connection, _socket, 0);
// close the socket
close(_socket);
// forget filedescriptor
_socket = -1;
close();
// we have an error - report this to the user
_handler->onError(_connection, "failed to setup ssl connection");
@ -162,8 +178,9 @@ public:
// leap out if socket is invalidated
if (_socket < 0) return;
// the object got destructed without moving to a new state, this is normally
close(_socket);
// the object got destructed without moving to a new state, this
// situation should normally not occur
::close(_socket);
}
/**
@ -257,6 +274,24 @@ public:
}
}
}
/**
* Close the connection immediately
* @param monitor object to check if connection object is still active
* @return TcpState the new state
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// close the socket
close();
// report to the user that the handshake was aborted
_handler->onError(_connection, "ssl handshake aborted");
// done, go to the closed state (plus check if connection still exists, because
// after the onError() call the user space program may have destructed that object)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
};
/**

View File

@ -242,6 +242,29 @@ public:
}
}
}
/**
* Abort the shutdown operation
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "ssl shutdown aborted");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
};
/**

View File

@ -42,6 +42,17 @@ public:
* Destructor
*/
virtual ~TcpClosed() noexcept = default;
/**
* Abort the operation
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// connection was closed and stays closed
return this;
}
};
/**

View File

@ -278,6 +278,29 @@ public:
// all has been sent
return this;
}
/**
* Close the connection immediately
* @param monitor object to check if connection object is still active
* @return TcpState the new state
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// if we have already told user space that connection is gone
if (_finalized) return new TcpClosed(this);
// object will be finalized now
_finalized = true;
// close the connection
close();
// inform user space that the party is over
_handler->onError(_connection, "tcp connection terminated");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Report that heartbeat negotiation is going on

View File

@ -116,6 +116,52 @@ void TcpConnection::flush()
}
}
/**
* Close the connection
* @return bool
*/
bool TcpConnection::close(bool immediate)
{
// if no immediate disconnect is needed, we can simply start the closing handshake
if (!immediate) return _connection.close();
// a call to user-space will be made, so we need to monitor if "this" is destructed
Monitor monitor(this);
// pass to the underlying connection to start the amqp-closing handshake
_connection.close();
// if the user-space code destructed the connection, there is nothing else to do
if (!monitor.valid()) return true;
// store the old state
auto *oldstate = _state.get();
// abort the operation
auto *newstate = _state->abort(monitor);
// if the state did not change, we do not have to update a member,
// when the newstate is nullptr, the object is (being) destructed
// and we do not have to do anything else either
if (oldstate == newstate || newstate == nullptr) return true;
// in a bizarre set of circumstances, the user may have implemented the
// handler in such a way that the connection object was destructed
if (!monitor.valid())
{
// ok, user code is weird, connection object no longer exist, get rid of the state too
delete newstate;
}
else
{
// replace it with the new implementation
_state.reset(newstate);
}
// done, we return true because the connection is closed
return true;
}
/**
* Method that is called after the connection was constructed
* @param connection The connection that was attached to the handler

View File

@ -249,6 +249,26 @@ public:
return proceed(monitor);
}
/**
* Close the connection immediately
* @param monitor object to check if connection object is still active
* @return TcpState the new state
*/
virtual TcpState *abort(const Monitor &monitor) override
{
// just wait for the other thread to be ready
_thread.join();
// close the socket
if (_socket >= 0) ::close(_socket);
// inform user space that the connection is cancelled
_handler->onError(_connection, "tcp connect aborted");
// go to the final state (if not yet disconnected)
return monitor.valid() ? new TcpClosed(this) : nullptr;
}
/**
* Send data over the connection
* @param buffer buffer to send

View File

@ -115,6 +115,13 @@ public:
* @return TcpState New implementation object
*/
virtual TcpState *flush(const Monitor &monitor) { return this; }
/**
* Abort the operation, immediately proceed to the final state
* @param monitor Monitor that can be used to check if the tcp connection is still alive
* @return TcpState New implementation object
*/
virtual TcpState *abort(const Monitor &monitor) = 0;
/**
* Report to the handler that the connection was constructed