From e4cd8e02f4249e6ba669dc2ac99360345cdbb888 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Sun, 5 Jan 2014 04:11:38 -0800 Subject: [PATCH] fixed test cases --- tests/Makefile | 19 + tests/lowlevel.cpp | 899 ----------------------------------------- tests/main.cpp | 2 +- tests/midlevel.cpp | 214 ---------- tests/myconnection.cpp | 100 ++--- tests/myconnection.h | 2 - tests/test.cpp | 133 ------ 7 files changed, 49 insertions(+), 1320 deletions(-) create mode 100644 tests/Makefile delete mode 100644 tests/lowlevel.cpp delete mode 100644 tests/midlevel.cpp delete mode 100644 tests/test.cpp diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..1c1dec2 --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,19 @@ +CPP = g++ +CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g +LD = g++ +LDFLAGS = -llibamqp -lcopernica_event -lcopernica_network -lev +RESULT = a.out +SOURCES = $(wildcard *.cpp) +OBJECTS = $(SOURCES:%.cpp=%.o) + +all: ${OBJECTS} ${RESULT} + +${RESULT}: ${OBJECTS} + ${LD} -o $@ ${OBJECTS} ${LDFLAGS} + +clean: + ${RM} *.obj *~* ${OBJECTS} ${RESULT} + +${OBJECTS}: + ${CPP} ${CPPFLAGS} -o $@ ${@:%.o=%.cpp} + diff --git a/tests/lowlevel.cpp b/tests/lowlevel.cpp deleted file mode 100644 index f079c7c..0000000 --- a/tests/lowlevel.cpp +++ /dev/null @@ -1,899 +0,0 @@ -#include -#include -#include -#include "amqpbasicframe.h" - -// namespaces to use -using namespace std; -using namespace Copernica; - -/** - * Our own socket handler - */ -class SocketHandler : public Network::TcpHandler -{ -private: - /** - * Status of the connection, and substatuses - * @var int - */ - int _status; - int _connectionStatus; - int _channelStatus; - int _exchangeStatus; - int _qosStatus; - int _queueStatus; - int _basicStatus; - int _deleteStatus; - int _restStatus; - - AMQP::AMQPBasic _basic; - - /** - * Channel we're working on - * @var int - */ - uint16_t _channel; - - std::string _exchangeName; - std::string _queueName; - -public: - - - - /** - * Constructor - */ - SocketHandler() : _status(0), _connectionStatus(0), _channelStatus(0), _qosStatus(0), _exchangeStatus(0), _queueStatus(0), _basicStatus(0), _deleteStatus(0), _restStatus(0), _basic(), _channel(0), _exchangeName("testexchange"), _queueName("testqueue") {} - - /** - * Virtual destructor - */ - virtual ~SocketHandler() {} - - /** - * Method that is called when the connection failed - * @param socket Pointer to the socket - */ - virtual void onFailure(Network::TcpSocket *socket) - { - cout << "connection failure" << endl; - } - - /** - * Method that is called when the connection timed out (which also is a failure - * @param socket Pointer to the socket - */ - virtual void onTimeout(Network::TcpSocket *socket) - { - cout << "connection timeout" << endl; - } - - /** - * Method that is called when the connection succeeded - * @param socket Pointer to the socket - */ - virtual void onConnected(Network::TcpSocket *socket) - { - if(_status == 0){ // send protocol header frame - AMQP::ProtocolHeaderFrame frame; - socket->write(frame.buffer(), frame.size()); - } - } - - /** - * Method that is called when the socket is closed (as a result of a TcpSocket::close() call) - * @param socket Pointer to the socket - */ - virtual void onClosed(Network::TcpSocket *socket) - { - cout << "connection closed" << endl; - } - - /** - * Method that is called when the peer closed the connection - * @param socket Pointer to the socket - */ - virtual void onLost(Network::TcpSocket *socket) - { - cout << "connection lost" << endl; - } - - /** - * Method that is called when data is received on the socket - * @param socket Pointer to the socket - * @param buffer Pointer to the filled input buffer - */ - virtual void onData(Network::TcpSocket *socket, Network::Buffer *buffer) - { - // try getting a frame - auto frame = AMQP::Frame::decode(buffer->data(), buffer->length()); - if(frame == nullptr) - { - std::cout << "error decoding received frame" << std::endl; - - return; - } - if(_status != 5) buffer->shrink(frame->totalSize()); - if(_status == 0) - { - // the connection hasn't been set up, do this now. - setupConnection(*socket, *buffer, frame); - } - if(_status == 1) - { - // time to setup the channel - setupChannel(*socket, *buffer, frame); - } - if(_status == 2) - { - // time to do the QOS stuff - setupQOS(*socket, *buffer, frame); - } - if(_status == 3) - { - // setup the exchange - setupExchange(*socket, *buffer, frame); - } - if(_status == 4) - { - // setup the queue - setupQueue(*socket, *buffer, frame); - } - if(_status == 5) - { - // publish testing etc - testBasicFunctionalities(*socket, *buffer, frame); - } - if(_status == 6) - { - testRestStuff(*socket, *buffer, frame); - } - if(_status == 7) - { - // start deleting everything. - deleteAMQP(*socket, *buffer, frame); - } - if(_status == 8) - { - socket->close(); - } - } - - - void testRestStuff(Network::TcpSocket &socket, Network::Buffer &buffer, std::shared_ptr &frame) - { - if(_restStatus == 0) - { - std::cout << "rest testing"; - std::string consumertag = ""; - auto *sendframe = new AMQP::BasicCancelFrame(_channel, consumertag, false); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - return; // wait for response - } - if(_restStatus == 1) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to BasicCancelOKFrame, should work"<buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - return; // wait for response - } - if(_restStatus == 3) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to BasicRecoverOKFrame, should work"<buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - // recover async does not wait for response. - } - if(_restStatus == 5) - { - declareQueue(socket, buffer); - _restStatus++; - return; //wait - } - if(_restStatus == 6) - { - waitDeclareQueueOK(buffer, frame); - _restStatus = 20; - } - - if(_restStatus == 20) - { - purgeQueue(socket, buffer); - _restStatus++; - return; - } - - if(_restStatus == 21) - { - waitPurgeQueueOK(buffer, frame); - _restStatus = 7; - } - - - if(_restStatus == 7) - { - auto *sendframe = new AMQP::TransactionSelectFrame(_channel); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - return; // wait for response - } - if(_restStatus == 8) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to TransactionSelectOKFrame, should work"<buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - return; // wait for response - } - if(_restStatus == 10) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to TransactionCommitOKFrame, should work"<buffer(), sendframe->totalSize()); - _restStatus++; - delete sendframe; - return; // wait for response - } - if(_restStatus == 12) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to TransactionRollbackOKFrame, should work"< &frame) - { - int lastDeliveryTag = 0; - while(buffer.length() > 0) - { - if(_basicStatus != 20) - { - frame = AMQP::Frame::decode(buffer.data(), buffer.length()); - buffer.shrink(frame->totalSize()); - } - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to BasicDeliverFrame, should work"<deliveryTag(); - - frame = AMQP::Frame::decode(buffer.data(), buffer.length()); - auto f2 = std::dynamic_pointer_cast(frame); - if(f2 == nullptr){ std::cout<<"Error casting to BasicHeaderFrame, should work"<totalSize()); - - int bytesRead = f2->bodySize(); - - while(bytesRead > 0) - { - frame = AMQP::Frame::decode(buffer.data(), buffer.length()); - auto f3 = std::dynamic_pointer_cast(frame); - if(f3 == nullptr){ std::cout<<"Error casting to BodyFrame, should work"<totalSize()); - - auto payload = f3->payload(); - bytesRead -= payload.size(); - } - } - return lastDeliveryTag; - } - - - - void testBasicFunctionalities(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - int lastDeliveryTag = 0; - if(_basicStatus == 0) - { // basic publish - std::cout << "basic testing"; - _basic.sendPublish(socket); - _basicStatus++; - } - - if(_basicStatus == 1) - { // content header - _basic.sendHeader(socket); - _basicStatus++; - - } - if(_basicStatus == 2) - { // content body - for(int i = 0 ; i < 10; i++) { _basic.sendBody(socket); } - _basicStatus++; - - } - if(_basicStatus == 3) - { // basic consume frame - _basic.sendConsume(socket); - _basicStatus++; - return; - } - - - if(_basicStatus == 4) - { - _basic.waitConsumeOK(buffer); - _basicStatus++; - } - - if(_basicStatus == 5) - { // received consumeok frame, let's work on those basic deliver, content header, content body frames. - _basic.waitDeliver(buffer); - _basic.waitHeader(buffer); - _basic.waitBody(buffer); - _basicStatus++; - } - - // done reading all incoming messages, ack them. - if(_basicStatus == 6) - { - _basic.sendAck(socket); - _basicStatus+=2; - } - - if(_basicStatus == 8) - { // send a basic get frame - _basic.sendGet(socket); - _basicStatus++; - return; // wait for response - } - - - if(_basicStatus == 9) - { // expect a basic-get-empty frame (at 5 & 6 we've cleared the queue) - - _basic.waitGetEmpty(buffer); - _basicStatus ++; - } - - - if(_basicStatus == 10) - { // send some more data to the queue, to test basicgetok - for(int i = 0; i < 5; i++){ - _basic.sendPublish(socket); - _basic.sendHeader(socket); - for(int i = 0; i < 10; i++) { _basic.sendBody(socket); } - } - _basicStatus++; - } - - - if(_basicStatus == 11) - { // send a basic get frame - _basic.sendGet(socket); - _basicStatus++; - return; // wait for response - } - - - - if(_basicStatus == 12) - { // expect a basic-get-empty frame (at 5 & 6 we've cleared the queue) - _basic.waitGetOK(buffer); - _basic.waitHeader(buffer); - _basic.waitBody(buffer); - _basicStatus++; - } - - if(_basicStatus == 13) - { // received consumeok frame, let's work on those basic deliver, content header, content body frames. - while(buffer.length() > 0) - { - _basic.waitDeliver(buffer); - _basic.waitHeader(buffer); - _basic.waitBody(buffer); - } - _basicStatus++; - } - - if(_basicStatus == 14) - { - // add some more messages to the queue - _basic.sendPublish(socket); - _basic.sendHeader(socket, 9); - _basic.sendBody(socket); - _basicStatus++; - } - - if(_basicStatus == 15) - { - // channel flow frame - auto *sendframe2 = new AMQP::ChannelFlowFrame(_channel, true); - socket.write(sendframe2->buffer(), sendframe2->totalSize()); - _basicStatus++; - delete sendframe2; - return; // wait for response - } - - - if(_basicStatus == 16) - { - // check if we've received a channelflowokframe - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to channelflowokframe, should work"<totalSize()); - _basicStatus++; - } - - if(_basicStatus == 17) - { - _status++; - std::cout << " ... DONE" << std::endl; - return; - } - - } - - void purgeQueue(Network::TcpSocket& socket, Network::Buffer& buffer) - { - auto *sendframe = new AMQP::QueuePurgeFrame(_channel, _queueName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - delete sendframe; - } - - void waitPurgeQueueOK( Network::Buffer& buffer, std::shared_ptr &frame) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueuePurgeOKFrame, should work"<totalSize()); - } - - void deleteAMQP(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - // queue deletion stuff - - // for purge testing, let's add a message to the queue (in case it is empty) - if(_deleteStatus == 0) - { // nothing deleted so far - // queue purge - auto *sendframe = new AMQP::QueuePurgeFrame(_channel, _queueName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - delete sendframe; - return; // wait for response - } - if(_deleteStatus == 1) - { // nothing deleted so far - // queue purge ok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueuePurgeOKFrame, should work"<totalSize()); - - _deleteStatus++; - } - - if(_deleteStatus == 2) - { // nothing deleted so far - std::cout << "cleanup"; - // unbind queue - auto *sendframe = new AMQP::QueueUnbindFrame(_channel, _queueName, _exchangeName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - delete sendframe; - return; // wait for response - } - if(_deleteStatus == 3) - { // nothing deleted so far - // unbind queue ok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueueUnbindOKFrame, should work"<totalSize()); - - _deleteStatus++; - } - if(_deleteStatus == 4) - { // nothing deleted so far - // queue delete - auto *sendframe = new AMQP::QueueDeleteFrame(_channel, _queueName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - delete sendframe; - return; // wait for response - } - if(_deleteStatus == 5) - { // nothing deleted so far - //// queue delete ok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueueDeleteOKFrame, should work"<totalSize()); - - _deleteStatus++; - } - - //exchange deletion - if(_deleteStatus == 6) - { // send exchange delete - auto *sendframe = new AMQP::ExchangeDeleteFrame(_channel, _exchangeName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - delete sendframe; - return; // wait for response - } - if(_deleteStatus == 7) - { // expect exchangedeleteok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to ExchangeDeleteOKFrame, should work"<totalSize()); - - _deleteStatus++; - } - - // channel close - if(_deleteStatus == 8) - { // send channel close - std::string replyText = "200"; - auto *sendframe = new AMQP::ChannelCloseFrame(_channel, 200, replyText); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - - delete sendframe; - return; // wait for response - } - if(_deleteStatus == 9) - { // expect channelcloseok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to ChannelCloseOKFrame, should work"<totalSize()); - - _deleteStatus++; - } - - // connection close - if(_deleteStatus == 10) - { // send exchange delete - std::string replyText = "200"; - auto *sendframe = new AMQP::ConnectionCloseFrame(0, 200, replyText); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _deleteStatus++; - - delete sendframe; - return; // wait for response - } - - if(_deleteStatus == 11) - { // expect exchangedeleteok - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to ConnectionCloseOKFrame, should work"<totalSize()); - - _deleteStatus++; - _status++; - - std::cout << " ... DONE" << std::endl; - } - } - - - void declareQueue(Network::TcpSocket& socket, Network::Buffer& buffer) - { - auto *sendframe = new AMQP::QueueDeclareFrame(_channel, _queueName, false, true, false, false, false); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _queueStatus++; - delete sendframe; - } - - void waitDeclareQueueOK(Network::Buffer& buffer, std::shared_ptr &frame) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueueDeclareOKFrame, should work"<totalSize()); - _queueStatus++; - } - - void setupQueue(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - if(_queueStatus == 0) - { // nothing has been done in the queue world - std::cout << "set up queue " << _queueName; - // send a queue declare frame - declareQueue(socket, buffer); - return; //wait for response - } - if(_queueStatus == 1) - { - waitDeclareQueueOK(buffer, frame); - } - if(_queueStatus == 2) - { - auto *sendframe = new AMQP::QueueBindFrame(_channel, _queueName, _exchangeName); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _queueStatus++; - delete sendframe; - return; // wait for response - } - if(_queueStatus == 3) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to QueueBindOKFrame, should work"<totalSize()); - - _queueStatus++; - _status++; - std::cout << " ... DONE" << std::endl; - } - } - - - - - - - - - - void setupQOS(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - if(_qosStatus == 0) - { // nothing has been done in the qos world - std::cout << "set up qos"; - // send the basic qos packet - auto *sendframe = new AMQP::BasicQosFrame(_channel, 0, 3, false); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _qosStatus++; - delete sendframe; - return; // we wait for a response - } - if(_qosStatus == 1) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to basicqosokframe, should work"<totalSize()); - _status++; - std::cout << " ... DONE" << std::endl; - } - } - - - - - - - - - - - - - void setupChannel(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - if(_channelStatus == 0) - { // nothing has been done in the channel world. - std::cout << "set up channel"; - // we send the channel open frame - _channel = 1; - auto *sendframe = new AMQP::ChannelOpenFrame(_channel); - socket.write(sendframe->buffer(), sendframe->totalSize()); - _channelStatus++; - - delete sendframe; - return; // we wait for the response - } - if(_channelStatus == 1) - { - // check if we've received a channel open ok response - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to channelopenokframe, should work"<totalSize()); - - _channelStatus++; - // received a response, send a new frame - } - if(_channelStatus == 2) - { - // channel flow frame - auto *sendframe2 = new AMQP::ChannelFlowFrame(_channel, true); - socket.write(sendframe2->buffer(), sendframe2->totalSize()); - _channelStatus++; - delete sendframe2; - return; // wait for response - } - if(_channelStatus == 3) - { - // check if we've received a channelflowokframe - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to channelflowokframe, should work"<totalSize()); - _status++; - _channelStatus++; - - std::cout << " ... DONE" << std::endl; - return; - } - - } - - - - - - - - - - - - void setupExchange(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - if(_exchangeStatus == 0) - { // nothing has been done in the exchange world. - std::cout << "set up exchange"; - // create an exchange declare frame - const AMQP::Table *t = new AMQP::Table(); - std::string type = "direct"; - auto *sendframe = new AMQP::ExchangeDeclareFrame(_channel, _exchangeName, type, false, true, false, *t); - - socket.write(sendframe->buffer(), sendframe->totalSize()); - _exchangeStatus++; - - delete t; - delete sendframe; - return; // await response - } - if(_exchangeStatus == 1) - { // expect an exchangedeclareokframe - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr){ std::cout<<"Error casting to exchangedeclareokframe, should work"<totalSize()); - _status++; - _exchangeStatus++; - std::cout << " ... DONE" << std::endl; - return; - } - } - - - - - - - - - - - - - void setupConnection(Network::TcpSocket& socket, Network::Buffer& buffer, std::shared_ptr &frame) - { - // received connectionStart frame, send connectionStartOK frame - if(_connectionStatus == 0) - { - std::cout << "set up connection"; - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr) - { - std::cout << "error casting to connectionstartframe, should work??" << std::endl; - return; - } - buffer.shrink(frame->totalSize()); - AMQP::Table *properties = new AMQP::Table(); - - std::string mechanism = "PLAIN"; - std::string *response = new std::string("\0micha\0micha", 12); - std::string locales = "en_US"; - auto *sendframe = new AMQP::ConnectionStartOKFrame(_channel, *properties, mechanism, *response, locales); - socket.write(sendframe->buffer(), sendframe->totalSize()); - - delete response; - delete sendframe; - delete properties; - _connectionStatus++; - return; // we wait for a response - } - - // received connectionTuneFrame, probably - if(_connectionStatus == 1) - { - auto f = std::dynamic_pointer_cast(frame); - if(f == nullptr) - { - std::cout << "error casting to connectiontuneframe, should work??" << std::endl; - return; - } - uint16_t channelMax = 10; - uint32_t frameMax = 131072; - uint16_t heartbeat = 0; - auto *conopframe = new AMQP::ConnectionTuneOKFrame(_channel, channelMax, frameMax, heartbeat); - socket.write(conopframe->buffer(), conopframe->totalSize()); - _connectionStatus++; - - delete conopframe; - - // we respond to the server, now send a new frame - } - if(_connectionStatus == 2) - { - // after sending the tuneok frame, send the connection open frame - std::string vhost = "/"; - // rest of the fields are deprecated, the constructor handles that for us - - auto *conopframe = new AMQP::ConnectionOpenFrame(_channel, vhost); - socket.write(conopframe->buffer(), conopframe->totalSize()); - _connectionStatus++; - - delete conopframe; - return; // we await a response - } - if(_connectionStatus == 3) - { - // we probably received the connection open ok frame - buffer.shrink(frame->totalSize()); - _status++; - std::cout << " ... DONE" << std::endl; - } - } - - /** - * Method that is called when the internal buffers are emptied and the socket - * is in a writable state again. It is also called after a call to TcpSocket::wait() - */ - virtual void onWritable(Network::TcpSocket *socket) - { - - cout << "socket writable" << endl; - } -}; - -/** - * Main procedure - * @param argc - * @param argv - * @return int - */ -int main(int argc, const char *argv[]) -{ - // our own handler - SocketHandler tcphandler; - - // create a socket - Network::TcpSocket socket(Event::MainLoop::instance(), &tcphandler); - - // connect to the socket - socket.connect(Network::Ipv4Address("127.0.0.1"), 5672); - - std::cout << "start main loop" << std::endl; - - // run the main event loop - Event::MainLoop::instance()->run(); - - std::cout << "exited main loop" << std::endl; - - // done - return 0; -} diff --git a/tests/main.cpp b/tests/main.cpp index 974f32a..5b4025e 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -9,7 +9,7 @@ /** * Global libraries that we need */ -#include +#include #include /** diff --git a/tests/midlevel.cpp b/tests/midlevel.cpp deleted file mode 100644 index b0a0fff..0000000 --- a/tests/midlevel.cpp +++ /dev/null @@ -1,214 +0,0 @@ -#include -#include "../amqp.h" -#include - -using namespace Copernica; -using namespace AMQP; -using namespace Network; - -namespace Copernica { namespace AMQP{ -class TcpSocket : - public AMQP::TcpConnection, - public Network::TcpHandler, - public AMQP::ChannelHandler, - public AMQP::ExchangeHandler, - public Event::TimerHandler -{ -private: - /** - * AMQP connection object - * @var connection - */ - Connection *_connection; - - -public: - /** - * tcpsocket to sent data to, received data from - * @var TcpSocket - */ - Network::TcpSocket *_socket; - - TcpSocket(Event::Loop *loop) - { - _socket = new Network::TcpSocket(loop, this); - } - - /** - * Destructor - */ - virtual ~TcpSocket() {} - - - /** - * TcpConnection implementation - */ - - /** - * Send data to the connection - * - * Note that the AMQP library does not do any output buffering, so the return value - * of this method should always be identical to the size of the buffer! - * - * @param buffer Data to send - * @param size Size of the buffer - * @return Number of bytes actually sent, or -1 on failure - */ - virtual ssize_t send(const char *buffer, size_t size) - { - return _socket->write(buffer, size); - } - - /** - * Close the connection - * @return bool Was the connection closed successully? - */ - virtual bool close() - { - _socket->close(); - } - - - /** ** - * Connection implementation ** - * */ - - virtual void onConnected(Network::TcpSocket *socket) - { - std::cout << "connection started, create connection object" << std::endl; - _connection = new AMQP::Connection(this); - - - Table *t = new Table(); - std::string mechanism = "PLAIN"; - std::string locale="en_US"; - std::string *credentials = new std::string("\0micha\0micha", 12); - std::string vhost = "/"; - _connection->setProperties(*t); - _connection->setMechanism(mechanism); - _connection->setLocale(locale); - _connection->setCredentials(*credentials); - - _connection->setChannelMax(10); - _connection->setFrameMax(10000); - // heartbeat every 5 seconds - _connection->setHeartbeat(5); - - _connection->setVhost(vhost); - } - - virtual void onData(Network::TcpSocket *socket, Network::Buffer *buffer) - { - std::cout << "data received" << std::endl; - size_t bytesRead = _connection->parse(buffer->data(), buffer->length()); - buffer->shrink(bytesRead); - } - - - /** ** - * ChannelHandler method implementations ** - * */ - - /** - * Method that is called when the channel was succesfully created. - * Only after the channel was created, you can use it for subsequent messages over it - * @param channel - */ - virtual void onReady(Channel *channel) - { - Exchange exchange(channel, this); - //exchange.setName("bla"); - //exchange.declare(); - - } - - /** - * Method that is called when an error occured on a channel. - * @param channel - */ - virtual void onError(Channel *channel) - {} - - /** - * Method that is called when the QOS was succesfully set - * @param channel - */ - virtual void onQos(Channel *channel) - {} - - /** - * Method that is called when a rollback has succeeded - * @param channel - */ - virtual void onRollback(Channel *channel) - {} - - - /** ** - * ExchangeHandler method implementations ** - * */ - - /** - * The exchange was correctly declared - * @param exchange - */ - virtual void onDeclared(Exchange *exchange) - { - std::cout << "exchange has been declared" << std::endl; - } - - /** - * The exchange could not be declared - * @param exchange - */ - virtual void onNotDeclared(Exchange *exchange) {} - - /** - * The exchange was correctly deleted - * @param exchange - */ - virtual void onDeleted(Exchange *exchange) {} - - /** - * The exchange could not be removed - * @param exchange - */ - virtual void onNotDeleted(Exchange *exchange) {} - - /** - * The exchange was correctly bound - * @param exchange - */ - virtual void onBound(Exchange *exchange) {} - - /** - * The bind call failed - * @param exchange - */ - virtual void onNotBound(Exchange *exchange) {} - -}; -}} - - - -int main() -{ - Copernica::AMQP::TcpSocket *tcpHandler = new Copernica::AMQP::TcpSocket(Event::MainLoop::instance()); - - //tcpHandler._socket = new Copernica::Network::Socket(Event::MainLoop::instance(), &tcpHandler); - tcpHandler->_socket->connect(Ipv4Address("127.0.0.1"), 5672); - std::cout << "run loop." << std::endl; - Event::MainLoop::instance()->run(); - std::cout << "dropped out of loop" << std::endl; - - //MyIoHandler handler(&socket); - //MyChannelHandler chanHandler; - - //Amqp amqp(tcpconnection, handler); - - - return 0; -} - - diff --git a/tests/myconnection.cpp b/tests/myconnection.cpp index 0b4dd69..17d1c2f 100644 --- a/tests/myconnection.cpp +++ b/tests/myconnection.cpp @@ -7,7 +7,7 @@ /** * Required external libraries */ -#include +#include #include #include @@ -42,8 +42,6 @@ MyConnection::~MyConnection() { // do we still have a channel? if (_channel) delete _channel; - if (_channel2) delete _channel2; - if (_channel3) delete _channel3; // do we still have a connection? if (_connection) delete _connection; @@ -81,8 +79,14 @@ void MyConnection::onConnected(Network::TcpSocket *socket) // we are connected, leap out if there already is a amqp connection if (_connection) return; - // create amqp connection - _connection = new AMQP::Connection(this, AMQP::Login("/", "buggie", "buggie")); + // create amqp connection, and a new channel + _connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/"); + _channel = new AMQP::Channel(_connection, this); + + // we declare a queue, an exchange and we publish a message + _channel->declareQueue("my_queue"); + _channel->declareExchange("my_exchange", AMQP::direct); + _channel->bindQueue("my_exchange", "my_queue", "key"); } /** @@ -93,10 +97,9 @@ void MyConnection::onClosed(Network::TcpSocket *socket) { // show std::cout << "myconnection closed" << std::endl; + // close the channel and connection if (_channel) delete _channel; - if (_channel2) delete _channel2; - if (_channel3) delete _channel3; if (_connection) delete _connection; // set to null @@ -115,8 +118,6 @@ void MyConnection::onLost(Network::TcpSocket *socket) // close the channel and connection if (_channel) delete _channel; - if (_channel2) delete _channel2; - if (_channel3) delete _channel3; if (_connection) delete _connection; // set to null @@ -157,6 +158,13 @@ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) */ void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size) { +// // report what is going on +// std::cout << "send: " << size << std::endl; +// +// for (unsigned i=0; iid() << std::endl; - - // declare an exchange with the channel id appended to it. - std::string name = "testexchange"; - name += std::to_string(channel->id()); - channel->declareExchange(name); } /** @@ -221,26 +222,11 @@ void MyConnection::onError(AMQP::Channel *channel, const std::string &message) // show std::cout << "AMQP channel error, id: " << (int) channel->id() << " - message: " << message << std::endl; - std::cout << "deleting channel and creating new channel" << std::endl; - // does the channel still exist? - if (channel == _channel) - { - // main channel cause an error, get rid of if - delete _channel; + // main channel cause an error, get rid of if + delete _channel; - // and allocate a new one - _channel = new AMQP::Channel(_connection, this); - } - if (channel == _channel2) - { - delete _channel2; - _channel2 = new AMQP::Channel(_connection, this); - } - if (channel == _channel3) - { - delete channel; - channel = new AMQP::Channel(_connection, this); - } + // reset pointer + _channel = nullptr; } /** @@ -251,7 +237,6 @@ void MyConnection::onPaused(AMQP::Channel *channel) { // show std::cout << "AMQP channel paused" << std::endl; - channel->resume(); } /** @@ -312,24 +297,6 @@ void MyConnection::onExchangeDeclared(AMQP::Channel *channel) { // show std::cout << "AMQP exchange declared" << std::endl; - std::string name = "testexchange"; - name += std::to_string(channel->id()); - - // let's make all channels but 2 and 3 working - if(channel->id() != 2 && channel->id() != 3) - { - channel->bindExchange(name, name, ""); - channel->declareQueue("testqueue"); - } - if(channel->id() == 2) channel->bindQueue("imaginaryQueue", name, ""); - if(channel->id() == 3) channel->bindQueue("testqueue", "imaginaryExchange", ""); - if(channel->id() == 4) channel->close(); - // ĺet's force a connection error - if(channel->id() == 5) - { - std::cout << "force connection error" << std::endl; - // todo: force a connection error. - } } /** @@ -340,7 +307,6 @@ void MyConnection::onExchangeBound(AMQP::Channel *channel) { // show std::cout << "AMQP Exchange bound" << std::endl; - channel->unbindExchange("myexchange", "myexchange", ""); } /** @@ -374,18 +340,6 @@ void MyConnection::onQueueDeclared(AMQP::Channel *channel, const std::string &na { // show std::cout << "AMQP Queue declared" << std::endl; - if(channel->id() == 1) - { - channel->pause(); - channel->unbindQueue(name, "testexchange", ""); - channel->purgeQueue(name); - } else - { - std::string name = "testexchange"; - name += std::to_string(channel->id()); - channel->removeExchange(name); - } - } /** @@ -397,6 +351,9 @@ void MyConnection::onQueueBound(AMQP::Channel *channel) { // show std::cout << "AMQP Queue bound" << std::endl; + + + _channel->publish("my_exchange", "key", "this is the message"); } /** @@ -429,4 +386,5 @@ void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) // show std::cout << "AMQP Queue purged" << std::endl; channel->removeQueue("testqueue"); -} \ No newline at end of file +} + diff --git a/tests/myconnection.h b/tests/myconnection.h index 11b7b33..d5662d6 100644 --- a/tests/myconnection.h +++ b/tests/myconnection.h @@ -32,8 +32,6 @@ private: * @var Channel */ AMQP::Channel *_channel; - AMQP::Channel *_channel2; - AMQP::Channel *_channel3; /** * Method that is called when the connection failed diff --git a/tests/test.cpp b/tests/test.cpp deleted file mode 100644 index a2acd3f..0000000 --- a/tests/test.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Test program for the AMQP library - * - * @documentation private - */ - -/** - * Callback function that should be implemented by the calling application - * - * The frame should be deallocated by calling amqp_frame_destroy() after you've - * finished processing it. - * - * @param frame The received frame - * @param cookie Pointer to cookie data - */ -static void my_callback(amqp_frame_t *frame, void *cookie) -{ - AmqpHandler *handler = (AmqpHandler *)cookie; - - // @todo process frame - -} - -/** - * Class that handles IO for the AMQP socket - */ -class AmqpHandler : private Network::TcpHandler -{ -private: - /** - * The TCP socket - * @var Network::TcpSocket - */ - Network::TcpSocket _socket; - - - - - /** - * Method that is called when the connection failed - * @param socket Pointer to the socket - */ - virtual void onFailure(TcpSocket *socket) - { - exit(); - } - - /** - * Method that is called when the connection timed out (which also is a failure - * @param socket Pointer to the socket - */ - virtual void onTimeout(TcpSocket *socket) - { - exit(); - } - - /** - * Method that is called when the connection succeeded - * @param socket Pointer to the socket - */ - virtual void onConnected(TcpSocket *socket) - { - // eerste frame gaan versturen - of wachten op eerste frame - amqp_send_handshake(_handle, "1.0"); - } - - /** - * Method that is called when the socket is closed (as a result of a TcpSocket::close() call) - * @param socket Pointer to the socket - */ - virtual void onClosed(TcpSocket *socket) - { - exit(); - } - - /** - * Method that is called when the peer closed the connection - * @param socket Pointer to the socket - */ - virtual void onLost(TcpSocket *socket) - { - exit(); - } - - /** - * Method that is called when data is received on the socket - * @param socket Pointer to the socket - * @param buffer Pointer to the fill input buffer - */ - virtual void onData(TcpSocket *socket, Buffer *buffer) - { - ssize bytes = amqp_parse_frames(_handle, buffer->data(), buffer->size()); - - if (bytes > 0) buffer->shrink(bytes); - - } - -public: - /** - * Constructor - */ - AmqpHandler() : _socket(Event::MainLoop::instance(), this) - { - _handler = amqp_create_frame_handler(parse_frame, this); - - _handle = amqp_create_message_handler( - } - - /** - * Destructor - */ - virtual ~AmqpHandler() - { - amqp_destroy_frame_parser(_handle); - } -}; - -/** - * Main procedure - * @param argc - * @param argv - * @return integer - */ -int main(int argc, const char *argv[]) -{ - AmqpHandler handler; - - // run the application - Event::MainLoop::instance()->run(); - - return 0; -} -