removed examples because i did not write them, dont like the coding style, and people keep asking questions about why they dont work (questions that i cannot answer)
This commit is contained in:
parent
98d808e49d
commit
1392e2b873
|
|
@ -1,48 +0,0 @@
|
||||||
set(Boost_USE_STATIC_LIBS ON)
|
|
||||||
set(Boost_USE_MULTITHREADED ON)
|
|
||||||
set(Boost_USE_STATIC_RUNTIME OFF)
|
|
||||||
find_package(Boost 1.55 REQUIRED COMPONENTS system)
|
|
||||||
|
|
||||||
if(NOT Boost_FOUND)
|
|
||||||
message( FATAL_ERROR "boost must be installed")
|
|
||||||
endif()
|
|
||||||
|
|
||||||
include_directories(${Boost_INCLUDE_DIRS})
|
|
||||||
|
|
||||||
add_library(asio_handler asiohandler.cpp asiohandler.h)
|
|
||||||
target_link_libraries(asio_handler ${Boost_LIBRARIES})
|
|
||||||
add_definitions(-std=c++11)
|
|
||||||
|
|
||||||
set(PROGS send
|
|
||||||
receive
|
|
||||||
new_task
|
|
||||||
worker
|
|
||||||
emit_log
|
|
||||||
receive_logs
|
|
||||||
emit_log_direct
|
|
||||||
receive_logs_direct
|
|
||||||
emit_log_topic
|
|
||||||
receive_logs_topic
|
|
||||||
rpc_client
|
|
||||||
rpc_server
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
find_package (Threads)
|
|
||||||
|
|
||||||
foreach(item ${PROGS})
|
|
||||||
add_executable(${item} "${item}.cpp")
|
|
||||||
target_link_libraries(${item} amqpcpp
|
|
||||||
asio_handler
|
|
||||||
${CMAKE_THREAD_LIBS_INIT})
|
|
||||||
endforeach(item)
|
|
||||||
|
|
||||||
#little hack
|
|
||||||
file(GLOB AMQCPP_HEADERS ${PROJECT_SOURCE_DIR}/include/*)
|
|
||||||
|
|
||||||
file(COPY ${AMQCPP_HEADERS}
|
|
||||||
DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/amqpcpp)
|
|
||||||
|
|
||||||
include_directories(${PROJECT_SOURCE_DIR}
|
|
||||||
${CMAKE_CURRENT_BINARY_DIR})
|
|
||||||
|
|
||||||
|
|
@ -1,44 +0,0 @@
|
||||||
# C++ code for RabbitMQ tutorials
|
|
||||||
|
|
||||||
## Requirements
|
|
||||||
|
|
||||||
* Boost Asio
|
|
||||||
* Boost Uuid
|
|
||||||
|
|
||||||
|
|
||||||
## Code
|
|
||||||
|
|
||||||
[Tutorial one: "Hello World!"](http://www.rabbitmq.com/tutorial-one-python.html):
|
|
||||||
|
|
||||||
send
|
|
||||||
receive
|
|
||||||
|
|
||||||
|
|
||||||
[Tutorial two: Work Queues](http://www.rabbitmq.com/tutorial-two-python.html):
|
|
||||||
|
|
||||||
new_task "A very hard task which takes two seconds.."
|
|
||||||
worker
|
|
||||||
|
|
||||||
|
|
||||||
[Tutorial three: Publish/Subscribe](http://www.rabbitmq.com/tutorial-three-python.html):
|
|
||||||
|
|
||||||
receive_logs
|
|
||||||
emit_log "info: This is the log message"
|
|
||||||
|
|
||||||
|
|
||||||
[Tutorial four: Routing](http://www.rabbitmq.com/tutorial-four-python.html):
|
|
||||||
|
|
||||||
receive_logs_direct info
|
|
||||||
emit_log_direct info "The message"
|
|
||||||
|
|
||||||
|
|
||||||
[Tutorial five: Topics](http://www.rabbitmq.com/tutorial-five-python.html):
|
|
||||||
|
|
||||||
receive_logs_topic "*.rabbit"
|
|
||||||
emit_log_topic red.rabbit Hello
|
|
||||||
|
|
||||||
|
|
||||||
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html):
|
|
||||||
|
|
||||||
rpc_server
|
|
||||||
rpc_client
|
|
||||||
|
|
@ -1,222 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
using boost::asio::ip::tcp;
|
|
||||||
|
|
||||||
class AmqpBuffer
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
AmqpBuffer(size_t size) :
|
|
||||||
_data(size, 0),
|
|
||||||
_use(0)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t write(const char* data, size_t size)
|
|
||||||
{
|
|
||||||
if (_use == _data.size())
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
const size_t length = (size + _use);
|
|
||||||
size_t write = length < _data.size() ? size : _data.size() - _use;
|
|
||||||
memcpy(_data.data() + _use, data, write);
|
|
||||||
_use += write;
|
|
||||||
return write;
|
|
||||||
}
|
|
||||||
|
|
||||||
void drain()
|
|
||||||
{
|
|
||||||
_use = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t available() const
|
|
||||||
{
|
|
||||||
return _use;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* data() const
|
|
||||||
{
|
|
||||||
return _data.data();
|
|
||||||
}
|
|
||||||
|
|
||||||
void shl(size_t count)
|
|
||||||
{
|
|
||||||
assert(count < _use);
|
|
||||||
|
|
||||||
const size_t diff = _use - count;
|
|
||||||
std::memmove(_data.data(), _data.data() + count, diff);
|
|
||||||
_use = _use - count;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<char> _data;
|
|
||||||
size_t _use;
|
|
||||||
};
|
|
||||||
|
|
||||||
AsioHandler::AsioHandler(boost::asio::io_service& ioService) :
|
|
||||||
_ioService(ioService),
|
|
||||||
_socket(ioService),
|
|
||||||
_timer(ioService),
|
|
||||||
_asioInputBuffer(ASIO_INPUT_BUFFER_SIZE, 0),
|
|
||||||
_amqpBuffer(new AmqpBuffer(ASIO_INPUT_BUFFER_SIZE * 2)),
|
|
||||||
_connection(nullptr),
|
|
||||||
_writeInProgress(false),
|
|
||||||
_connected(false),
|
|
||||||
_quit(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
AsioHandler::~AsioHandler()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::connect(const std::string& host, uint16_t port)
|
|
||||||
{
|
|
||||||
doConnect(host, port);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::doConnect(const std::string& host, uint16_t port)
|
|
||||||
{
|
|
||||||
tcp::resolver::query query(host, std::to_string(port));
|
|
||||||
tcp::resolver::iterator iter = tcp::resolver(_ioService).resolve(query);
|
|
||||||
_timer.expires_from_now(boost::posix_time::seconds(15));
|
|
||||||
_timer.async_wait([this](const boost::system::error_code& ec){
|
|
||||||
if(!ec && !_connected)
|
|
||||||
{
|
|
||||||
std::cerr<<"Connection timed out";
|
|
||||||
_socket.cancel();
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
boost::asio::async_connect(_socket, iter,
|
|
||||||
[this](boost::system::error_code ec, tcp::resolver::iterator)
|
|
||||||
{
|
|
||||||
_connected = true;
|
|
||||||
if (!ec)
|
|
||||||
{
|
|
||||||
doRead();
|
|
||||||
|
|
||||||
if(!_outputBuffer.empty())
|
|
||||||
{
|
|
||||||
doWrite();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::cerr<<"Connection error:"<<ec<<std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::onData(
|
|
||||||
AMQP::Connection *connection, const char *data, size_t size)
|
|
||||||
{
|
|
||||||
_connection = connection;
|
|
||||||
|
|
||||||
_outputBuffer.push_back(std::vector<char>(data, data + size));
|
|
||||||
if (!_writeInProgress && _connected)
|
|
||||||
{
|
|
||||||
doWrite();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::doRead()
|
|
||||||
{
|
|
||||||
_socket.async_receive(boost::asio::buffer(_asioInputBuffer),
|
|
||||||
[this](boost::system::error_code ec, std::size_t length)
|
|
||||||
{
|
|
||||||
if (!ec)
|
|
||||||
{
|
|
||||||
_amqpBuffer->write(_asioInputBuffer.data(), length);
|
|
||||||
parseData();
|
|
||||||
doRead();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::cerr<<"Error reading:"<<ec<<std::endl;
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::doWrite()
|
|
||||||
{
|
|
||||||
_writeInProgress = true;
|
|
||||||
boost::asio::async_write(_socket,
|
|
||||||
boost::asio::buffer(_outputBuffer.front()),
|
|
||||||
[this](boost::system::error_code ec, std::size_t length )
|
|
||||||
{
|
|
||||||
if(!ec)
|
|
||||||
{
|
|
||||||
_outputBuffer.pop_front();
|
|
||||||
if(!_outputBuffer.empty())
|
|
||||||
{
|
|
||||||
doWrite();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_writeInProgress = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(_quit)
|
|
||||||
{
|
|
||||||
_socket.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::cerr<<"Error writing:"<<ec<<std::endl;
|
|
||||||
_socket.close();
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::parseData()
|
|
||||||
{
|
|
||||||
if (_connection == nullptr)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const size_t count = _connection->parse(_amqpBuffer->data(),
|
|
||||||
_amqpBuffer->available());
|
|
||||||
|
|
||||||
if (count == _amqpBuffer->available())
|
|
||||||
{
|
|
||||||
_amqpBuffer->drain();
|
|
||||||
}
|
|
||||||
else if (count > 0)
|
|
||||||
{
|
|
||||||
_amqpBuffer->shl(count);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::onConnected(AMQP::Connection *connection)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
bool AsioHandler::connected() const
|
|
||||||
{
|
|
||||||
return _connected;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::onError(AMQP::Connection *connection, const char *message)
|
|
||||||
{
|
|
||||||
std::cerr << "AMQP error " << message << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsioHandler::onClosed(AMQP::Connection *connection)
|
|
||||||
{
|
|
||||||
std::cout << "AMQP closed connection" << std::endl;
|
|
||||||
_quit = true;
|
|
||||||
if (!_writeInProgress)
|
|
||||||
{
|
|
||||||
_socket.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,59 +0,0 @@
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <deque>
|
|
||||||
#include <vector>
|
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
#include <amqpcpp.h>
|
|
||||||
#include <boost/asio.hpp>
|
|
||||||
|
|
||||||
class AmqpBuffer;
|
|
||||||
class AsioHandler: public AMQP::ConnectionHandler
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
|
|
||||||
typedef std::deque<std::vector<char>> OutputBuffers;
|
|
||||||
|
|
||||||
virtual void onData(AMQP::Connection *connection, const char *data, size_t size);
|
|
||||||
virtual void onConnected(AMQP::Connection *connection);
|
|
||||||
virtual void onError(AMQP::Connection *connection, const char *message);
|
|
||||||
virtual void onClosed(AMQP::Connection *connection);
|
|
||||||
|
|
||||||
void doConnect(const std::string& host, uint16_t port);
|
|
||||||
|
|
||||||
void doRead();
|
|
||||||
|
|
||||||
void doWrite();
|
|
||||||
|
|
||||||
void parseData();
|
|
||||||
|
|
||||||
private:
|
|
||||||
|
|
||||||
boost::asio::io_service& _ioService;
|
|
||||||
boost::asio::ip::tcp::socket _socket;
|
|
||||||
boost::asio::deadline_timer _timer;
|
|
||||||
|
|
||||||
std::vector<char> _asioInputBuffer;
|
|
||||||
std::shared_ptr<AmqpBuffer> _amqpBuffer;
|
|
||||||
AMQP::Connection* _connection;
|
|
||||||
OutputBuffers _outputBuffer;
|
|
||||||
bool _writeInProgress;
|
|
||||||
bool _connected;
|
|
||||||
bool _quit;
|
|
||||||
|
|
||||||
public:
|
|
||||||
|
|
||||||
static constexpr size_t ASIO_INPUT_BUFFER_SIZE = 4*1024; //4kb
|
|
||||||
|
|
||||||
AsioHandler(boost::asio::io_service& ioService);
|
|
||||||
|
|
||||||
void connect(const std::string& host, uint16_t port);
|
|
||||||
|
|
||||||
virtual ~AsioHandler();
|
|
||||||
|
|
||||||
AsioHandler(const AsioHandler&) = delete;
|
|
||||||
AsioHandler& operator=(const AsioHandler&)=delete;
|
|
||||||
|
|
||||||
bool connected()const;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "tools.h"
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
const std::string msg =
|
|
||||||
argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
|
|
||||||
{
|
|
||||||
channel.publish("logs", "", msg);
|
|
||||||
std::cout << " [x] Sent "<<msg<< std::endl;
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
});
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "tools.h"
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
const std::string severity = argc > 2 ? argv[1] : "info";
|
|
||||||
const std::string msg =
|
|
||||||
argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
|
|
||||||
{
|
|
||||||
channel.publish("direct_logs", severity, msg);
|
|
||||||
std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;
|
|
||||||
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
});
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "tools.h"
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
const std::string msg =
|
|
||||||
argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
|
|
||||||
const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
|
|
||||||
{
|
|
||||||
channel.publish("topic_logs", routing_key, msg);
|
|
||||||
std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;
|
|
||||||
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
});
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,35 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "tools.h"
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
const std::string msg =
|
|
||||||
argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
AMQP::QueueCallback callback =
|
|
||||||
[&](const std::string &name, int msgcount, int consumercount)
|
|
||||||
{
|
|
||||||
AMQP::Envelope env(msg);
|
|
||||||
env.setDeliveryMode(2);
|
|
||||||
channel.publish("", "task_queue", env);
|
|
||||||
std::cout<<" [x] Sent '"<<msg<<"'\n";
|
|
||||||
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
};
|
|
||||||
|
|
||||||
channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,28 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.declareQueue("hello");
|
|
||||||
channel.consume("hello", AMQP::noack).onReceived(
|
|
||||||
[](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
|
|
||||||
std::cout <<" [x] Received "<<message.message() << std::endl;
|
|
||||||
});
|
|
||||||
|
|
||||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,41 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
auto receiveMessageCallback = [](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
|
|
||||||
std::cout <<" [x] "<<message.message() << std::endl;
|
|
||||||
};
|
|
||||||
|
|
||||||
AMQP::QueueCallback callback =
|
|
||||||
[&](const std::string &name, int msgcount, int consumercount)
|
|
||||||
{
|
|
||||||
channel.bindQueue("logs", name,"");
|
|
||||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
|
||||||
};
|
|
||||||
|
|
||||||
AMQP::SuccessCallback success = [&]()
|
|
||||||
{
|
|
||||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
|
||||||
};
|
|
||||||
|
|
||||||
channel.declareExchange("logs", AMQP::fanout).onSuccess(success);
|
|
||||||
|
|
||||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,52 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
if(argc==1)
|
|
||||||
{
|
|
||||||
std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
|
|
||||||
channel.declareExchange("direct_logs", AMQP::direct);
|
|
||||||
|
|
||||||
auto receiveMessageCallback =
|
|
||||||
[](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
std::cout <<" [x] "
|
|
||||||
<<message.routingKey()
|
|
||||||
<<":"
|
|
||||||
<<message.message()
|
|
||||||
<< std::endl;
|
|
||||||
};
|
|
||||||
|
|
||||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
|
||||||
int msgcount,
|
|
||||||
int consumercount)
|
|
||||||
{
|
|
||||||
std::for_each(&argv[1],
|
|
||||||
&argv[argc],
|
|
||||||
[&](const char* severity)
|
|
||||||
{
|
|
||||||
channel.bindQueue("direct_logs","", severity);
|
|
||||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
|
||||||
});
|
|
||||||
|
|
||||||
};
|
|
||||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
|
||||||
|
|
||||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,55 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
if(argc==1)
|
|
||||||
{
|
|
||||||
std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
|
|
||||||
channel.declareExchange("topic_logs", AMQP::topic);
|
|
||||||
|
|
||||||
auto receiveMessageCallback =
|
|
||||||
[](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
std::cout <<" [x] "
|
|
||||||
<<message.routingKey()
|
|
||||||
<<":"
|
|
||||||
<<message.message()
|
|
||||||
<< std::endl;
|
|
||||||
};
|
|
||||||
|
|
||||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
|
||||||
int msgcount,
|
|
||||||
int consumercount)
|
|
||||||
{
|
|
||||||
std::for_each(&argv[1],
|
|
||||||
&argv[argc],
|
|
||||||
[&](const char* bindingKeys)
|
|
||||||
{
|
|
||||||
std::cout<<bindingKeys<<std::endl;
|
|
||||||
channel.bindQueue("topic_logs",name, bindingKeys);
|
|
||||||
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
|
|
||||||
});
|
|
||||||
|
|
||||||
};
|
|
||||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
|
||||||
|
|
||||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,46 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
#include "tools.h"
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(int argc, const char* argv[])
|
|
||||||
{
|
|
||||||
const std::string correlation(uuid());
|
|
||||||
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
AMQP::QueueCallback callback = [&](const std::string &name,
|
|
||||||
int msgcount,
|
|
||||||
int consumercount)
|
|
||||||
{
|
|
||||||
AMQP::Envelope env("30");
|
|
||||||
env.setCorrelationID(correlation);
|
|
||||||
env.setReplyTo(name);
|
|
||||||
channel.publish("","rpc_queue",env);
|
|
||||||
std::cout<<" [x] Requesting fib(30)"<<std::endl;
|
|
||||||
|
|
||||||
};
|
|
||||||
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
auto receiveCallback = [&](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
if(message.correlationID() != correlation)
|
|
||||||
return;
|
|
||||||
|
|
||||||
std::cout<<" [.] Got "<<message.message()<<std::endl;
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
};
|
|
||||||
|
|
||||||
channel.consume("", AMQP::noack).onReceived(receiveCallback);
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,51 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <algorithm>
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int fib(int n)
|
|
||||||
{
|
|
||||||
switch (n)
|
|
||||||
{
|
|
||||||
case 0:
|
|
||||||
return 0;
|
|
||||||
case 1:
|
|
||||||
return 1;
|
|
||||||
default:
|
|
||||||
return fib(n - 1) + fib(n - 2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.setQos(1);
|
|
||||||
|
|
||||||
channel.declareQueue("rpc_queue");
|
|
||||||
channel.consume("").onReceived([&channel](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
const auto body = message.message();
|
|
||||||
std::cout<<" [.] fib("<<body<<")"<<std::endl;
|
|
||||||
|
|
||||||
AMQP::Envelope env(std::to_string(fib(std::stoi(body))));
|
|
||||||
env.setCorrelationID(message.correlationID());
|
|
||||||
|
|
||||||
channel.publish("", message.replyTo(), env);
|
|
||||||
channel.ack(deliveryTag);
|
|
||||||
});
|
|
||||||
|
|
||||||
std::cout << " [x] Awaiting RPC requests" << std::endl;
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
|
|
||||||
boost::asio::deadline_timer t(ioService, boost::posix_time::millisec(100));
|
|
||||||
channel.onReady([&]()
|
|
||||||
{
|
|
||||||
if(handler.connected())
|
|
||||||
{
|
|
||||||
channel.publish("", "hello", "Hello World!");
|
|
||||||
std::cout << " [x] Sent 'Hello World!'" << std::endl;
|
|
||||||
|
|
||||||
t.async_wait([&](const boost::system::error_code&){ioService.stop();});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
@ -1,36 +0,0 @@
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <sstream>
|
|
||||||
|
|
||||||
#include <boost/uuid/uuid.hpp>
|
|
||||||
#include <boost/uuid/uuid_io.hpp>
|
|
||||||
#include <boost/uuid/uuid_generators.hpp>
|
|
||||||
|
|
||||||
|
|
||||||
template<typename Iterator>
|
|
||||||
std::string join(Iterator first, const Iterator last,
|
|
||||||
const std::string& separator)
|
|
||||||
{
|
|
||||||
std::string str;
|
|
||||||
for (; first != last; ++first)
|
|
||||||
{
|
|
||||||
str.append(*first);
|
|
||||||
if (first != (last - 1))
|
|
||||||
{
|
|
||||||
str.append(separator);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return str;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string uuid()
|
|
||||||
{
|
|
||||||
boost::uuids::random_generator generator;
|
|
||||||
boost::uuids::uuid uuid(generator());
|
|
||||||
|
|
||||||
std::stringstream sstr;
|
|
||||||
sstr<<boost::uuids::to_string(uuid);
|
|
||||||
return sstr.str();
|
|
||||||
}
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
#include <iostream>
|
|
||||||
#include <algorithm>
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
|
||||||
|
|
||||||
#include "asiohandler.h"
|
|
||||||
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
boost::asio::io_service ioService;
|
|
||||||
AsioHandler handler(ioService);
|
|
||||||
handler.connect("localhost", 5672);
|
|
||||||
|
|
||||||
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
|
|
||||||
|
|
||||||
AMQP::Channel channel(&connection);
|
|
||||||
channel.setQos(1);
|
|
||||||
|
|
||||||
channel.declareQueue("task_queue", AMQP::durable);
|
|
||||||
channel.consume("task_queue").onReceived(
|
|
||||||
[&channel](const AMQP::Message &message,
|
|
||||||
uint64_t deliveryTag,
|
|
||||||
bool redelivered)
|
|
||||||
{
|
|
||||||
const auto body = message.message();
|
|
||||||
std::cout<<" [x] Received "<<body<<std::endl;
|
|
||||||
|
|
||||||
size_t count = std::count(body.cbegin(), body.cend(), '.');
|
|
||||||
std::this_thread::sleep_for (std::chrono::seconds(count));
|
|
||||||
|
|
||||||
std::cout<<" [x] Done"<<std::endl;
|
|
||||||
channel.ack(deliveryTag);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
|
|
||||||
|
|
||||||
ioService.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue