From bbfcf76f39cda99667b9436d9de6d7eea84793fb Mon Sep 17 00:00:00 2001 From: Brent Dimmig Date: Sat, 9 Jan 2016 15:59:57 -0500 Subject: [PATCH] add TcpHandler implementation for libevent --- include/libevent.h | 190 +++++++++++++++++++++++++++++++++++++++++++++ tests/libevent.cpp | 53 +++++++++++++ 2 files changed, 243 insertions(+) create mode 100644 include/libevent.h create mode 100644 tests/libevent.cpp diff --git a/include/libevent.h b/include/libevent.h new file mode 100644 index 0000000..7621dcc --- /dev/null +++ b/include/libevent.h @@ -0,0 +1,190 @@ +/** + * LibEvent.h + * + * Implementation for the AMQP::TcpHandler that is optimized for libevent. You can + * use this class instead of a AMQP::TcpHandler class, just pass the event loop + * to the constructor and you're all set + * + * Compile with: "g++ -std=c++11 libevent.cpp -lamqpcpp -levent -lpthread" + * + * @author Brent Dimmig + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class LibEventHandler : public TcpHandler +{ +private: + /** + * Helper class that wraps a libev I/O watcher + */ + class Watcher + { + private: + /** + * The actual event structure + * @var struct event + */ + struct event * _event; + + /** + * Callback method that is called by libevent when a filedescriptor becomes active + * @param fd The filedescriptor with an event + * @param what Events triggered + * @param connection_arg void * to the connection + */ + static void callback(evutil_socket_t fd, short what, void *connection_arg) + { + // retrieve the connection + TcpConnection *connection = static_cast(connection_arg); + + // setup amqp flags + int amqp_flags = 0; + if (what & EV_READ) + amqp_flags |= AMQP::readable; + if (what & EV_WRITE) + amqp_flags |= AMQP::writable; + + // tell the connection that its filedescriptor is active + connection->process(fd, amqp_flags); + } + + public: + /** + * Constructor + * @param evbase The current event loop + * @param connection The connection being watched + * @param fd The filedescriptor being watched + * @param events The events that should be monitored + */ + Watcher(struct event_base *evbase, TcpConnection *connection, int fd, int events) + { + // setup libevent flags + short event_flags = EV_PERSIST; + if (events & AMQP::readable) + event_flags |= EV_READ; + if (events & AMQP::writable) + event_flags |= EV_WRITE; + + // initialize the event + + _event = event_new(evbase, fd, event_flags, callback, connection); + event_add(_event, nullptr); + } + + /** + * Destructor + */ + virtual ~Watcher() + { + // stop the event + event_del(_event); + // free the event + event_free(_event); + } + + /** + * Change the events for which the filedescriptor is monitored + * @param events + */ + void events(int events) + { + // stop the event if it was active + event_del(_event); + + // setup libevent flags + short event_flags = EV_PERSIST; + if (events & AMQP::readable) + event_flags |= EV_READ; + if (events & AMQP::writable) + event_flags |= EV_WRITE; + + // set the events + event_assign(_event, event_get_base(_event), event_get_fd(_event), event_flags, + event_get_callback(_event), event_get_callback_arg(_event)); + + // and restart it + event_add(_event, nullptr); + } + }; + + + /** + * The event loop + * @var struct event_base* + */ + struct event_base *_evbase; + + /** + * All I/O watchers that are active, indexed by their filedescriptor + * @var std::map + */ + std::map> _watchers; + + + /** + * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability + * @param connection The TCP connection object that is reporting + * @param fd The filedescriptor to be monitored + * @param flags Should the object be monitored for readability or writability? + */ + virtual void monitor(TcpConnection *connection, int fd, int flags) override + { + // do we already have this filedescriptor + auto iter = _watchers.find(fd); + + // was it found? + if (iter == _watchers.end()) + { + // we did not yet have this watcher - but that is ok if no filedescriptor was registered + if (flags == 0) return; + + // construct a new watcher, and put it in the map + _watchers[fd] = std::unique_ptr(new Watcher(_evbase, connection, fd, flags)); + } + else if (flags == 0) + { + // the watcher does already exist, but we no longer have to watch this watcher + _watchers.erase(iter); + } + else + { + // change the events + iter->second->events(flags); + } + } + +public: + /** + * Constructor + * @param evbase The event loop to wrap + */ + LibEventHandler(struct event_base *evbase) : _evbase(evbase) {} + + /** + * Destructor + */ + virtual ~LibEventHandler() = default; +}; + +/** + * End of namespace + */ +} diff --git a/tests/libevent.cpp b/tests/libevent.cpp new file mode 100644 index 0000000..360418d --- /dev/null +++ b/tests/libevent.cpp @@ -0,0 +1,53 @@ +/** + * Libevent.cpp + * + * Test program to check AMQP functionality based on Libevent + * + * @author Brent Dimmig + */ + +/** + * Dependencies + */ +#include +#include +#include + + +/** + * Main program + * @return int + */ +int main() +{ + // access to the event loop + auto evbase = event_base_new(); + + // handler for libevent + AMQP::LibEventHandler handler(evbase); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); + + // we need a channel too + AMQP::TcpChannel channel(&connection); + + // create a temporary queue + channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + + // report the name of the temporary queue + std::cout << "declared queue " << name << std::endl; + + // now we can close the connection + connection.close(); + }); + + // run the loop + event_base_dispatch(evbase); + + event_base_free(evbase); + + // done + return 0; +} +