From 2754800ab91742af8081dcb4756239d8d15b57b4 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 1 Jun 2015 08:44:02 +0200 Subject: [PATCH] changed directories --- examples/boost/CMakeLists.txt | 37 +++ examples/boost/main.cpp | 8 + examples/rabbitmq_tutorials/CMakeLists.txt | 47 ++++ examples/rabbitmq_tutorials/README.md | 44 ++++ examples/rabbitmq_tutorials/asiohandler.cpp | 222 ++++++++++++++++++ examples/rabbitmq_tutorials/asiohandler.h | 59 +++++ examples/rabbitmq_tutorials/emit_log.cpp | 29 +++ .../rabbitmq_tutorials/emit_log_direct.cpp | 31 +++ .../rabbitmq_tutorials/emit_log_topic.cpp | 31 +++ examples/rabbitmq_tutorials/new_task.cpp | 35 +++ examples/rabbitmq_tutorials/receive.cpp | 28 +++ examples/rabbitmq_tutorials/receive_logs.cpp | 41 ++++ .../receive_logs_direct.cpp | 52 ++++ .../rabbitmq_tutorials/receive_logs_topic.cpp | 55 +++++ examples/rabbitmq_tutorials/rpc_client.cpp | 46 ++++ examples/rabbitmq_tutorials/rpc_server.cpp | 51 ++++ examples/rabbitmq_tutorials/send.cpp | 29 +++ examples/rabbitmq_tutorials/tools.h | 36 +++ examples/rabbitmq_tutorials/worker.cpp | 40 ++++ 19 files changed, 921 insertions(+) create mode 100644 examples/boost/CMakeLists.txt create mode 100644 examples/boost/main.cpp create mode 100644 examples/rabbitmq_tutorials/CMakeLists.txt create mode 100644 examples/rabbitmq_tutorials/README.md create mode 100644 examples/rabbitmq_tutorials/asiohandler.cpp create mode 100644 examples/rabbitmq_tutorials/asiohandler.h create mode 100644 examples/rabbitmq_tutorials/emit_log.cpp create mode 100644 examples/rabbitmq_tutorials/emit_log_direct.cpp create mode 100644 examples/rabbitmq_tutorials/emit_log_topic.cpp create mode 100644 examples/rabbitmq_tutorials/new_task.cpp create mode 100644 examples/rabbitmq_tutorials/receive.cpp create mode 100644 examples/rabbitmq_tutorials/receive_logs.cpp create mode 100644 examples/rabbitmq_tutorials/receive_logs_direct.cpp create mode 100644 examples/rabbitmq_tutorials/receive_logs_topic.cpp create mode 100644 examples/rabbitmq_tutorials/rpc_client.cpp create mode 100644 examples/rabbitmq_tutorials/rpc_server.cpp create mode 100644 examples/rabbitmq_tutorials/send.cpp create mode 100644 examples/rabbitmq_tutorials/tools.h create mode 100644 examples/rabbitmq_tutorials/worker.cpp diff --git a/examples/boost/CMakeLists.txt b/examples/boost/CMakeLists.txt new file mode 100644 index 0000000..0669683 --- /dev/null +++ b/examples/boost/CMakeLists.txt @@ -0,0 +1,37 @@ +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_RUNTIME OFF) +find_package(Boost 1.55 REQUIRED COMPONENTS thread system) + +if(NOT Boost_FOUND) + message( FATAL_ERROR "boost must be installed") +endif() + +include_directories(${Boost_INCLUDE_DIRS}) + +set(SRC main.cpp) + +if("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") +set(SPECIFIC_HOST_LIBS rt pthread) +else() +set(SPECIFIC_HOST_LIBS) +endif() + +set(LIBS +${Boost_LIBRARIES} +amqp-cpp +) + +include_directories(SYSTEM ${CRYPTLIB_INCLUDE_PATH}) +link_directories(${Boost_LIBRARY_PATH}) + +include_directories(SYSTEM ${AMQP-CPP_INCLUDE_PATH}) + +add_executable(amqp_boost_test ${SRC}) +target_link_libraries(amqp_boost_test ${LIBS} ${SPECIFIC_HOST_LIBS}) + +install(TARGETS amqp_boost_test + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/examples/boost/main.cpp b/examples/boost/main.cpp new file mode 100644 index 0000000..0dd80bc --- /dev/null +++ b/examples/boost/main.cpp @@ -0,0 +1,8 @@ +#include + +int main(int argc, const char* argv[]) +{ + + + return 0; +} \ No newline at end of file diff --git a/examples/rabbitmq_tutorials/CMakeLists.txt b/examples/rabbitmq_tutorials/CMakeLists.txt new file mode 100644 index 0000000..e9bad11 --- /dev/null +++ b/examples/rabbitmq_tutorials/CMakeLists.txt @@ -0,0 +1,47 @@ +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_RUNTIME OFF) +find_package(Boost 1.55 REQUIRED COMPONENTS system) + +if(NOT Boost_FOUND) + message( FATAL_ERROR "boost must be installed") +endif() + +include_directories(${Boost_INCLUDE_DIRS}) + +add_library(asio_handler asiohandler.cpp asiohandler.h) +target_link_libraries(asio_handler ${Boost_LIBRARIES}) + +set(PROGS send + receive + new_task + worker + emit_log + receive_logs + emit_log_direct + receive_logs_direct + emit_log_topic + receive_logs_topic + rpc_client + rpc_server +) + + +find_package (Threads) + +foreach(item ${PROGS}) + add_executable(${item} "${item}.cpp") + target_link_libraries(${item} amqp-cpp + asio_handler + ${CMAKE_THREAD_LIBS_INIT}) +endforeach(item) + +#little hack +file(GLOB AMQCPP_HEADERS ${PROJECT_SOURCE_DIR}/include/*) + +file(COPY ${AMQCPP_HEADERS} + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/amqpcpp) + +include_directories(${PROJECT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR}) + \ No newline at end of file diff --git a/examples/rabbitmq_tutorials/README.md b/examples/rabbitmq_tutorials/README.md new file mode 100644 index 0000000..5976d59 --- /dev/null +++ b/examples/rabbitmq_tutorials/README.md @@ -0,0 +1,44 @@ +# C++ code for RabbitMQ tutorials + +## Requirements + +* Boost Asio +* Boost Uuid + + +## Code + +[Tutorial one: "Hello World!"](http://www.rabbitmq.com/tutorial-one-python.html): + + send + receive + + +[Tutorial two: Work Queues](http://www.rabbitmq.com/tutorial-two-python.html): + + new_task "A very hard task which takes two seconds.." + worker + + +[Tutorial three: Publish/Subscribe](http://www.rabbitmq.com/tutorial-three-python.html): + + receive_logs + emit_log "info: This is the log message" + + +[Tutorial four: Routing](http://www.rabbitmq.com/tutorial-four-python.html): + + receive_logs_direct info + emit_log_direct info "The message" + + +[Tutorial five: Topics](http://www.rabbitmq.com/tutorial-five-python.html): + + receive_logs_topic "*.rabbit" + emit_log_topic red.rabbit Hello + + +[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html): + + rpc_server + rpc_client diff --git a/examples/rabbitmq_tutorials/asiohandler.cpp b/examples/rabbitmq_tutorials/asiohandler.cpp new file mode 100644 index 0000000..4dec335 --- /dev/null +++ b/examples/rabbitmq_tutorials/asiohandler.cpp @@ -0,0 +1,222 @@ +#include + +#include "asiohandler.h" + +using boost::asio::ip::tcp; + +class AmqpBuffer +{ +public: + AmqpBuffer(size_t size) : + _data(size, 0), + _use(0) + { + } + + size_t write(const char* data, size_t size) + { + if (_use == _data.size()) + { + return 0; + } + + const size_t length = (size + _use); + size_t write = length < _data.size() ? size : _data.size() - _use; + memcpy(_data.data() + _use, data, write); + _use += write; + return write; + } + + void drain() + { + _use = 0; + } + + size_t available() const + { + return _use; + } + + const char* data() const + { + return _data.data(); + } + + void shl(size_t count) + { + assert(count < _use); + + const size_t diff = _use - count; + std::memmove(_data.data(), _data.data() + count, diff); + _use = _use - count; + } + +private: + std::vector _data; + size_t _use; +}; + +AsioHandler::AsioHandler(boost::asio::io_service& ioService) : + _ioService(ioService), + _socket(ioService), + _timer(ioService), + _asioInputBuffer(ASIO_INPUT_BUFFER_SIZE, 0), + _amqpBuffer(new AmqpBuffer(ASIO_INPUT_BUFFER_SIZE * 2)), + _connection(nullptr), + _writeInProgress(false), + _connected(false), + _quit(false) +{ +} + +AsioHandler::~AsioHandler() +{ +} + +void AsioHandler::connect(const std::string& host, uint16_t port) +{ + doConnect(host, port); +} + +void AsioHandler::doConnect(const std::string& host, uint16_t port) +{ + tcp::resolver::query query(host, std::to_string(port)); + tcp::resolver::iterator iter = tcp::resolver(_ioService).resolve(query); + _timer.expires_from_now(boost::posix_time::seconds(15)); + _timer.async_wait([this](const boost::system::error_code& ec){ + if(!ec && !_connected) + { + std::cerr<<"Connection timed out"; + _socket.cancel(); + exit(1); + } + }); + + boost::asio::async_connect(_socket, iter, + [this](boost::system::error_code ec, tcp::resolver::iterator) + { + _connected = true; + if (!ec) + { + doRead(); + + if(!_outputBuffer.empty()) + { + doWrite(); + } + } + else + { + std::cerr<<"Connection error:"<(data, data + size)); + if (!_writeInProgress && _connected) + { + doWrite(); + } +} + +void AsioHandler::doRead() +{ + _socket.async_receive(boost::asio::buffer(_asioInputBuffer), + [this](boost::system::error_code ec, std::size_t length) + { + if (!ec) + { + _amqpBuffer->write(_asioInputBuffer.data(), length); + parseData(); + doRead(); + } + else + { + std::cerr<<"Error reading:"<parse(_amqpBuffer->data(), + _amqpBuffer->available()); + + if (count == _amqpBuffer->available()) + { + _amqpBuffer->drain(); + } + else if (count > 0) + { + _amqpBuffer->shl(count); + } +} + +void AsioHandler::onConnected(AMQP::Connection *connection) +{ +} +bool AsioHandler::connected() const +{ + return _connected; +} + +void AsioHandler::onError(AMQP::Connection *connection, const char *message) +{ + std::cerr << "AMQP error " << message << std::endl; +} + +void AsioHandler::onClosed(AMQP::Connection *connection) +{ + std::cout << "AMQP closed connection" << std::endl; + _quit = true; + if (!_writeInProgress) + { + _socket.close(); + } +} diff --git a/examples/rabbitmq_tutorials/asiohandler.h b/examples/rabbitmq_tutorials/asiohandler.h new file mode 100644 index 0000000..490e20f --- /dev/null +++ b/examples/rabbitmq_tutorials/asiohandler.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + +#include +#include + +class AmqpBuffer; +class AsioHandler: public AMQP::ConnectionHandler +{ +private: + + typedef std::deque> OutputBuffers; + + virtual void onData(AMQP::Connection *connection, const char *data, size_t size); + virtual void onConnected(AMQP::Connection *connection); + virtual void onError(AMQP::Connection *connection, const char *message); + virtual void onClosed(AMQP::Connection *connection); + + void doConnect(const std::string& host, uint16_t port); + + void doRead(); + + void doWrite(); + + void parseData(); + +private: + + boost::asio::io_service& _ioService; + boost::asio::ip::tcp::socket _socket; + boost::asio::deadline_timer _timer; + + std::vector _asioInputBuffer; + std::shared_ptr _amqpBuffer; + AMQP::Connection* _connection; + OutputBuffers _outputBuffer; + bool _writeInProgress; + bool _connected; + bool _quit; + +public: + + static constexpr size_t ASIO_INPUT_BUFFER_SIZE = 4*1024; //4kb + + AsioHandler(boost::asio::io_service& ioService); + + void connect(const std::string& host, uint16_t port); + + virtual ~AsioHandler(); + + AsioHandler(const AsioHandler&) = delete; + AsioHandler& operator=(const AsioHandler&)=delete; + + bool connected()const; +}; + diff --git a/examples/rabbitmq_tutorials/emit_log.cpp b/examples/rabbitmq_tutorials/emit_log.cpp new file mode 100644 index 0000000..d5cfe7a --- /dev/null +++ b/examples/rabbitmq_tutorials/emit_log.cpp @@ -0,0 +1,29 @@ +#include +#include + +#include "tools.h" +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + const std::string msg = + argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!"; + + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100)); + AMQP::Channel channel(&connection); + channel.declareExchange("logs", AMQP::fanout).onSuccess([&]() + { + channel.publish("logs", "", msg); + std::cout << " [x] Sent "< +#include + +#include "tools.h" +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + const std::string severity = argc > 2 ? argv[1] : "info"; + const std::string msg = + argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; + + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100)); + AMQP::Channel channel(&connection); + channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]() + { + channel.publish("direct_logs", severity, msg); + std::cout << " [x] Sent "< +#include + +#include "tools.h" +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + const std::string msg = + argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; + const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info"; + + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100)); + AMQP::Channel channel(&connection); + channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]() + { + channel.publish("topic_logs", routing_key, msg); + std::cout << " [x] Sent "< +#include + +#include "tools.h" +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + const std::string msg = + argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!"; + + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + AMQP::Channel channel(&connection); + + boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100)); + AMQP::QueueCallback callback = + [&](const std::string &name, int msgcount, int consumercount) + { + AMQP::Envelope env(msg); + env.setDeliveryMode(2); + channel.publish("", "task_queue", env); + std::cout<<" [x] Sent '"< + +#include "asiohandler.h" + +int main(void) +{ + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + AMQP::Channel channel(&connection); + channel.declareQueue("hello"); + channel.consume("hello", AMQP::noack).onReceived( + [](const AMQP::Message &message, + uint64_t deliveryTag, + bool redelivered) + { + + std::cout <<" [x] Received "< +#include + +#include "asiohandler.h" + +int main(void) +{ + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + AMQP::Channel channel(&connection); + auto receiveMessageCallback = [](const AMQP::Message &message, + uint64_t deliveryTag, + bool redelivered) + { + + std::cout <<" [x] "< +#include + +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + if(argc==1) + { + std::cout<<"Usage: "< +#include + +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + if(argc==1) + { + std::cout<<"Usage: "< + +#include "tools.h" +#include "asiohandler.h" + +int main(int argc, const char* argv[]) +{ + const std::string correlation(uuid()); + + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + AMQP::Channel channel(&connection); + AMQP::QueueCallback callback = [&](const std::string &name, + int msgcount, + int consumercount) + { + AMQP::Envelope env("30"); + env.setCorrelationID(correlation); + env.setReplyTo(name); + channel.publish("","rpc_queue",env); + std::cout<<" [x] Requesting fib(30)"< +#include +#include +#include + +#include "asiohandler.h" + +int fib(int n) +{ + switch (n) + { + case 0: + return 0; + case 1: + return 1; + default: + return fib(n - 1) + fib(n - 2); + } +} + +int main(void) +{ + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + AMQP::Channel channel(&connection); + channel.setQos(1); + + channel.declareQueue("rpc_queue"); + channel.consume("").onReceived([&channel](const AMQP::Message &message, + uint64_t deliveryTag, + bool redelivered) + { + const auto body = message.message(); + std::cout<<" [.] fib("< +#include + +#include "asiohandler.h" + +int main(void) +{ + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + AMQP::Channel channel(&connection); + + boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100)); + channel.onReady([&]() + { + if(handler.connected()) + { + channel.publish("", "hello", "Hello World!"); + std::cout << " [x] Sent 'Hello World!'" << std::endl; + + t.async_wait([&](const boost::system::error_code&){ioService.stop();}); + } + }); + + ioService.run(); + return 0; +} diff --git a/examples/rabbitmq_tutorials/tools.h b/examples/rabbitmq_tutorials/tools.h new file mode 100644 index 0000000..ab185f3 --- /dev/null +++ b/examples/rabbitmq_tutorials/tools.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +#include +#include +#include + + +template +std::string join(Iterator first, const Iterator last, + const std::string& separator) +{ + std::string str; + for (; first != last; ++first) + { + str.append(*first); + if (first != (last - 1)) + { + str.append(separator); + } + + } + return str; +} + +std::string uuid() +{ + boost::uuids::random_generator generator; + boost::uuids::uuid uuid(generator()); + + std::stringstream sstr; + sstr< +#include +#include +#include + +#include "asiohandler.h" + +int main(void) +{ + boost::asio::io_service ioService; + AsioHandler handler(ioService); + handler.connect("localhost", 5672); + + AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); + + AMQP::Channel channel(&connection); + channel.setQos(1); + + channel.declareQueue("task_queue", AMQP::durable); + channel.consume("task_queue").onReceived( + [&channel](const AMQP::Message &message, + uint64_t deliveryTag, + bool redelivered) + { + const auto body = message.message(); + std::cout<<" [x] Received "<