implemented heartbeats for libev
This commit is contained in:
parent
a091921e88
commit
90d9946474
|
|
@ -98,9 +98,9 @@ public:
|
||||||
* Send a ping/heartbeat to the channel to keep it alive
|
* Send a ping/heartbeat to the channel to keep it alive
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool ping()
|
bool heartbeat()
|
||||||
{
|
{
|
||||||
return _implementation.ping();
|
return _implementation.heartbeat();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
108
include/libev.h
108
include/libev.h
|
|
@ -119,6 +119,84 @@ private:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timer class to periodically fire a heartbeat
|
||||||
|
*/
|
||||||
|
class Timer
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* The event loop to which it is attached
|
||||||
|
* @var struct ev_loop
|
||||||
|
*/
|
||||||
|
struct ev_loop *_loop;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The actual watcher structure
|
||||||
|
* @var struct ev_io
|
||||||
|
*/
|
||||||
|
struct ev_timer _timer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback method that is called by libev when the timer expires
|
||||||
|
* @param loop The loop in which the event was triggered
|
||||||
|
* @param timer Internal timer object
|
||||||
|
* @param revents The events that triggered this call
|
||||||
|
*/
|
||||||
|
static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
|
||||||
|
{
|
||||||
|
// retrieve the connection
|
||||||
|
TcpConnection *connection = static_cast<TcpConnection*>(timer->data);
|
||||||
|
|
||||||
|
// send the heartbeat
|
||||||
|
connection->heartbeat();
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param loop The current event loop
|
||||||
|
*/
|
||||||
|
Timer(struct ev_loop *loop) : _loop(loop)
|
||||||
|
{
|
||||||
|
// initialize the libev structure
|
||||||
|
ev_timer_init(&_timer, callback, 60.0, 60.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watchers cannot be copied or moved
|
||||||
|
*
|
||||||
|
* @param that The object to not move or copy
|
||||||
|
*/
|
||||||
|
Timer(Watcher &&that) = delete;
|
||||||
|
Timer(const Watcher &that) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~Timer()
|
||||||
|
{
|
||||||
|
// stop the timer
|
||||||
|
ev_timer_stop(_loop, &_timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Change the expire time
|
||||||
|
* @param connection
|
||||||
|
* @param timeout
|
||||||
|
*/
|
||||||
|
void set(TcpConnection *connection, uint16_t timeout)
|
||||||
|
{
|
||||||
|
// store the connection in the data "void*"
|
||||||
|
_timer.data = connection;
|
||||||
|
|
||||||
|
// set the timer
|
||||||
|
ev_timer_set(&_timer, timeout, timeout);
|
||||||
|
|
||||||
|
// and start it
|
||||||
|
ev_timer_start(_loop, &_timer);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The event loop
|
* The event loop
|
||||||
|
|
@ -132,6 +210,12 @@ private:
|
||||||
*/
|
*/
|
||||||
std::map<int,std::unique_ptr<Watcher>> _watchers;
|
std::map<int,std::unique_ptr<Watcher>> _watchers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A timer to periodically send out heartbeats
|
||||||
|
* @var Timer
|
||||||
|
*/
|
||||||
|
Timer _timer;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
|
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
|
||||||
|
|
@ -165,12 +249,34 @@ private:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* Method that is called when the heartbeat frequency is negotiated between the server and the client.
|
||||||
|
* @param connection The connection that suggested a heartbeat interval
|
||||||
|
* @param interval The suggested interval from the server
|
||||||
|
* @return uint16_t The interval to use
|
||||||
|
*/
|
||||||
|
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
|
||||||
|
{
|
||||||
|
// call base to find out the timeout that the client wants
|
||||||
|
interval = TcpHandler::onNegotiate(connection, interval);
|
||||||
|
|
||||||
|
// skip if base does not want a timeout
|
||||||
|
if (interval == 0) return 0;
|
||||||
|
|
||||||
|
// set the timer
|
||||||
|
_timer.set(connection, interval);
|
||||||
|
|
||||||
|
// done
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param loop The event loop to wrap
|
* @param loop The event loop to wrap
|
||||||
*/
|
*/
|
||||||
LibEvHandler(struct ev_loop *loop) : _loop(loop) {}
|
LibEvHandler(struct ev_loop *loop) : _loop(loop), _timer(loop) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor
|
* Destructor
|
||||||
|
|
|
||||||
|
|
@ -176,6 +176,15 @@ public:
|
||||||
// return the amount of channels this connection has
|
// return the amount of channels this connection has
|
||||||
return _connection.channels();
|
return _connection.channels();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a heartbeat
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
bool heartbeat()
|
||||||
|
{
|
||||||
|
return _connection.heartbeat();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue