2016-07-15 11:30:43 +08:00
|
|
|
/**
|
|
|
|
|
* LibUV.h
|
|
|
|
|
*
|
|
|
|
|
* Implementation for the AMQP::TcpHandler that is optimized for libuv. You can
|
|
|
|
|
* use this class instead of an AMQP::TcpHandler class, just pass the event loop
|
|
|
|
|
* to the constructor and you're all set.
|
|
|
|
|
*
|
|
|
|
|
* Based heavily on the libev.h implementation by Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
|
|
|
|
*
|
|
|
|
|
* @author David Nikdel <david@nikdel.com>
|
|
|
|
|
* @copyright 2015 Copernica BV
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Include guard
|
|
|
|
|
*/
|
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Dependencies
|
|
|
|
|
*/
|
|
|
|
|
#include <uv.h>
|
|
|
|
|
|
2018-01-30 18:07:27 +08:00
|
|
|
#include "amqpcpp/linux_tcp.h"
|
|
|
|
|
|
2016-07-15 11:30:43 +08:00
|
|
|
/**
|
|
|
|
|
* Set up namespace
|
|
|
|
|
*/
|
|
|
|
|
namespace AMQP {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Class definition
|
|
|
|
|
*/
|
|
|
|
|
class LibUvHandler : public TcpHandler
|
|
|
|
|
{
|
|
|
|
|
private:
|
|
|
|
|
/**
|
|
|
|
|
* Helper class that wraps a libev I/O watcher
|
|
|
|
|
*/
|
|
|
|
|
class Watcher
|
|
|
|
|
{
|
|
|
|
|
private:
|
|
|
|
|
/**
|
|
|
|
|
* The event loop to which it is attached
|
2016-07-17 08:30:03 +08:00
|
|
|
* @var uv_loop_t
|
2016-07-15 11:30:43 +08:00
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
uv_loop_t *_loop;
|
2016-07-15 11:30:43 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The actual watcher structure
|
2016-07-17 08:30:03 +08:00
|
|
|
* @var uv_poll_t
|
2016-07-15 11:30:43 +08:00
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
uv_poll_t *_poll;
|
2016-07-15 11:30:43 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Callback method that is called by libuv when a filedescriptor becomes active
|
|
|
|
|
* @param handle Internal poll handle
|
|
|
|
|
* @param status LibUV error code UV_*
|
|
|
|
|
* @param events Events triggered
|
|
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
static void callback(uv_poll_t *handle, int status, int events)
|
2016-07-15 11:30:43 +08:00
|
|
|
{
|
|
|
|
|
// retrieve the connection
|
|
|
|
|
TcpConnection *connection = static_cast<TcpConnection*>(handle->data);
|
|
|
|
|
|
|
|
|
|
// tell the connection that its filedescriptor is active
|
|
|
|
|
int fd = -1;
|
2016-07-15 11:49:31 +08:00
|
|
|
uv_fileno(reinterpret_cast<uv_handle_t*>(handle), &fd);
|
2016-07-17 08:27:00 +08:00
|
|
|
connection->process(fd, uv_to_amqp_events(status, events));
|
2016-07-15 11:30:43 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
/**
|
|
|
|
|
* Constructor
|
|
|
|
|
* @param loop The current event loop
|
|
|
|
|
* @param connection The connection being watched
|
|
|
|
|
* @param fd The filedescriptor being watched
|
|
|
|
|
* @param events The events that should be monitored
|
|
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
Watcher(uv_loop_t *loop, TcpConnection *connection, int fd, int events) : _loop(loop)
|
2016-07-15 11:30:43 +08:00
|
|
|
{
|
|
|
|
|
// create a new poll
|
|
|
|
|
_poll = new uv_poll_t();
|
|
|
|
|
|
|
|
|
|
// initialize the libev structure
|
|
|
|
|
uv_poll_init(_loop, _poll, fd);
|
|
|
|
|
|
|
|
|
|
// store the connection in the data "void*"
|
|
|
|
|
_poll->data = connection;
|
|
|
|
|
|
|
|
|
|
// start the watcher
|
|
|
|
|
uv_poll_start(_poll, amqp_to_uv_events(events), callback);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Watchers cannot be copied or moved
|
|
|
|
|
*
|
|
|
|
|
* @param that The object to not move or copy
|
|
|
|
|
*/
|
|
|
|
|
Watcher(Watcher &&that) = delete;
|
|
|
|
|
Watcher(const Watcher &that) = delete;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Destructor
|
|
|
|
|
*/
|
|
|
|
|
virtual ~Watcher()
|
|
|
|
|
{
|
|
|
|
|
// stop the watcher
|
|
|
|
|
uv_poll_stop(_poll);
|
|
|
|
|
|
|
|
|
|
// close the handle
|
2016-07-15 11:49:31 +08:00
|
|
|
uv_close(reinterpret_cast<uv_handle_t*>(_poll), [](uv_handle_t* handle) {
|
2016-07-15 11:30:43 +08:00
|
|
|
// delete memory once closed
|
2016-07-15 11:49:31 +08:00
|
|
|
delete reinterpret_cast<uv_poll_t*>(handle);
|
2016-07-15 11:30:43 +08:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Change the events for which the filedescriptor is monitored
|
|
|
|
|
* @param events
|
|
|
|
|
*/
|
|
|
|
|
void events(int events)
|
|
|
|
|
{
|
2016-07-17 08:27:00 +08:00
|
|
|
// update the events being watched for
|
2016-07-15 11:30:43 +08:00
|
|
|
uv_poll_start(_poll, amqp_to_uv_events(events), callback);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Convert event flags from UV format to AMQP format
|
|
|
|
|
*/
|
2016-07-17 08:27:00 +08:00
|
|
|
static int uv_to_amqp_events(int status, int events)
|
2016-07-15 11:30:43 +08:00
|
|
|
{
|
2016-07-17 08:43:19 +08:00
|
|
|
// if the socket is closed report both so we pick up the error
|
|
|
|
|
if (status != 0)
|
|
|
|
|
return AMQP::readable | AMQP::writable;
|
|
|
|
|
|
|
|
|
|
// map read or write
|
2016-07-15 11:30:43 +08:00
|
|
|
int amqp_events = 0;
|
|
|
|
|
if (events & UV_READABLE)
|
|
|
|
|
amqp_events |= AMQP::readable;
|
|
|
|
|
if (events & UV_WRITABLE)
|
|
|
|
|
amqp_events |= AMQP::writable;
|
|
|
|
|
return amqp_events;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Convert event flags from AMQP format to UV format
|
|
|
|
|
*/
|
|
|
|
|
static int amqp_to_uv_events(int events)
|
|
|
|
|
{
|
|
|
|
|
int uv_events = 0;
|
|
|
|
|
if (events & AMQP::readable)
|
|
|
|
|
uv_events |= UV_READABLE;
|
|
|
|
|
if (events & AMQP::writable)
|
|
|
|
|
uv_events |= UV_WRITABLE;
|
|
|
|
|
return uv_events;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The event loop
|
2016-07-17 08:30:03 +08:00
|
|
|
* @var uv_loop_t*
|
2016-07-15 11:30:43 +08:00
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
uv_loop_t *_loop;
|
2016-07-15 11:30:43 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* All I/O watchers that are active, indexed by their filedescriptor
|
|
|
|
|
* @var std::map<int,Watcher>
|
|
|
|
|
*/
|
|
|
|
|
std::map<int,std::unique_ptr<Watcher>> _watchers;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
|
|
|
|
|
* @param connection The TCP connection object that is reporting
|
|
|
|
|
* @param fd The filedescriptor to be monitored
|
|
|
|
|
* @param flags Should the object be monitored for readability or writability?
|
|
|
|
|
*/
|
|
|
|
|
virtual void monitor(TcpConnection *connection, int fd, int flags) override
|
|
|
|
|
{
|
|
|
|
|
// do we already have this filedescriptor
|
|
|
|
|
auto iter = _watchers.find(fd);
|
|
|
|
|
|
|
|
|
|
// was it found?
|
|
|
|
|
if (iter == _watchers.end())
|
|
|
|
|
{
|
|
|
|
|
// we did not yet have this watcher - but that is ok if no filedescriptor was registered
|
|
|
|
|
if (flags == 0) return;
|
|
|
|
|
|
|
|
|
|
// construct a new watcher, and put it in the map
|
|
|
|
|
_watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_loop, connection, fd, flags));
|
|
|
|
|
}
|
|
|
|
|
else if (flags == 0)
|
|
|
|
|
{
|
|
|
|
|
// the watcher does already exist, but we no longer have to watch this watcher
|
|
|
|
|
_watchers.erase(iter);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// change the events
|
|
|
|
|
iter->second->events(flags);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
/**
|
|
|
|
|
* Constructor
|
|
|
|
|
* @param loop The event loop to wrap
|
|
|
|
|
*/
|
2016-07-17 08:30:03 +08:00
|
|
|
LibUvHandler(uv_loop_t *loop) : _loop(loop) {}
|
2016-07-15 11:30:43 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Destructor
|
|
|
|
|
*/
|
|
|
|
|
virtual ~LibUvHandler() = default;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* End of namespace
|
|
|
|
|
*/
|
|
|
|
|
}
|