From 41239a1952d5a103e0e89b244ec5409d68626490 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Thu, 1 Mar 2018 22:54:50 +0100 Subject: [PATCH] fixed to make returning message functional, and added test code --- include/amqpcpp.h | 1 + src/deferredpublisher.cpp | 3 +++ tests/address.cpp | 2 ++ tests/myconnection.cpp | 21 +++++++++++++++------ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/include/amqpcpp.h b/include/amqpcpp.h index 4d7735a..9d0e5a7 100644 --- a/include/amqpcpp.h +++ b/include/amqpcpp.h @@ -71,6 +71,7 @@ #include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferredcancel.h" #include "amqpcpp/deferredget.h" +#include "amqpcpp/deferredpublisher.h" #include "amqpcpp/channelimpl.h" #include "amqpcpp/channel.h" #include "amqpcpp/login.h" diff --git a/src/deferredpublisher.cpp b/src/deferredpublisher.cpp index f25b167..b75e6df 100644 --- a/src/deferredpublisher.cpp +++ b/src/deferredpublisher.cpp @@ -33,6 +33,9 @@ void DeferredPublisher::process(BasicReturnFrame &frame) // initialize the object for the next message initialize(frame.exchange(), frame.routingKey()); + + // do we have anybody interested in messages? in that case we construct the message + if (_bounceCallback) _message.construct(frame.exchange(), frame.routingKey()); } /** diff --git a/tests/address.cpp b/tests/address.cpp index 2145931..0de9a15 100644 --- a/tests/address.cpp +++ b/tests/address.cpp @@ -18,6 +18,7 @@ * @param argv * @return int */ +/* int main(int argc, const char *argv[]) { // iterate over the arguments @@ -37,3 +38,4 @@ int main(int argc, const char *argv[]) // done return 0; } +*/ \ No newline at end of file diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 96f3580..8875d02 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -30,7 +30,7 @@ MyConnection::MyConnection(const std::string &ip) : _socket(Event::MainLoop::instance(), this) { // start connecting - if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; + if (_socket.connect(Dns::IpAddress(ip), 5672)) return; // failure onFailure(&_socket); @@ -96,21 +96,30 @@ void MyConnection::onConnected(Network::TcpSocket *socket) std::cout << "queue declared" << std::endl; // start consuming - _channel->consume("my_queue").onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { - std::cout << "received: " << message.message() << std::endl; + _channel->consume("my_queue").onReceived([this](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + std::cout << "consumed from exchange " << message.exchange() << " " << message.routingkey() << ": " << std::string(message.body(), message.bodySize()) << std::endl; + _channel->ack(deliveryTag); }); }); // declare an exchange - _channel->declareExchange().onSuccess([]() { + _channel->declareExchange("my_exchange", AMQP::direct).onSuccess([]() { std::cout << "exchange declared" << std::endl; }); // bind queue and exchange _channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() { std::cout << "queue bound to exchange" << std::endl; + + // callback for returns + auto callback = [](const AMQP::Message &message, int16_t code, const std::string &description) { - _channel->publish("my_exchange", "key", "just a message"); + std::cout << "message was returned: " << code << " " << description << ": " << std::string(message.body(), message.bodySize()) << std::endl; + + }; + + _channel->publish("my_exchange", "key", "just a message", AMQP::mandatory).onReturned(callback); + _channel->publish("my_exchange", "unknown key", "just another message", AMQP::mandatory).onReturned(callback); }); } @@ -156,7 +165,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) if (!_connection) return; // let the data be handled by the connection - size_t bytes = _connection->parse(buffer->data(), buffer->size()); + size_t bytes = _connection->parse(buffer->buffer(), buffer->size()); // shrink the buffer buffer->shrink(bytes);