From 5042b699bd4818004b59d4d5b0642c8ecf90084f Mon Sep 17 00:00:00 2001 From: David Nikdel Date: Thu, 14 Jul 2016 23:30:43 -0400 Subject: [PATCH 1/5] create libuv.h based on libev.h --- include/libuv.h | 220 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 include/libuv.h diff --git a/include/libuv.h b/include/libuv.h new file mode 100644 index 0000000..d3aa8f3 --- /dev/null +++ b/include/libuv.h @@ -0,0 +1,220 @@ +/** + * LibUV.h + * + * Implementation for the AMQP::TcpHandler that is optimized for libuv. You can + * use this class instead of a AMQP::TcpHandler class, just pass the event loop + * to the constructor and you're all set. + * + * Based heavily on the libev.h implementation by Emiel Bruijntjes + * + * @author David Nikdel + * @copyright 2015 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class LibUvHandler : public TcpHandler +{ +private: + /** + * Helper class that wraps a libev I/O watcher + */ + class Watcher + { + private: + /** + * The event loop to which it is attached + * @var struct uv_loop_t + */ + struct uv_loop_t *_loop; + + /** + * The actual watcher structure + * @var struct uv_poll_t + */ + struct uv_poll_t *_poll; + + /** + * Callback method that is called by libuv when a filedescriptor becomes active + * @param handle Internal poll handle + * @param status LibUV error code UV_* + * @param events Events triggered + */ + static void callback(struct uv_poll_t *handle, int status, int events) + { + // retrieve the connection + TcpConnection *connection = static_cast(handle->data); + + // tell the connection that its filedescriptor is active + int fd = -1; + uv_fileno(static_cast(handle), &fd); + connection->process(fd, uv_to_amqp_events(events)); + } + + public: + /** + * Constructor + * @param loop The current event loop + * @param connection The connection being watched + * @param fd The filedescriptor being watched + * @param events The events that should be monitored + */ + Watcher(struct uv_loop_t *loop, TcpConnection *connection, int fd, int events) : _loop(loop) + { + // create a new poll + _poll = new uv_poll_t(); + + // initialize the libev structure + uv_poll_init(_loop, _poll, fd); + + // store the connection in the data "void*" + _poll->data = connection; + + // start the watcher + uv_poll_start(_poll, amqp_to_uv_events(events), callback); + } + + /** + * Watchers cannot be copied or moved + * + * @param that The object to not move or copy + */ + Watcher(Watcher &&that) = delete; + Watcher(const Watcher &that) = delete; + + /** + * Destructor + */ + virtual ~Watcher() + { + // stop the watcher + uv_poll_stop(_poll); + + // close the handle + uv_close(static_cast(_poll), [](uv_handle_t* handle) { + // delete memory once closed + delete static_cast(handle); + }); + } + + /** + * Change the events for which the filedescriptor is monitored + * @param events + */ + void events(int events) + { + // stop the watcher if it was active + uv_poll_stop(_poll); + + // and restart it + uv_poll_start(_poll, amqp_to_uv_events(events), callback); + } + + private: + + /** + * Convert event flags from UV format to AMQP format + */ + static int uv_to_amqp_events(int events) + { + int amqp_events = 0; + if (events & UV_READABLE) + amqp_events |= AMQP::readable; + if (events & UV_WRITABLE) + amqp_events |= AMQP::writable; + return amqp_events; + } + + /** + * Convert event flags from AMQP format to UV format + */ + static int amqp_to_uv_events(int events) + { + int uv_events = 0; + if (events & AMQP::readable) + uv_events |= UV_READABLE; + if (events & AMQP::writable) + uv_events |= UV_WRITABLE; + return uv_events; + } + }; + + + /** + * The event loop + * @var struct uv_loop_t* + */ + struct uv_loop_t *_loop; + + /** + * All I/O watchers that are active, indexed by their filedescriptor + * @var std::map + */ + std::map> _watchers; + + + /** + * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability + * @param connection The TCP connection object that is reporting + * @param fd The filedescriptor to be monitored + * @param flags Should the object be monitored for readability or writability? + */ + virtual void monitor(TcpConnection *connection, int fd, int flags) override + { + // do we already have this filedescriptor + auto iter = _watchers.find(fd); + + // was it found? + if (iter == _watchers.end()) + { + // we did not yet have this watcher - but that is ok if no filedescriptor was registered + if (flags == 0) return; + + // construct a new watcher, and put it in the map + _watchers[fd] = std::unique_ptr(new Watcher(_loop, connection, fd, flags)); + } + else if (flags == 0) + { + // the watcher does already exist, but we no longer have to watch this watcher + _watchers.erase(iter); + } + else + { + // change the events + iter->second->events(flags); + } + } + +public: + /** + * Constructor + * @param loop The event loop to wrap + */ + LibUvHandler(struct uv_loop_t *loop) : _loop(loop) {} + + /** + * Destructor + */ + virtual ~LibUvHandler() = default; +}; + +/** + * End of namespace + */ +} From b8d2c0c600b714462edcaa50b105f1f44c0a8614 Mon Sep 17 00:00:00 2001 From: David Nikdel Date: Thu, 14 Jul 2016 23:49:31 -0400 Subject: [PATCH 2/5] fix casting --- include/libuv.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/libuv.h b/include/libuv.h index d3aa8f3..612b254 100644 --- a/include/libuv.h +++ b/include/libuv.h @@ -63,7 +63,7 @@ private: // tell the connection that its filedescriptor is active int fd = -1; - uv_fileno(static_cast(handle), &fd); + uv_fileno(reinterpret_cast(handle), &fd); connection->process(fd, uv_to_amqp_events(events)); } @@ -107,9 +107,9 @@ private: uv_poll_stop(_poll); // close the handle - uv_close(static_cast(_poll), [](uv_handle_t* handle) { + uv_close(reinterpret_cast(_poll), [](uv_handle_t* handle) { // delete memory once closed - delete static_cast(handle); + delete reinterpret_cast(handle); }); } From 1e1ec0c133013ea3e3a067031dbc8ff010eeeb42 Mon Sep 17 00:00:00 2001 From: David Nikdel Date: Sat, 16 Jul 2016 20:27:00 -0400 Subject: [PATCH 3/5] Update libuv.h - if status is not ok, report that as readable - no need to stop uv_poll_t before changing events according to the docs --- include/libuv.h | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/include/libuv.h b/include/libuv.h index 612b254..4913199 100644 --- a/include/libuv.h +++ b/include/libuv.h @@ -64,7 +64,7 @@ private: // tell the connection that its filedescriptor is active int fd = -1; uv_fileno(reinterpret_cast(handle), &fd); - connection->process(fd, uv_to_amqp_events(events)); + connection->process(fd, uv_to_amqp_events(status, events)); } public: @@ -119,10 +119,7 @@ private: */ void events(int events) { - // stop the watcher if it was active - uv_poll_stop(_poll); - - // and restart it + // update the events being watched for uv_poll_start(_poll, amqp_to_uv_events(events), callback); } @@ -131,13 +128,15 @@ private: /** * Convert event flags from UV format to AMQP format */ - static int uv_to_amqp_events(int events) + static int uv_to_amqp_events(int status, int events) { int amqp_events = 0; if (events & UV_READABLE) amqp_events |= AMQP::readable; if (events & UV_WRITABLE) amqp_events |= AMQP::writable; + if (status != 0) + amqp_events |= AMQP::readable; return amqp_events; } From 584d92e751ea368229e38e9cb582f10c850d1b34 Mon Sep 17 00:00:00 2001 From: David Nikdel Date: Sat, 16 Jul 2016 20:30:03 -0400 Subject: [PATCH 4/5] can't use struct elaboration with typedef --- include/libuv.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/include/libuv.h b/include/libuv.h index 4913199..8649407 100644 --- a/include/libuv.h +++ b/include/libuv.h @@ -40,15 +40,15 @@ private: private: /** * The event loop to which it is attached - * @var struct uv_loop_t + * @var uv_loop_t */ - struct uv_loop_t *_loop; + uv_loop_t *_loop; /** * The actual watcher structure - * @var struct uv_poll_t + * @var uv_poll_t */ - struct uv_poll_t *_poll; + uv_poll_t *_poll; /** * Callback method that is called by libuv when a filedescriptor becomes active @@ -56,7 +56,7 @@ private: * @param status LibUV error code UV_* * @param events Events triggered */ - static void callback(struct uv_poll_t *handle, int status, int events) + static void callback(uv_poll_t *handle, int status, int events) { // retrieve the connection TcpConnection *connection = static_cast(handle->data); @@ -75,7 +75,7 @@ private: * @param fd The filedescriptor being watched * @param events The events that should be monitored */ - Watcher(struct uv_loop_t *loop, TcpConnection *connection, int fd, int events) : _loop(loop) + Watcher(uv_loop_t *loop, TcpConnection *connection, int fd, int events) : _loop(loop) { // create a new poll _poll = new uv_poll_t(); @@ -157,9 +157,9 @@ private: /** * The event loop - * @var struct uv_loop_t* + * @var uv_loop_t* */ - struct uv_loop_t *_loop; + uv_loop_t *_loop; /** * All I/O watchers that are active, indexed by their filedescriptor @@ -205,7 +205,7 @@ public: * Constructor * @param loop The event loop to wrap */ - LibUvHandler(struct uv_loop_t *loop) : _loop(loop) {} + LibUvHandler(uv_loop_t *loop) : _loop(loop) {} /** * Destructor From fd69a4c01dec0463b222fada31d3367d03fe6356 Mon Sep 17 00:00:00 2001 From: David Nikdel Date: Sat, 16 Jul 2016 20:43:19 -0400 Subject: [PATCH 5/5] map closed sockets to both read and write - this is more consistent with libev so probably will match what existing code expects --- include/libuv.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/include/libuv.h b/include/libuv.h index 8649407..d48303f 100644 --- a/include/libuv.h +++ b/include/libuv.h @@ -130,13 +130,16 @@ private: */ static int uv_to_amqp_events(int status, int events) { + // if the socket is closed report both so we pick up the error + if (status != 0) + return AMQP::readable | AMQP::writable; + + // map read or write int amqp_events = 0; if (events & UV_READABLE) amqp_events |= AMQP::readable; if (events & UV_WRITABLE) amqp_events |= AMQP::writable; - if (status != 0) - amqp_events |= AMQP::readable; return amqp_events; }