From 66d2173f860927d8310bdf8494eb59faac38bfa1 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Tue, 17 Nov 2015 09:33:28 +0100 Subject: [PATCH] added libev implementation for the Tcp handler, added libev example program --- include/libev.h | 160 ++++++++++++++++++++++++++++++++++++++++++++++++ tests/libev.cpp | 51 +++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 include/libev.h create mode 100644 tests/libev.cpp diff --git a/include/libev.h b/include/libev.h new file mode 100644 index 0000000..fe6b7cc --- /dev/null +++ b/include/libev.h @@ -0,0 +1,160 @@ +/** + * LibEV.h + * + * Implementation for the AMQP::TcpHandler that is optimized for libev. You can + * use this class instead of a AMQP::TcpHandler class, just pass the event loop + * to the constructor and you're all set + * + * Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread" + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class LibEvHandler : public TcpHandler +{ +private: + /** + * Helper class that wraps a libev I/O watcher + */ + class Watcher + { + private: + /** + * The event loop to which it is attached + * @var struct ev_loop + */ + struct ev_loop *_loop; + + /** + * The actual watcher structure + * @var struct ev_io + */ + struct ev_io _io; + + /** + * Callback method that is called by libev when a filedescriptor becomes active + * @param loop The loop in which the event was triggered + * @param w Internal watcher object + * @param revents Events triggered + */ + static void callback(struct ev_loop *loop, struct ev_io *watcher, int revents) + { + // retrieve the connection + TcpConnection *connection = static_cast(watcher->data); + + // tell the connection that its filedescriptor is active + connection->process(watcher->fd, revents); + } + + public: + /** + * Constructor + * @param loop The current event loop + * @param connection The connection being watched + * @param fd The filedescriptor being watched + * @param events The events that should be monitored + */ + Watcher(struct ev_loop *loop, TcpConnection *connection, int fd, int events) : _loop(loop) + { + // initialize the libev structure + ev_io_init(&_io, callback, fd, events); + + // store the connection in the data "void*" + _io.data = connection; + + // start the watcher + ev_io_start(_loop, &_io); + } + + /** + * Destructor + */ + virtual ~Watcher() + { + // stop the watcher + ev_io_stop(_loop, &_io); + } + + /** + * Change the events for which the filedescriptor is monitored + * @param events + */ + void events(int events) + { + // libev has a function for this + ev_io_set(&_io, _io.fd, events); + } + }; + + + /** + * The event loop + * @var struct ev_loop* + */ + struct ev_loop *_loop; + + /** + * All I/O watchers that are active, indexed by their filedescriptor + * @var std::map + */ + std::map> _watchers; + + + /** + * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability + * @param connection The TCP connection object that is reporting + * @param fd The filedescriptor to be monitored + * @param flags Should the object be monitored for readability or writability? + */ + virtual void monitor(TcpConnection *connection, int fd, int flags) override + { + // do we already have this filedescriptor + auto iter = _watchers.find(fd); + + // was it found? + if (iter == _watchers.end()) + { + // we did not yet have this watcher - but that is ok if no filedescriptor was registered + if (flags == 0) return; + + // construct a new watcher, and put it in the map + _watchers[fd] = std::unique_ptr(new Watcher(_loop, connection, fd, flags)); + } + else if (flags == 0) + { + // the watcher does already exist, but we no longer have to watch this watcher + _watchers.erase(iter); + } + else + { + // change the events + iter->second->events(flags); + } + } + +public: + /** + * Constructor + * @param loop The event loop to wrap + */ + LibEvHandler(struct ev_loop *loop) : _loop(loop) {} + + /** + * Destructor + */ + virtual ~LibEvHandler() = default; +}; + +/** + * End of namespace + */ +} diff --git a/tests/libev.cpp b/tests/libev.cpp new file mode 100644 index 0000000..06277f0 --- /dev/null +++ b/tests/libev.cpp @@ -0,0 +1,51 @@ +/** + * LibEV.cpp + * + * Test program to check AMQP functionality based on LibEV + * + * @author Emiel Bruijntjes + * @copyright 2015 Copernica BV + */ + +/** + * Dependencies + */ +#include +#include +#include + +/** + * Main program + * @return int + */ +int main() +{ + // access to the event loop + auto *loop = EV_DEFAULT; + + // handler for libev + AMQP::LibEvHandler handler(loop); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); + + // we need a channel too + AMQP::TcpChannel channel(&connection); + + // create a temporary queue + channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + + // report the name of the temporary queue + std::cout << "declared queue " << name << std::endl; + + // now we can close the connection + connection.close(); + }); + + // run the loop + ev_run(loop, 0); + + // done + return 0; +} +