fixed to make returning message functional, and added test code

This commit is contained in:
Emiel Bruijntjes 2018-03-01 22:54:50 +01:00
parent 1ccd93cc5e
commit 41239a1952
4 changed files with 21 additions and 6 deletions

View File

@ -71,6 +71,7 @@
#include "amqpcpp/deferreddelete.h" #include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h" #include "amqpcpp/deferredcancel.h"
#include "amqpcpp/deferredget.h" #include "amqpcpp/deferredget.h"
#include "amqpcpp/deferredpublisher.h"
#include "amqpcpp/channelimpl.h" #include "amqpcpp/channelimpl.h"
#include "amqpcpp/channel.h" #include "amqpcpp/channel.h"
#include "amqpcpp/login.h" #include "amqpcpp/login.h"

View File

@ -33,6 +33,9 @@ void DeferredPublisher::process(BasicReturnFrame &frame)
// initialize the object for the next message // initialize the object for the next message
initialize(frame.exchange(), frame.routingKey()); initialize(frame.exchange(), frame.routingKey());
// do we have anybody interested in messages? in that case we construct the message
if (_bounceCallback) _message.construct(frame.exchange(), frame.routingKey());
} }
/** /**

View File

@ -18,6 +18,7 @@
* @param argv * @param argv
* @return int * @return int
*/ */
/*
int main(int argc, const char *argv[]) int main(int argc, const char *argv[])
{ {
// iterate over the arguments // iterate over the arguments
@ -37,3 +38,4 @@ int main(int argc, const char *argv[])
// done // done
return 0; return 0;
} }
*/

View File

@ -30,7 +30,7 @@ MyConnection::MyConnection(const std::string &ip) :
_socket(Event::MainLoop::instance(), this) _socket(Event::MainLoop::instance(), this)
{ {
// start connecting // start connecting
if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; if (_socket.connect(Dns::IpAddress(ip), 5672)) return;
// failure // failure
onFailure(&_socket); onFailure(&_socket);
@ -96,21 +96,30 @@ void MyConnection::onConnected(Network::TcpSocket *socket)
std::cout << "queue declared" << std::endl; std::cout << "queue declared" << std::endl;
// start consuming // start consuming
_channel->consume("my_queue").onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { _channel->consume("my_queue").onReceived([this](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "received: " << message.message() << std::endl; std::cout << "consumed from exchange " << message.exchange() << " " << message.routingkey() << ": " << std::string(message.body(), message.bodySize()) << std::endl;
_channel->ack(deliveryTag);
}); });
}); });
// declare an exchange // declare an exchange
_channel->declareExchange().onSuccess([]() { _channel->declareExchange("my_exchange", AMQP::direct).onSuccess([]() {
std::cout << "exchange declared" << std::endl; std::cout << "exchange declared" << std::endl;
}); });
// bind queue and exchange // bind queue and exchange
_channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() { _channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() {
std::cout << "queue bound to exchange" << std::endl; std::cout << "queue bound to exchange" << std::endl;
// callback for returns
auto callback = [](const AMQP::Message &message, int16_t code, const std::string &description) {
_channel->publish("my_exchange", "key", "just a message"); std::cout << "message was returned: " << code << " " << description << ": " << std::string(message.body(), message.bodySize()) << std::endl;
};
_channel->publish("my_exchange", "key", "just a message", AMQP::mandatory).onReturned(callback);
_channel->publish("my_exchange", "unknown key", "just another message", AMQP::mandatory).onReturned(callback);
}); });
} }
@ -156,7 +165,7 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
if (!_connection) return; if (!_connection) return;
// let the data be handled by the connection // let the data be handled by the connection
size_t bytes = _connection->parse(buffer->data(), buffer->size()); size_t bytes = _connection->parse(buffer->buffer(), buffer->size());
// shrink the buffer // shrink the buffer
buffer->shrink(bytes); buffer->shrink(bytes);