clients can now intercept the heartbeat negotiation, and install a shorter of longer heartbeat interval, and clients can be notified for received heartbeat frames

This commit is contained in:
Toon Schoenmakers 2015-12-02 10:46:55 +01:00
parent 57c1109115
commit 93a0b60b6e
8 changed files with 130 additions and 5 deletions

View File

@ -25,6 +25,27 @@ namespace AMQP {
class ConnectionHandler
{
public:
/**
* Method that is called when the heartbeat frequency is negotiated
* between the server and the client. You normally do not have to
* override this method, because in the default implementation the
* suggested heartbeat is simply accepted by the client.
*
* However, if you want to disable heartbeats, or when you want an
* alternative heartbeat interval, you can override this method
* to use an other interval. You should return 0 if you want to
* disable heartbeats.
*
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
virtual uint16_t onNegotiate(Connection *connection, uint16_t interval)
{
// default implementation, suggested heartbeat is ok
return interval;
}
/**
* Method that is called when data needs to be sent over the network
*
@ -38,6 +59,17 @@ public:
*/
virtual void onData(Connection *connection, const char *buffer, size_t size) = 0;
/**
* Method that is called when the server sends a heartbeat to the client
*
* You do not have to do anything here, the client sends back a heartbeat
* frame automatically, but if you like, you can implement/override this
* method if you want to be notified of such heartbeats
*
* @param connection The connection over which the heartbeat was received
*/
virtual void onHeartbeat(Connection *connection) {}
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol

View File

@ -368,10 +368,22 @@ public:
/**
* Set the heartbeat delay
* @param heartbeat suggested heartbeat by server
* @return uint16_t accepted heartbeat by client
*/
void setHeartbeat(uint16_t heartbeat)
uint16_t setHeartbeat(uint16_t heartbeat)
{
_heartbeat = heartbeat;
// pass to the handler
return _heartbeat = _handler->onNegotiate(_parent, heartbeat);
}
/**
* Report a heartbeat to the connection handler
*/
void reportHeartbeat()
{
// pass to handler
_handler->onHeartbeat(_parent);
}
/**

View File

@ -42,6 +42,13 @@ private:
*/
Connection _connection;
/**
* Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
virtual uint16_t onNegotiate(Connection *connection, uint16_t interval) override;
/**
* Method that is called by the connection when data needs to be sent over the network
@ -51,6 +58,12 @@ private:
*/
virtual void onData(Connection *connection, const char *buffer, size_t size) override;
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*/
virtual void onHeartbeat(Connection *connection) override;
/**
* Method called when the connection ends up in an error state
* @param connection The connection that entered the error state

View File

@ -25,12 +25,35 @@ class TcpConnection;
class TcpHandler
{
public:
/**
* Method that is called when the heartbeat frequency is negotiated
* between the server and the client.
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*
* @see ConnectionHandler::onNegotiate
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval)
{
// default implementation, suggested heartbeat is ok
return interval;
}
/**
* Method that is called when the TCP connection ends up in a connected state
* @param connection The TCP connection
*/
virtual void onConnected(TcpConnection *connection) {}
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*
* @see ConnectionHandler::onHeartbeat
*/
virtual void onHeartbeat(TcpConnection *connection) {}
/**
* Method that is called when the TCP connection ends up in an error state
* @param connection The TCP connection

View File

@ -138,10 +138,10 @@ public:
Monitor monitor(connection);
// store the heartbeat the server wants
connection->setHeartbeat(heartbeat());
uint16_t interval = connection->setHeartbeat(heartbeat());
// send it back
connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat()));
connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), interval));
// check if the connection object still exists
if (!monitor.valid()) return true;

View File

@ -57,6 +57,9 @@ public:
*/
virtual bool process(ConnectionImpl *connection) override
{
// notify the connection-handler
connection->reportHeartbeat();
// send back the same frame
connection->send(*this);

View File

@ -62,6 +62,18 @@ void TcpConnection::process(int fd, int flags)
_state = result;
}
/**
* Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
uint16_t TcpConnection::onNegotiate(Connection *connection, uint16_t interval)
{
// the state object should do this
return _state->reportNegotiate(interval);
}
/**
* Method that is called by the connection when data needs to be sent over the network
* @param connection The connection that created this output
@ -74,6 +86,16 @@ void TcpConnection::onData(Connection *connection, const char *buffer, size_t si
_state->send(buffer, size);
}
/**
* Method that is called when the server sends a heartbeat to the client
* @param connection The connection over which the heartbeat was received
*/
void TcpConnection::onHeartbeat(Connection *connection)
{
// let the state object do this
_state->reportHeartbeat();
}
/**
* Method called when the connection ends up in an error state
* @param connection The connection that entered the error state

View File

@ -79,6 +79,17 @@ public:
// default does nothing
}
/**
* Report that heartbeat negotiation is going on
* @param heartbeat suggested heartbeat
* @return uint16_t accepted heartbeat
*/
uint16_t reportNegotiate(uint16_t heartbeat)
{
// pass to handler
return _handler->onNegotiate(_connection, heartbeat);
}
/**
* Report to the handler that the object is in an error state
* @param error
@ -89,6 +100,15 @@ public:
_handler->onError(_connection, error);
}
/**
* Report that a heartbeat frame was received
*/
void reportHeartbeat()
{
// pass to handler
_handler->onHeartbeat(_connection);
}
/**
* Report to the handler that the connection is ready for use
*/