Merge pull request #54 from brentnd/master

Add TcpHandler implementation for libevent
This commit is contained in:
Emiel Bruijntjes 2016-01-10 13:07:49 +01:00
commit 25ce57818a
3 changed files with 253 additions and 10 deletions

View File

@ -333,8 +333,8 @@ class MyTcpHandler : public AMQP::TcpHandler
````
The "monitor()" method can be used to integrate the AMQP filedescriptors in your
application's event loop. For some popular event loops (libev), we have already
added example handler objects (see the next section for that).
application's event loop. For some popular event loops (libev, libevent), we have
already added example handler objects (see the next section for that).
Using the TCP module of the AMQP-CPP library is easier than using the
raw AMQP::Connection and AMQP::Channel objects, because you do not have to
@ -373,9 +373,9 @@ call), or if you use an existing library for it (like libevent, libev or libuv),
you can implement the "monitor()" method to watch the file descriptors and
hand over control back to AMQP-CPP when one of the sockets become active.
For libev users, we have even implemented an example implementation, so that
you do not even have to do this. Instead of implementing the monitor() method
yourself, you can use the AMQP::LibEvHandler class instead:
For libev and libevent users, we have even implemented an example implementation,
so that you do not even have to do this. Instead of implementing the monitor() method
yourself, you can use the AMQP::LibEvHandler or AMQP:LibEventHandler classes instead:
````c++
#include <ev.h>
@ -414,15 +414,15 @@ int main()
}
````
The AMQP::LibEvHandler class is an extended AMQP::TcpHandler class, with an
implementation of the monitor() method that simply adds the filedescriptor to the
libev event loop. If you use this class however, it is recommended not to
The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpHandler
classes, with an implementation of the monitor() method that simply adds the
filedescriptor to the event loop. If you use this class however, it is recommended not to
instantiate it directly (like we did in the example), but to create your own
"MyHandler" class that extends from it, and in which you also implement the
onError() method to report possible connection errors to your end users.
Currently, we have only added such an example TcpHandler implementation for libev.
For other event loops (like libevent, libev and boost asio) we do not yet have
Currently, we have only added such an example TcpHandler implementation for libev and
libevent. For other event loops (like libuv and boost asio) we do not yet have
such examples.

190
include/libevent.h Normal file
View File

@ -0,0 +1,190 @@
/**
* LibEvent.h
*
* Implementation for the AMQP::TcpHandler that is optimized for libevent. You can
* use this class instead of a AMQP::TcpHandler class, just pass the event loop
* to the constructor and you're all set
*
* Compile with: "g++ -std=c++11 libevent.cpp -lamqpcpp -levent -lpthread"
*
* @author Brent Dimmig <brentdimmig@gmail.com>
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <event2/event.h>
#include <amqpcpp/flags.h>
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class LibEventHandler : public TcpHandler
{
private:
/**
* Helper class that wraps a libev I/O watcher
*/
class Watcher
{
private:
/**
* The actual event structure
* @var struct event
*/
struct event * _event;
/**
* Callback method that is called by libevent when a filedescriptor becomes active
* @param fd The filedescriptor with an event
* @param what Events triggered
* @param connection_arg void * to the connection
*/
static void callback(evutil_socket_t fd, short what, void *connection_arg)
{
// retrieve the connection
TcpConnection *connection = static_cast<TcpConnection*>(connection_arg);
// setup amqp flags
int amqp_flags = 0;
if (what & EV_READ)
amqp_flags |= AMQP::readable;
if (what & EV_WRITE)
amqp_flags |= AMQP::writable;
// tell the connection that its filedescriptor is active
connection->process(fd, amqp_flags);
}
public:
/**
* Constructor
* @param evbase The current event loop
* @param connection The connection being watched
* @param fd The filedescriptor being watched
* @param events The events that should be monitored
*/
Watcher(struct event_base *evbase, TcpConnection *connection, int fd, int events)
{
// setup libevent flags
short event_flags = EV_PERSIST;
if (events & AMQP::readable)
event_flags |= EV_READ;
if (events & AMQP::writable)
event_flags |= EV_WRITE;
// initialize the event
_event = event_new(evbase, fd, event_flags, callback, connection);
event_add(_event, nullptr);
}
/**
* Destructor
*/
virtual ~Watcher()
{
// stop the event
event_del(_event);
// free the event
event_free(_event);
}
/**
* Change the events for which the filedescriptor is monitored
* @param events
*/
void events(int events)
{
// stop the event if it was active
event_del(_event);
// setup libevent flags
short event_flags = EV_PERSIST;
if (events & AMQP::readable)
event_flags |= EV_READ;
if (events & AMQP::writable)
event_flags |= EV_WRITE;
// set the events
event_assign(_event, event_get_base(_event), event_get_fd(_event), event_flags,
event_get_callback(_event), event_get_callback_arg(_event));
// and restart it
event_add(_event, nullptr);
}
};
/**
* The event loop
* @var struct event_base*
*/
struct event_base *_evbase;
/**
* All I/O watchers that are active, indexed by their filedescriptor
* @var std::map<int,Watcher>
*/
std::map<int,std::unique_ptr<Watcher>> _watchers;
/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
* @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability?
*/
virtual void monitor(TcpConnection *connection, int fd, int flags) override
{
// do we already have this filedescriptor
auto iter = _watchers.find(fd);
// was it found?
if (iter == _watchers.end())
{
// we did not yet have this watcher - but that is ok if no filedescriptor was registered
if (flags == 0) return;
// construct a new watcher, and put it in the map
_watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_evbase, connection, fd, flags));
}
else if (flags == 0)
{
// the watcher does already exist, but we no longer have to watch this watcher
_watchers.erase(iter);
}
else
{
// change the events
iter->second->events(flags);
}
}
public:
/**
* Constructor
* @param evbase The event loop to wrap
*/
LibEventHandler(struct event_base *evbase) : _evbase(evbase) {}
/**
* Destructor
*/
virtual ~LibEventHandler() = default;
};
/**
* End of namespace
*/
}

53
tests/libevent.cpp Normal file
View File

@ -0,0 +1,53 @@
/**
* Libevent.cpp
*
* Test program to check AMQP functionality based on Libevent
*
* @author Brent Dimmig <brentdimmig@gmail.com>
*/
/**
* Dependencies
*/
#include <event2/event.h>
#include <amqpcpp.h>
#include <amqpcpp/libevent.h>
/**
* Main program
* @return int
*/
int main()
{
// access to the event loop
auto evbase = event_base_new();
// handler for libevent
AMQP::LibEventHandler handler(evbase);
// make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/"));
// we need a channel too
AMQP::TcpChannel channel(&connection);
// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
// report the name of the temporary queue
std::cout << "declared queue " << name << std::endl;
// now we can close the connection
connection.close();
});
// run the loop
event_base_dispatch(evbase);
event_base_free(evbase);
// done
return 0;
}