Added example/test application to demonstrate boost asio io_service handler.
This commit is contained in:
parent
5410f1abdd
commit
034e72bbc2
|
|
@ -3,9 +3,11 @@
|
||||||
*
|
*
|
||||||
* Implementation for the AMQP::TcpHandler that is optimized for boost::asio. You can
|
* Implementation for the AMQP::TcpHandler that is optimized for boost::asio. You can
|
||||||
* use this class instead of a AMQP::TcpHandler class, just pass the boost asio service
|
* use this class instead of a AMQP::TcpHandler class, just pass the boost asio service
|
||||||
* to the constructor and you're all set
|
* to the constructor and you're all set. See tests/libboostasio.cpp for example.
|
||||||
*
|
*
|
||||||
* @author Gavin Smith <gavin.smith@coralbay.tv>
|
* @author Gavin Smith <gavin.smith@coralbay.tv>
|
||||||
|
*
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -19,9 +21,10 @@
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
#include <boost/asio/io_service.hpp>
|
#include <boost/asio/io_service.hpp>
|
||||||
|
#include <boost/asio/strand.hpp>
|
||||||
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||||
#include <boost/bind.hpp>
|
#include <boost/bind.hpp>
|
||||||
#include <boost/function.hpp>
|
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////
|
||||||
|
|
@ -78,7 +81,7 @@ private:
|
||||||
* @var class boost::asio::io_service&
|
* @var class boost::asio::io_service&
|
||||||
*/
|
*/
|
||||||
boost::asio::io_service & _ioservice;
|
boost::asio::io_service & _ioservice;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The boost asio io_service::strand managed pointer.
|
* The boost asio io_service::strand managed pointer.
|
||||||
* @var class std::shared_ptr<boost::asio::io_service>
|
* @var class std::shared_ptr<boost::asio::io_service>
|
||||||
|
|
@ -127,7 +130,7 @@ private:
|
||||||
* @note The handler will get called if a read is cancelled.
|
* @note The handler will get called if a read is cancelled.
|
||||||
*/
|
*/
|
||||||
void read_handler(const boost::system::error_code &ec,
|
void read_handler(const boost::system::error_code &ec,
|
||||||
const std::size_t bytes_transferred,
|
const std::size_t bytes_transferred,
|
||||||
const std::weak_ptr<Watcher> awpWatcher,
|
const std::weak_ptr<Watcher> awpWatcher,
|
||||||
TcpConnection *const connection,
|
TcpConnection *const connection,
|
||||||
const int fd)
|
const int fd)
|
||||||
|
|
@ -144,8 +147,8 @@ private:
|
||||||
connection->process(fd, AMQP::readable);
|
connection->process(fd, AMQP::readable);
|
||||||
|
|
||||||
_read_pending = true;
|
_read_pending = true;
|
||||||
|
|
||||||
_socket.async_read_some(boost::asio::null_buffers(),
|
_socket.async_read_some(boost::asio::null_buffers(),
|
||||||
STRAND_SOCKET_HANDLER(
|
STRAND_SOCKET_HANDLER(
|
||||||
boost::bind(&Watcher::read_handler,
|
boost::bind(&Watcher::read_handler,
|
||||||
this,
|
this,
|
||||||
|
|
@ -313,13 +316,13 @@ private:
|
||||||
* @var class boost::asio::io_service&
|
* @var class boost::asio::io_service&
|
||||||
*/
|
*/
|
||||||
boost::asio::io_service & _ioservice;
|
boost::asio::io_service & _ioservice;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The boost asio io_service::strand managed pointer.
|
* The boost asio io_service::strand managed pointer.
|
||||||
* @var class std::shared_ptr<boost::asio::io_service>
|
* @var class std::shared_ptr<boost::asio::io_service>
|
||||||
*/
|
*/
|
||||||
std::weak_ptr<boost::asio::io_service::strand> _strand;
|
std::weak_ptr<boost::asio::io_service::strand> _strand;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The boost asynchronous deadline timer.
|
* The boost asynchronous deadline timer.
|
||||||
* @var class boost::asio::deadline_timer
|
* @var class boost::asio::deadline_timer
|
||||||
|
|
@ -483,7 +486,7 @@ private:
|
||||||
std::make_shared<Watcher>(_ioservice, _strand, fd);
|
std::make_shared<Watcher>(_ioservice, _strand, fd);
|
||||||
|
|
||||||
_watchers[fd] = apWatcher;
|
_watchers[fd] = apWatcher;
|
||||||
|
|
||||||
// explicitly set the events to monitor
|
// explicitly set the events to monitor
|
||||||
apWatcher->events(connection, fd, flags);
|
apWatcher->events(connection, fd, flags);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* LibBoostAsio.cpp
|
||||||
|
*
|
||||||
|
* Test program to check AMQP functionality based on Boost's asio io_service.
|
||||||
|
*
|
||||||
|
* @author Gavin Smith <gavin.smith@coralbay.tv>
|
||||||
|
*
|
||||||
|
* Compile with g++ libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dependencies
|
||||||
|
*/
|
||||||
|
#include <boost/asio/io_service.hpp>
|
||||||
|
#include <boost/asio/strand.hpp>
|
||||||
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
|
|
||||||
|
|
||||||
|
#include <amqpcpp.h>
|
||||||
|
#include <amqpcpp/libboostasio.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main program
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
int main()
|
||||||
|
{
|
||||||
|
|
||||||
|
// access to the boost asio handler
|
||||||
|
// note: we suggest use of 2 threads - normally one is fin (we are simply demonstrating thread safety).
|
||||||
|
boost::asio::io_service service(4);
|
||||||
|
|
||||||
|
// create a work object to process our events.
|
||||||
|
boost::asio::io_service::work work(service);
|
||||||
|
|
||||||
|
// handler for libev
|
||||||
|
AMQP::LibBoostAsioHandler handler(service);
|
||||||
|
|
||||||
|
// make a connection
|
||||||
|
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/"));
|
||||||
|
|
||||||
|
// we need a channel too
|
||||||
|
AMQP::TcpChannel channel(&connection);
|
||||||
|
|
||||||
|
// create a temporary queue
|
||||||
|
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
|
||||||
|
|
||||||
|
// report the name of the temporary queue
|
||||||
|
std::cout << "declared queue " << name << std::endl;
|
||||||
|
|
||||||
|
// now we can close the connection
|
||||||
|
connection.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
// run the handler
|
||||||
|
// a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API.
|
||||||
|
return service.run();
|
||||||
|
}
|
||||||
|
|
||||||
Loading…
Reference in New Issue