diff --git a/README.md b/README.md index 84e4b9d..f0ff033 100644 --- a/README.md +++ b/README.md @@ -333,8 +333,8 @@ class MyTcpHandler : public AMQP::TcpHandler ```` The "monitor()" method can be used to integrate the AMQP filedescriptors in your -application's event loop. For some popular event loops (libev), we have already -added example handler objects (see the next section for that). +application's event loop. For some popular event loops (libev, libevent), we have +already added example handler objects (see the next section for that). Using the TCP module of the AMQP-CPP library is easier than using the raw AMQP::Connection and AMQP::Channel objects, because you do not have to @@ -373,9 +373,9 @@ call), or if you use an existing library for it (like libevent, libev or libuv), you can implement the "monitor()" method to watch the file descriptors and hand over control back to AMQP-CPP when one of the sockets become active. -For libev users, we have even implemented an example implementation, so that -you do not even have to do this. Instead of implementing the monitor() method -yourself, you can use the AMQP::LibEvHandler class instead: +For libev and libevent users, we have even implemented an example implementation, +so that you do not even have to do this. Instead of implementing the monitor() method +yourself, you can use the AMQP::LibEvHandler or AMQP:LibEventHandler classes instead: ````c++ #include @@ -414,15 +414,15 @@ int main() } ```` -The AMQP::LibEvHandler class is an extended AMQP::TcpHandler class, with an -implementation of the monitor() method that simply adds the filedescriptor to the -libev event loop. If you use this class however, it is recommended not to +The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpHandler +classes, with an implementation of the monitor() method that simply adds the +filedescriptor to the event loop. If you use this class however, it is recommended not to instantiate it directly (like we did in the example), but to create your own "MyHandler" class that extends from it, and in which you also implement the onError() method to report possible connection errors to your end users. -Currently, we have only added such an example TcpHandler implementation for libev. -For other event loops (like libevent, libev and boost asio) we do not yet have +Currently, we have only added such an example TcpHandler implementation for libev and +libevent. For other event loops (like libuv and boost asio) we do not yet have such examples. diff --git a/include/libevent.h b/include/libevent.h new file mode 100644 index 0000000..7621dcc --- /dev/null +++ b/include/libevent.h @@ -0,0 +1,190 @@ +/** + * LibEvent.h + * + * Implementation for the AMQP::TcpHandler that is optimized for libevent. You can + * use this class instead of a AMQP::TcpHandler class, just pass the event loop + * to the constructor and you're all set + * + * Compile with: "g++ -std=c++11 libevent.cpp -lamqpcpp -levent -lpthread" + * + * @author Brent Dimmig + */ + +/** + * Include guard + */ +#pragma once + +/** + * Dependencies + */ +#include +#include + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class LibEventHandler : public TcpHandler +{ +private: + /** + * Helper class that wraps a libev I/O watcher + */ + class Watcher + { + private: + /** + * The actual event structure + * @var struct event + */ + struct event * _event; + + /** + * Callback method that is called by libevent when a filedescriptor becomes active + * @param fd The filedescriptor with an event + * @param what Events triggered + * @param connection_arg void * to the connection + */ + static void callback(evutil_socket_t fd, short what, void *connection_arg) + { + // retrieve the connection + TcpConnection *connection = static_cast(connection_arg); + + // setup amqp flags + int amqp_flags = 0; + if (what & EV_READ) + amqp_flags |= AMQP::readable; + if (what & EV_WRITE) + amqp_flags |= AMQP::writable; + + // tell the connection that its filedescriptor is active + connection->process(fd, amqp_flags); + } + + public: + /** + * Constructor + * @param evbase The current event loop + * @param connection The connection being watched + * @param fd The filedescriptor being watched + * @param events The events that should be monitored + */ + Watcher(struct event_base *evbase, TcpConnection *connection, int fd, int events) + { + // setup libevent flags + short event_flags = EV_PERSIST; + if (events & AMQP::readable) + event_flags |= EV_READ; + if (events & AMQP::writable) + event_flags |= EV_WRITE; + + // initialize the event + + _event = event_new(evbase, fd, event_flags, callback, connection); + event_add(_event, nullptr); + } + + /** + * Destructor + */ + virtual ~Watcher() + { + // stop the event + event_del(_event); + // free the event + event_free(_event); + } + + /** + * Change the events for which the filedescriptor is monitored + * @param events + */ + void events(int events) + { + // stop the event if it was active + event_del(_event); + + // setup libevent flags + short event_flags = EV_PERSIST; + if (events & AMQP::readable) + event_flags |= EV_READ; + if (events & AMQP::writable) + event_flags |= EV_WRITE; + + // set the events + event_assign(_event, event_get_base(_event), event_get_fd(_event), event_flags, + event_get_callback(_event), event_get_callback_arg(_event)); + + // and restart it + event_add(_event, nullptr); + } + }; + + + /** + * The event loop + * @var struct event_base* + */ + struct event_base *_evbase; + + /** + * All I/O watchers that are active, indexed by their filedescriptor + * @var std::map + */ + std::map> _watchers; + + + /** + * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability + * @param connection The TCP connection object that is reporting + * @param fd The filedescriptor to be monitored + * @param flags Should the object be monitored for readability or writability? + */ + virtual void monitor(TcpConnection *connection, int fd, int flags) override + { + // do we already have this filedescriptor + auto iter = _watchers.find(fd); + + // was it found? + if (iter == _watchers.end()) + { + // we did not yet have this watcher - but that is ok if no filedescriptor was registered + if (flags == 0) return; + + // construct a new watcher, and put it in the map + _watchers[fd] = std::unique_ptr(new Watcher(_evbase, connection, fd, flags)); + } + else if (flags == 0) + { + // the watcher does already exist, but we no longer have to watch this watcher + _watchers.erase(iter); + } + else + { + // change the events + iter->second->events(flags); + } + } + +public: + /** + * Constructor + * @param evbase The event loop to wrap + */ + LibEventHandler(struct event_base *evbase) : _evbase(evbase) {} + + /** + * Destructor + */ + virtual ~LibEventHandler() = default; +}; + +/** + * End of namespace + */ +} diff --git a/tests/libevent.cpp b/tests/libevent.cpp new file mode 100644 index 0000000..360418d --- /dev/null +++ b/tests/libevent.cpp @@ -0,0 +1,53 @@ +/** + * Libevent.cpp + * + * Test program to check AMQP functionality based on Libevent + * + * @author Brent Dimmig + */ + +/** + * Dependencies + */ +#include +#include +#include + + +/** + * Main program + * @return int + */ +int main() +{ + // access to the event loop + auto evbase = event_base_new(); + + // handler for libevent + AMQP::LibEventHandler handler(evbase); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); + + // we need a channel too + AMQP::TcpChannel channel(&connection); + + // create a temporary queue + channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + + // report the name of the temporary queue + std::cout << "declared queue " << name << std::endl; + + // now we can close the connection + connection.close(); + }); + + // run the loop + event_base_dispatch(evbase); + + event_base_free(evbase); + + // done + return 0; +} +