add RabbitMQ Tutorials
This commit is contained in:
parent
a93b88697d
commit
44027dbb52
|
|
@ -30,5 +30,10 @@ install(TARGETS amqp-cpp
|
|||
ARCHIVE DESTINATION lib
|
||||
)
|
||||
|
||||
option(BUILD_TUTORIALS "build rabbitmq tutorials" OFF)
|
||||
if(BUILD_TUTORIALS)
|
||||
add_subdirectory(rabbitmq_tutorials)
|
||||
endif()
|
||||
|
||||
set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE)
|
||||
set(AMQP-CPP_INCLUDE_PATH ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE)
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
set(Boost_USE_STATIC_LIBS ON)
|
||||
set(Boost_USE_MULTITHREADED ON)
|
||||
set(Boost_USE_STATIC_RUNTIME OFF)
|
||||
find_package(Boost 1.55 REQUIRED COMPONENTS system)
|
||||
|
||||
if(NOT Boost_FOUND)
|
||||
message( FATAL_ERROR "boost must be installed")
|
||||
endif()
|
||||
|
||||
include_directories(${Boost_INCLUDE_DIRS})
|
||||
|
||||
add_library(asio_handler asiohandler.cpp asiohandler.h)
|
||||
target_link_libraries(asio_handler ${Boost_LIBRARIES})
|
||||
|
||||
set(PROGS send
|
||||
receive
|
||||
new_task
|
||||
worker
|
||||
emit_log
|
||||
receive_logs
|
||||
emit_log_direct
|
||||
receive_logs_direct
|
||||
emit_log_topic
|
||||
receive_logs_topic
|
||||
rpc_client
|
||||
rpc_server
|
||||
)
|
||||
|
||||
|
||||
find_package (Threads)
|
||||
|
||||
foreach(item ${PROGS})
|
||||
add_executable(${item} "${item}.cpp")
|
||||
target_link_libraries(${item} amqp-cpp
|
||||
asio_handler
|
||||
${CMAKE_THREAD_LIBS_INIT})
|
||||
endforeach(item)
|
||||
|
||||
#little hack
|
||||
file(GLOB AMQCPP_HEADERS ${PROJECT_SOURCE_DIR}/include/*)
|
||||
|
||||
file(COPY ${AMQCPP_HEADERS}
|
||||
DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/amqpcpp)
|
||||
|
||||
include_directories(${PROJECT_SOURCE_DIR}
|
||||
${CMAKE_CURRENT_BINARY_DIR})
|
||||
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
# C++ code for RabbitMQ tutorials
|
||||
|
||||
## Requirements
|
||||
|
||||
* Boost Asio
|
||||
* Boost Uuid
|
||||
|
||||
|
||||
## Code
|
||||
|
||||
[Tutorial one: "Hello World!"](http://www.rabbitmq.com/tutorial-one-python.html):
|
||||
|
||||
send
|
||||
receive
|
||||
|
||||
|
||||
[Tutorial two: Work Queues](http://www.rabbitmq.com/tutorial-two-python.html):
|
||||
|
||||
new_task "A very hard task which takes two seconds.."
|
||||
worker
|
||||
|
||||
|
||||
[Tutorial three: Publish/Subscribe](http://www.rabbitmq.com/tutorial-three-python.html):
|
||||
|
||||
receive_logs
|
||||
emit_log "info: This is the log message"
|
||||
|
||||
|
||||
[Tutorial four: Routing](http://www.rabbitmq.com/tutorial-four-python.html):
|
||||
|
||||
receive_logs_direct info
|
||||
emit_log_direct info "The message"
|
||||
|
||||
|
||||
[Tutorial five: Topics](http://www.rabbitmq.com/tutorial-five-python.html):
|
||||
|
||||
receive_logs_topic "*.rabbit"
|
||||
emit_log_topic red.rabbit Hello
|
||||
|
||||
|
||||
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html):
|
||||
|
||||
rpc_server
|
||||
rpc_client
|
||||
|
|
@ -0,0 +1,211 @@
|
|||
#include <iostream>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
using boost::asio::ip::tcp;
|
||||
|
||||
class AmqpBuffer
|
||||
{
|
||||
public:
|
||||
AmqpBuffer(size_t size) :
|
||||
_data(size, 0),
|
||||
_use(0)
|
||||
{
|
||||
}
|
||||
|
||||
size_t write(const char* data, size_t size)
|
||||
{
|
||||
if (_use == _data.size())
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
const size_t length = (size + _use);
|
||||
size_t write = length < _data.size() ? size : _data.size() - _use;
|
||||
memcpy(_data.data() + _use, data, write);
|
||||
_use += write;
|
||||
return write;
|
||||
}
|
||||
|
||||
void drain()
|
||||
{
|
||||
_use = 0;
|
||||
}
|
||||
|
||||
size_t available() const
|
||||
{
|
||||
return _use;
|
||||
}
|
||||
|
||||
const char* data() const
|
||||
{
|
||||
return _data.data();
|
||||
}
|
||||
|
||||
void shl(size_t count)
|
||||
{
|
||||
assert(count < _use);
|
||||
|
||||
const size_t diff = _use - count;
|
||||
std::memmove(_data.data(), _data.data() + count, diff);
|
||||
_use = _use - count;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<char> _data;
|
||||
size_t _use;
|
||||
};
|
||||
|
||||
AsioHandler::AsioHandler(boost::asio::io_service& ioService) :
|
||||
_ioService(ioService),
|
||||
_socket(ioService),
|
||||
_asioInputBuffer(ASIO_INPUT_BUFFER_SIZE, 0),
|
||||
_amqpBuffer(new AmqpBuffer(ASIO_INPUT_BUFFER_SIZE * 2)),
|
||||
_connection(nullptr),
|
||||
_writeInProgress(false),
|
||||
_connected(false),
|
||||
_quit(false)
|
||||
{
|
||||
}
|
||||
|
||||
AsioHandler::~AsioHandler()
|
||||
{
|
||||
}
|
||||
|
||||
void AsioHandler::connect(const std::string& host, uint16_t port)
|
||||
{
|
||||
doConnect(host, port);
|
||||
}
|
||||
|
||||
void AsioHandler::doConnect(const std::string& host, uint16_t port)
|
||||
{
|
||||
tcp::resolver::query query(host, std::to_string(port));
|
||||
tcp::resolver::iterator iter = tcp::resolver(_ioService).resolve(query);
|
||||
|
||||
boost::asio::async_connect(_socket, iter,
|
||||
[this](boost::system::error_code ec, tcp::resolver::iterator)
|
||||
{
|
||||
_connected = true;
|
||||
if (!ec)
|
||||
{
|
||||
doRead();
|
||||
|
||||
if(!_outputBuffer.empty())
|
||||
{
|
||||
doWrite();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr<<ec<<std::endl;
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
void AsioHandler::onData(
|
||||
AMQP::Connection *connection, const char *data, size_t size)
|
||||
{
|
||||
_connection = connection;
|
||||
|
||||
_outputBuffer.push_back(std::vector<char>(data, data + size));
|
||||
if (!_writeInProgress && _connected)
|
||||
{
|
||||
doWrite();
|
||||
}
|
||||
}
|
||||
|
||||
void AsioHandler::doRead()
|
||||
{
|
||||
_socket.async_receive(boost::asio::buffer(_asioInputBuffer),
|
||||
[this](boost::system::error_code ec, std::size_t length)
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
_amqpBuffer->write(_asioInputBuffer.data(), length);
|
||||
parseData();
|
||||
doRead();
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr<<ec<<std::endl;
|
||||
exit(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void AsioHandler::doWrite()
|
||||
{
|
||||
_writeInProgress = true;
|
||||
boost::asio::async_write(_socket,
|
||||
boost::asio::buffer(_outputBuffer.front()),
|
||||
[this](boost::system::error_code ec, std::size_t length )
|
||||
{
|
||||
if(!ec)
|
||||
{
|
||||
_outputBuffer.pop_front();
|
||||
if(!_outputBuffer.empty())
|
||||
{
|
||||
doWrite();
|
||||
}
|
||||
else
|
||||
{
|
||||
_writeInProgress = false;
|
||||
}
|
||||
|
||||
if(_quit)
|
||||
{
|
||||
_socket.close();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr<<ec<<std::endl;
|
||||
_socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void AsioHandler::parseData()
|
||||
{
|
||||
if (_connection == nullptr)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const size_t count = _connection->parse(_amqpBuffer->data(),
|
||||
_amqpBuffer->available());
|
||||
|
||||
if (count == _amqpBuffer->available())
|
||||
{
|
||||
_amqpBuffer->drain();
|
||||
}
|
||||
else if (count > 0)
|
||||
{
|
||||
_amqpBuffer->shl(count);
|
||||
}
|
||||
}
|
||||
|
||||
void AsioHandler::onConnected(AMQP::Connection *connection)
|
||||
{
|
||||
//_connected = true;
|
||||
}
|
||||
bool AsioHandler::connected() const
|
||||
{
|
||||
return _connected;
|
||||
}
|
||||
|
||||
void AsioHandler::onError(AMQP::Connection *connection, const char *message)
|
||||
{
|
||||
std::cerr << "AMQP error " << message << std::endl;
|
||||
}
|
||||
|
||||
void AsioHandler::onClosed(AMQP::Connection *connection)
|
||||
{
|
||||
std::cout << "AMQP closed connection" << std::endl;
|
||||
_quit = true;
|
||||
if (!_writeInProgress)
|
||||
{
|
||||
_socket.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
#pragma once
|
||||
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
#include <amqpcpp.h>
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
class AmqpBuffer;
|
||||
class AsioHandler: public AMQP::ConnectionHandler
|
||||
{
|
||||
private:
|
||||
|
||||
typedef std::deque<std::vector<char>> OutputBuffers;
|
||||
|
||||
virtual void onData(AMQP::Connection *connection, const char *data, size_t size);
|
||||
virtual void onConnected(AMQP::Connection *connection);
|
||||
virtual void onError(AMQP::Connection *connection, const char *message);
|
||||
virtual void onClosed(AMQP::Connection *connection);
|
||||
|
||||
void doConnect(const std::string& host, uint16_t port);
|
||||
|
||||
void doRead();
|
||||
|
||||
void doWrite();
|
||||
|
||||
void parseData();
|
||||
|
||||
private:
|
||||
|
||||
boost::asio::io_service& _ioService;
|
||||
boost::asio::ip::tcp::socket _socket;
|
||||
|
||||
std::vector<char> _asioInputBuffer;
|
||||
std::shared_ptr<AmqpBuffer> _amqpBuffer;
|
||||
AMQP::Connection* _connection;
|
||||
OutputBuffers _outputBuffer;
|
||||
bool _writeInProgress;
|
||||
bool _connected;
|
||||
bool _quit;
|
||||
|
||||
public:
|
||||
|
||||
static constexpr size_t ASIO_INPUT_BUFFER_SIZE = 4*1024; //4kb
|
||||
|
||||
AsioHandler(boost::asio::io_service& ioService);
|
||||
|
||||
void connect(const std::string& host, uint16_t port);
|
||||
|
||||
virtual ~AsioHandler();
|
||||
|
||||
AsioHandler(const AsioHandler&) = delete;
|
||||
AsioHandler& operator=(const AsioHandler&)=delete;
|
||||
|
||||
bool connected()const;
|
||||
};
|
||||
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "tools.h"
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
const std::string msg =
|
||||
argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
|
||||
{
|
||||
channel.publish("logs", "", msg);
|
||||
std::cout << " [x] Sent "<<msg<< std::endl;
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
});
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "tools.h"
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
const std::string severity = argc > 2 ? argv[1] : "info";
|
||||
const std::string msg =
|
||||
argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
|
||||
{
|
||||
channel.publish("direct_logs", severity, msg);
|
||||
std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;
|
||||
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
});
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "tools.h"
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
const std::string msg =
|
||||
argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
|
||||
const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
|
||||
{
|
||||
channel.publish("topic_logs", routing_key, msg);
|
||||
std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;
|
||||
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
});
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "tools.h"
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
const std::string msg =
|
||||
argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
AMQP::Channel channel(&connection);
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
AMQP::QueueCallback callback =
|
||||
[&](const std::string &name, int msgcount, int consumercount)
|
||||
{
|
||||
AMQP::Envelope env(msg);
|
||||
env.setDeliveryMode(2);
|
||||
channel.publish("", "task_queue", env);
|
||||
std::cout<<" [x] Sent '"<<msg<<"'\n";
|
||||
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
};
|
||||
|
||||
channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
#include <iostream>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(void)
|
||||
{
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.declareQueue("hello");
|
||||
channel.consume("hello", AMQP::noack).onReceived(
|
||||
[](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
|
||||
std::cout <<" [x] Received "<<message.message() << std::endl;
|
||||
});
|
||||
|
||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(void)
|
||||
{
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
auto receiveMessageCallback = [](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
|
||||
std::cout <<" [x] "<<message.message() << std::endl;
|
||||
};
|
||||
|
||||
AMQP::QueueCallback callback =
|
||||
[&](const std::string &name, int msgcount, int consumercount)
|
||||
{
|
||||
channel.bindQueue("logs", name,"");
|
||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
||||
};
|
||||
|
||||
AMQP::SuccessCallback success = [&]()
|
||||
{
|
||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
||||
};
|
||||
|
||||
channel.declareExchange("logs", AMQP::fanout).onSuccess(success);
|
||||
|
||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
#include <iostream>
|
||||
#include <algorithm>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
if(argc==1)
|
||||
{
|
||||
std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl;
|
||||
return 1;
|
||||
}
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
AMQP::Channel channel(&connection);
|
||||
|
||||
channel.declareExchange("direct_logs", AMQP::direct);
|
||||
|
||||
auto receiveMessageCallback =
|
||||
[](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
std::cout <<" [x] "
|
||||
<<message.routingKey()
|
||||
<<":"
|
||||
<<message.message()
|
||||
<< std::endl;
|
||||
};
|
||||
|
||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
||||
int msgcount,
|
||||
int consumercount)
|
||||
{
|
||||
std::for_each(&argv[1],
|
||||
&argv[argc],
|
||||
[&](const char* severity)
|
||||
{
|
||||
channel.bindQueue("direct_logs","", severity);
|
||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
||||
});
|
||||
|
||||
};
|
||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
||||
|
||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
#include <iostream>
|
||||
#include <algorithm>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
if(argc==1)
|
||||
{
|
||||
std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
|
||||
channel.declareExchange("topic_logs", AMQP::topic);
|
||||
|
||||
auto receiveMessageCallback =
|
||||
[](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
std::cout <<" [x] "
|
||||
<<message.routingKey()
|
||||
<<":"
|
||||
<<message.message()
|
||||
<< std::endl;
|
||||
};
|
||||
|
||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
||||
int msgcount,
|
||||
int consumercount)
|
||||
{
|
||||
std::for_each(&argv[1],
|
||||
&argv[argc],
|
||||
[&](const char* bindingKeys)
|
||||
{
|
||||
std::cout<<bindingKeys<<std::endl;
|
||||
channel.bindQueue("topic_logs",name, bindingKeys);
|
||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
||||
});
|
||||
|
||||
};
|
||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
||||
|
||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
#include <iostream>
|
||||
|
||||
#include "tools.h"
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(int argc, const char* argv[])
|
||||
{
|
||||
const std::string correlation(uuid());
|
||||
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
||||
int msgcount,
|
||||
int consumercount)
|
||||
{
|
||||
AMQP::Envelope env("30");
|
||||
env.setCorrelationID(correlation);
|
||||
env.setReplyTo(name);
|
||||
channel.publish("","rpc_queue",env);
|
||||
std::cout<<" [x] Requesting fib(30)"<<std::endl;
|
||||
|
||||
};
|
||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
auto receiveCallback = [&](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
if(message.correlationID() != correlation)
|
||||
return;
|
||||
|
||||
std::cout<<" [.] Got "<<message.message()<<std::endl;
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
};
|
||||
|
||||
channel.consume("", AMQP::noack).onReceived(receiveCallback);
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
#include <iostream>
|
||||
#include <algorithm>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int fib(int n)
|
||||
{
|
||||
switch (n)
|
||||
{
|
||||
case 0:
|
||||
return 0;
|
||||
case 1:
|
||||
return 1;
|
||||
default:
|
||||
return fib(n - 1) + fib(n - 2);
|
||||
}
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.setQos(1);
|
||||
|
||||
channel.declareQueue("rpc_queue");
|
||||
channel.consume("").onReceived([&channel](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
const auto body = message.message();
|
||||
std::cout<<" [.] fib("<<body<<")"<<std::endl;
|
||||
|
||||
AMQP::Envelope env(std::to_string(fib(std::stoi(body))));
|
||||
env.setCorrelationID(message.correlationID());
|
||||
|
||||
channel.publish("", message.replyTo(), env);
|
||||
channel.ack(deliveryTag);
|
||||
});
|
||||
|
||||
std::cout << " [x] Awaiting RPC requests" << std::endl;
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
#include <iostream>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(void)
|
||||
{
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
AMQP::Channel channel(&connection);
|
||||
|
||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
||||
channel.onReady([&]()
|
||||
{
|
||||
if(handler.connected())
|
||||
{
|
||||
channel.publish("", "hello", "Hello World!");
|
||||
std::cout << " [x] Sent 'Hello World!'" << std::endl;
|
||||
|
||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
||||
}
|
||||
});
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/uuid/uuid_generators.hpp>
|
||||
|
||||
|
||||
template<typename Iterator>
|
||||
std::string join(Iterator first, const Iterator last,
|
||||
const std::string& separator)
|
||||
{
|
||||
std::string str;
|
||||
for (; first != last; ++first)
|
||||
{
|
||||
str.append(*first);
|
||||
if (first != (last - 1))
|
||||
{
|
||||
str.append(separator);
|
||||
}
|
||||
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
std::string uuid()
|
||||
{
|
||||
boost::uuids::random_generator generator;
|
||||
boost::uuids::uuid uuid(generator());
|
||||
|
||||
std::stringstream sstr;
|
||||
sstr<<boost::uuids::to_string(uuid);
|
||||
return sstr.str();
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
#include <iostream>
|
||||
#include <algorithm>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "asiohandler.h"
|
||||
|
||||
int main(void)
|
||||
{
|
||||
boost::asio::io_service ioService;
|
||||
AsioHandler handler(ioService);
|
||||
handler.connect("localhost", 5672);
|
||||
|
||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
||||
|
||||
AMQP::Channel channel(&connection);
|
||||
channel.setQos(1);
|
||||
|
||||
channel.declareQueue("task_queue", AMQP::durable);
|
||||
channel.consume("task_queue").onReceived(
|
||||
[&channel](const AMQP::Message &message,
|
||||
uint64_t deliveryTag,
|
||||
bool redelivered)
|
||||
{
|
||||
const auto body = message.message();
|
||||
std::cout<<" [x] Received "<<body<<std::endl;
|
||||
|
||||
size_t count = std::count(body.cbegin(), body.cend(), '.');
|
||||
std::this_thread::sleep_for (std::chrono::seconds(count));
|
||||
|
||||
std::cout<<" [x] Done"<<std::endl;
|
||||
channel.ack(deliveryTag);
|
||||
});
|
||||
|
||||
|
||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
||||
|
||||
ioService.run();
|
||||
return 0;
|
||||
}
|
||||
Loading…
Reference in New Issue