installed test program, added onMessage() function in deferredconsumer
This commit is contained in:
parent
b043c33cc6
commit
59e0b61e6b
|
|
@ -77,6 +77,7 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a function to be called when a message arrives
|
* Register a function to be called when a message arrives
|
||||||
|
* This fuction is also available as onMessage() because I always forget which name I gave to it
|
||||||
* @param callback the callback to execute
|
* @param callback the callback to execute
|
||||||
*/
|
*/
|
||||||
DeferredConsumer &onReceived(const MessageCallback &callback)
|
DeferredConsumer &onReceived(const MessageCallback &callback)
|
||||||
|
|
@ -88,6 +89,20 @@ public:
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a function to be called when a message arrives
|
||||||
|
* This fuction is also available as onMessage() because I always forget which name I gave to it
|
||||||
|
* @param callback the callback to execute
|
||||||
|
*/
|
||||||
|
DeferredConsumer &onMessage(const MessageCallback &callback)
|
||||||
|
{
|
||||||
|
// store callback
|
||||||
|
_messageCallback = callback;
|
||||||
|
|
||||||
|
// allow chaining
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All the onSuccess() functions defined in the base class are accessible too
|
* All the onSuccess() functions defined in the base class are accessible too
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
CPP = g++
|
||||||
|
CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g
|
||||||
|
LD = g++
|
||||||
|
LDFLAGS = -lamqpcpp -lcopernica_event -lcopernica_network -lev
|
||||||
|
RESULT = a.out
|
||||||
|
SOURCES = $(wildcard *.cpp)
|
||||||
|
OBJECTS = $(SOURCES:%.cpp=%.o)
|
||||||
|
|
||||||
|
all: ${OBJECTS} ${RESULT}
|
||||||
|
|
||||||
|
${RESULT}: ${OBJECTS}
|
||||||
|
${LD} -o $@ ${OBJECTS} ${LDFLAGS}
|
||||||
|
|
||||||
|
clean:
|
||||||
|
${RM} *.obj *~* ${OBJECTS} ${RESULT}
|
||||||
|
|
||||||
|
${OBJECTS}:
|
||||||
|
${CPP} ${CPPFLAGS} -o $@ ${@:%.o=%.cpp}
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
I'm sorry, the test case makes use of the closed source Copernica libraries. Maybe someone
|
||||||
|
is willing to provide a test case based on plain system calls?
|
||||||
Binary file not shown.
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Main.cpp
|
||||||
|
*
|
||||||
|
* Test program
|
||||||
|
*
|
||||||
|
* @copyright 2014 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global libraries that we need
|
||||||
|
*/
|
||||||
|
#include <amqpcpp.h>
|
||||||
|
#include <copernica/network.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Namespaces to use
|
||||||
|
*/
|
||||||
|
using namespace std;
|
||||||
|
using namespace Copernica;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Local libraries
|
||||||
|
*/
|
||||||
|
#include "myconnection.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main procedure
|
||||||
|
* @param argc
|
||||||
|
* @param argv
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
int main(int argc, const char *argv[])
|
||||||
|
{
|
||||||
|
// need an ip
|
||||||
|
if (argc != 2)
|
||||||
|
{
|
||||||
|
// report error
|
||||||
|
std::cerr << "usage: " << argv[0] << " <ip>" << std::endl;
|
||||||
|
|
||||||
|
// done
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// create connection
|
||||||
|
MyConnection connection(argv[1]);
|
||||||
|
|
||||||
|
// start the main event loop
|
||||||
|
Event::MainLoop::instance()->run();
|
||||||
|
|
||||||
|
// done
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* MyConnection.cpp
|
||||||
|
*
|
||||||
|
* @copyright 2014 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Required external libraries
|
||||||
|
*/
|
||||||
|
#include <amqpcpp.h>
|
||||||
|
#include <copernica/network.h>
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Namespaces to use
|
||||||
|
*/
|
||||||
|
using namespace std;
|
||||||
|
using namespace Copernica;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Required local class definitions
|
||||||
|
*/
|
||||||
|
#include "myconnection.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
MyConnection::MyConnection(const std::string &ip) :
|
||||||
|
_socket(Event::MainLoop::instance(), this),
|
||||||
|
_connection(nullptr),
|
||||||
|
_channel(nullptr)
|
||||||
|
{
|
||||||
|
// start connecting
|
||||||
|
if (_socket.connect(Network::Ipv4Address(ip), 5672)) return;
|
||||||
|
|
||||||
|
// failure
|
||||||
|
onFailure(&_socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
MyConnection::~MyConnection()
|
||||||
|
{
|
||||||
|
// do we still have a channel?
|
||||||
|
if (_channel) delete _channel;
|
||||||
|
|
||||||
|
// do we still have a connection?
|
||||||
|
if (_connection) delete _connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection failed
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
void MyConnection::onFailure(Network::TcpSocket *socket)
|
||||||
|
{
|
||||||
|
// report error
|
||||||
|
std::cout << "connect failure" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection timed out (which also is a failure
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
void MyConnection::onTimeout(Network::TcpSocket *socket)
|
||||||
|
{
|
||||||
|
// report error
|
||||||
|
std::cout << "connect timeout" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection succeeded
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
void MyConnection::onConnected(Network::TcpSocket *socket)
|
||||||
|
{
|
||||||
|
// report connection
|
||||||
|
std::cout << "connected" << std::endl;
|
||||||
|
|
||||||
|
// we are connected, leap out if there already is a amqp connection
|
||||||
|
if (_connection) return;
|
||||||
|
|
||||||
|
// create amqp connection, and a new channel
|
||||||
|
_connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/");
|
||||||
|
_channel = new AMQP::Channel(_connection);
|
||||||
|
|
||||||
|
// install a handler when channel is in error
|
||||||
|
_channel->onError([](const char *message) {
|
||||||
|
|
||||||
|
std::cout << "channel error " << message << std::endl;
|
||||||
|
});
|
||||||
|
|
||||||
|
// install a handler when channel is ready
|
||||||
|
_channel->onReady([]() {
|
||||||
|
|
||||||
|
std::cout << "channel ready" << std::endl;
|
||||||
|
});
|
||||||
|
|
||||||
|
// we declare a queue, an exchange and we publish a message
|
||||||
|
_channel->declareQueue("my_queue").onSuccess([this]() {
|
||||||
|
std::cout << "queue declared" << std::endl;
|
||||||
|
|
||||||
|
// start consuming
|
||||||
|
_channel->consume("my_queue").onReceived([](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
|
||||||
|
std::cout << "received: " << message.message() << std::endl;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// declare an exchange
|
||||||
|
_channel->declareExchange("my_exchange", AMQP::direct).onSuccess([]() {
|
||||||
|
std::cout << "exchange declared" << std::endl;
|
||||||
|
});
|
||||||
|
|
||||||
|
// bind queue and exchange
|
||||||
|
_channel->bindQueue("my_exchange", "my_queue", "key").onSuccess([this]() {
|
||||||
|
std::cout << "queue bound to exchange" << std::endl;
|
||||||
|
|
||||||
|
_channel->publish("my_exchange", "key", "just a message");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the socket is closed (as a result of a TcpSocket::close() call)
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
void MyConnection::onClosed(Network::TcpSocket *socket)
|
||||||
|
{
|
||||||
|
// show
|
||||||
|
std::cout << "myconnection closed" << std::endl;
|
||||||
|
|
||||||
|
// close the channel and connection
|
||||||
|
if (_channel) delete _channel;
|
||||||
|
if (_connection) delete _connection;
|
||||||
|
|
||||||
|
// set to null
|
||||||
|
_channel = nullptr;
|
||||||
|
_connection = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the peer closed the connection
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
void MyConnection::onLost(Network::TcpSocket *socket)
|
||||||
|
{
|
||||||
|
// report error
|
||||||
|
std::cout << "connection lost" << std::endl;
|
||||||
|
|
||||||
|
// close the channel and connection
|
||||||
|
if (_channel) delete _channel;
|
||||||
|
if (_connection) delete _connection;
|
||||||
|
|
||||||
|
// set to null
|
||||||
|
_channel = nullptr;
|
||||||
|
_connection = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when data is received on the socket
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
* @param buffer Pointer to the fill input buffer
|
||||||
|
*/
|
||||||
|
void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
|
||||||
|
{
|
||||||
|
// send what came in
|
||||||
|
std::cout << "received: " << buffer->size() << " bytes" << std::endl;
|
||||||
|
|
||||||
|
// leap out if there is no connection
|
||||||
|
if (!_connection) return;
|
||||||
|
|
||||||
|
// let the data be handled by the connection
|
||||||
|
size_t bytes = _connection->parse(buffer->data(), buffer->size());
|
||||||
|
|
||||||
|
// shrink the buffer
|
||||||
|
buffer->shrink(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when data needs to be sent over the network
|
||||||
|
*
|
||||||
|
* Note that the AMQP library does no buffering by itself. This means
|
||||||
|
* that this method should always send out all data or do the buffering
|
||||||
|
* itself.
|
||||||
|
*
|
||||||
|
* @param connection The connection that created this output
|
||||||
|
* @param buffer Data to send
|
||||||
|
* @param size Size of the buffer
|
||||||
|
*/
|
||||||
|
void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size)
|
||||||
|
{
|
||||||
|
// // report what is going on
|
||||||
|
// std::cout << "send: " << size << std::endl;
|
||||||
|
//
|
||||||
|
// for (unsigned i=0; i<size; i++) std::cout << (int)buffer[i] << " ";
|
||||||
|
// std::cout << std::endl;
|
||||||
|
|
||||||
|
|
||||||
|
// send to the socket
|
||||||
|
_socket.write(buffer, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the connection ends up in an error state this method is called.
|
||||||
|
* This happens when data comes in that does not match the AMQP protocol
|
||||||
|
*
|
||||||
|
* After this method is called, the connection no longer is in a valid
|
||||||
|
* state and can be used. In normal circumstances this method is not called.
|
||||||
|
*
|
||||||
|
* @param connection The connection that entered the error state
|
||||||
|
* @param message Error message
|
||||||
|
*/
|
||||||
|
void MyConnection::onError(AMQP::Connection *connection, const char *message)
|
||||||
|
{
|
||||||
|
// report error
|
||||||
|
std::cout << "AMQP Connection error: " << message << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the login attempt succeeded. After this method
|
||||||
|
* was called, the connection is ready to use
|
||||||
|
*
|
||||||
|
* @param connection The connection that can now be used
|
||||||
|
*/
|
||||||
|
void MyConnection::onConnected(AMQP::Connection *connection)
|
||||||
|
{
|
||||||
|
// show
|
||||||
|
std::cout << "AMQP login success" << std::endl;
|
||||||
|
|
||||||
|
// create channel if it does not yet exist
|
||||||
|
if (!_channel) _channel = new AMQP::Channel(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -0,0 +1,118 @@
|
||||||
|
/**
|
||||||
|
* MyConnection.h
|
||||||
|
*
|
||||||
|
* Our own test implementation for a connection handler
|
||||||
|
*
|
||||||
|
* @copyright 2014 Copernica BV
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class definition
|
||||||
|
*/
|
||||||
|
class MyConnection :
|
||||||
|
public AMQP::ConnectionHandler,
|
||||||
|
public Network::TcpHandler
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* The actual TCP socket that is connected with RabbitMQ
|
||||||
|
* @var TcpSocket
|
||||||
|
*/
|
||||||
|
Network::TcpSocket _socket;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The AMQP connection
|
||||||
|
* @var Connection
|
||||||
|
*/
|
||||||
|
AMQP::Connection *_connection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The AMQP channel
|
||||||
|
* @var Channel
|
||||||
|
*/
|
||||||
|
AMQP::Channel *_channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection failed
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
virtual void onFailure(Network::TcpSocket *socket) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection timed out (which also is a failure
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
virtual void onTimeout(Network::TcpSocket *socket) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the connection succeeded
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
virtual void onConnected(Network::TcpSocket *socket) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the socket is closed (as a result of a TcpSocket::close() call)
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
virtual void onClosed(Network::TcpSocket *socket) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the peer closed the connection
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
*/
|
||||||
|
virtual void onLost(Network::TcpSocket *socket) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when data is received on the socket
|
||||||
|
* @param socket Pointer to the socket
|
||||||
|
* @param buffer Pointer to the fill input buffer
|
||||||
|
*/
|
||||||
|
virtual void onData(Network::TcpSocket *socket, Network::Buffer *buffer) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when data needs to be sent over the network
|
||||||
|
*
|
||||||
|
* Note that the AMQP library does no buffering by itself. This means
|
||||||
|
* that this method should always send out all data or do the buffering
|
||||||
|
* itself.
|
||||||
|
*
|
||||||
|
* @param connection The connection that created this output
|
||||||
|
* @param buffer Data to send
|
||||||
|
* @param size Size of the buffer
|
||||||
|
*/
|
||||||
|
virtual void onData(AMQP::Connection *connection, const char *buffer, size_t size) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When the connection ends up in an error state this method is called.
|
||||||
|
* This happens when data comes in that does not match the AMQP protocol
|
||||||
|
*
|
||||||
|
* After this method is called, the connection no longer is in a valid
|
||||||
|
* state and can be used. In normal circumstances this method is not called.
|
||||||
|
*
|
||||||
|
* @param connection The connection that entered the error state
|
||||||
|
* @param message Error message
|
||||||
|
*/
|
||||||
|
virtual void onError(AMQP::Connection *connection, const char *message) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called when the login attempt succeeded. After this method
|
||||||
|
* was called, the connection is ready to use
|
||||||
|
*
|
||||||
|
* @param connection The connection that can now be used
|
||||||
|
*/
|
||||||
|
virtual void onConnected(AMQP::Connection *connection) override;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
* @param ip
|
||||||
|
*/
|
||||||
|
MyConnection(const std::string &ip);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor
|
||||||
|
*/
|
||||||
|
virtual ~MyConnection();
|
||||||
|
|
||||||
|
|
||||||
|
};
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
#include <iostream>
|
||||||
|
#include <amqpcpp.h>
|
||||||
|
|
||||||
|
|
||||||
|
int main()
|
||||||
|
{
|
||||||
|
AMQP::Array x;
|
||||||
|
|
||||||
|
x[0] = "abc";
|
||||||
|
x[1] = "xyz";
|
||||||
|
|
||||||
|
std::cout << x << std::endl;
|
||||||
|
std::cout << x[0] << std::endl;
|
||||||
|
std::cout << x[1] << std::endl;
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
Loading…
Reference in New Issue