diff --git a/tests/manual/manual.pro b/tests/manual/manual.pro deleted file mode 100644 index 19b9c30..0000000 --- a/tests/manual/manual.pro +++ /dev/null @@ -1,3 +0,0 @@ -TEMPLATE = subdirs -SUBDIRS = \ - qamqp diff --git a/tests/manual/qamqp/QamqpApp.h b/tests/manual/qamqp/QamqpApp.h deleted file mode 100644 index e816474..0000000 --- a/tests/manual/qamqp/QamqpApp.h +++ /dev/null @@ -1,203 +0,0 @@ -#ifndef QAMQPAPP_H -#define QAMQPAPP_H - -#include -#include -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_exchange.h" -#include "amqp_queue.h" - -#include "pubsub/EmitLog.h" -#include "pubsub/ReceiveLog.h" -#include "routing/EmitLogDirect.h" -#include "routing/ReceiveLogDirect.h" -#include "sendreceive/Send.h" -#include "sendreceive/Receive.h" -#include "workqueues/NewTask.h" -#include "workqueues/Worker.h" - -namespace QAMQP -{ - -namespace samples -{ - -class QamqpApp : public QCoreApplication -{ - Q_OBJECT - - typedef QCoreApplication super; - -public: - explicit QamqpApp(int& argc, char** argv) - : super(argc, argv) - { - qsrand(QDateTime::currentMSecsSinceEpoch()); - QTimer::singleShot(0, this, SLOT(run())); - } - - bool notify(QObject *obj, QEvent *event) - { - try - { - return super::notify(obj, event); - } - catch (std::exception& ex) - { - qWarning() << ex.what(); - return false; - } - } - -protected slots: - void run() - { - QTextStream cout(stdout); - try - { - QStringList args = arguments(); - - if (args.size() == 1 || "-h" == args[1] || "--help" == args[1]) - { - printUsage(cout); - quit(); - return; - } - - QString command = args[1]; - QRunnable* commandImpl = 0; - - if ("send" == command) - { - if (args.size() < 4) - throw std::runtime_error("Mandatory argument(s) missing!"); - - QString url = args[2]; - QString msg = args[3]; - commandImpl = new Send(url, msg, this); - } - else if ("receive" == command) - { - if (args.size() < 3) - throw std::runtime_error("Mandatory argument missing!"); - - QString url = args[2]; - commandImpl = new Receive(url, this); - } - else if ("new_task" == command) - { - if (args.size() < 3) - throw std::runtime_error("Mandatory argument missing!"); - - QString url = args[2]; - commandImpl = new NewTask(url, this); - } - else if ("worker" == command) - { - if (args.size() < 3) - throw std::runtime_error("Mandatory argument missing!"); - - QString url = args[2]; - commandImpl = new Worker(url, this); - } - else if ("emit_log" == command) - { - if (args.size() < 3) - throw std::runtime_error("Mandatory argument missing!"); - - QString url = args[2]; - commandImpl = new EmitLog(url, this); - } - else if ("receive_log" == command) - { - if (args.size() < 3) - throw std::runtime_error("Mandatory argument missing!"); - - QString url = args[2]; - commandImpl = new ReceiveLog(url, this); - } - else if ("emit_log_direct" == command) - { - if (args.size() < 4) - throw std::runtime_error("Mandatory argument(s) missing!"); - - QString url = args[2]; - QString lst = args[3]; - commandImpl = new EmitLogDirect(url, lst, this); - } - else if ("receive_log_direct" == command) - { - if (args.size() < 4) - throw std::runtime_error("Mandatory argument(s) missing!"); - - QString url = args[2]; - QString lst = args[3]; - commandImpl = new ReceiveLogDirect(url, lst, this); - } - else - { - throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); - } - - // Run command. - commandImpl->run(); - } - catch (std::exception& ex) - { - qWarning() << ex.what(); - exit(-1); - } - } - -protected: - void printUsage(QTextStream& out) - { - QString executable = arguments().at(0); - out << QString( -"\n\ -USAGE: %1 [-h|--help] -- Show this help message.\n\ -\n\ -USAGE: %1 send -- Send messages.\n\ - %1 receive -- Receive messages.\n\ -\n\ - %1 new_task -- Ask for long-running tasks.\n\ - %1 worker -- Execute tasks.\n\ -\n\ - %1 emit_log -- Publish log messages.\n\ - %1 receive_log -- Subscribe to logs.\n\ -\n\ - %1 emit_log_direct \n\ - -- Publish messages by category.\n\ - %1 receive_log_direct \n\ - -- Subscribe to chosen categories.\n\ -\n\ -Simple \"Hello World!\":\n\ -* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\ -* Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\ -\n\ -Work Queues:\n\ -* Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\ -* Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\ -\n\ -Publish/Subscribe:\n\ -* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\ -* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\ -\n\ -Routing:\n\ -* Producer: %1 emit_log_direct amqp://guest:guest@127.0.0.1:5672/ red,blue,green\n\ -* Consumer: %1 receive_log_direct amqp://guest:guest@127.0.0.1:5672/ blue,yellow\n\ -\n").arg(executable); - } -}; - -} - -} - -#endif // QAMQPAPP_H diff --git a/tests/manual/qamqp/main.cpp b/tests/manual/qamqp/main.cpp deleted file mode 100644 index c6b9f99..0000000 --- a/tests/manual/qamqp/main.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include - -#include "QamqpApp.h" - -int main(int argc, char *argv[]) -{ - QAMQP::samples::QamqpApp qamqpApp(argc, argv); - - return qamqpApp.exec(); -} diff --git a/tests/manual/qamqp/pubsub/EmitLog.h b/tests/manual/qamqp/pubsub/EmitLog.h deleted file mode 100644 index ac1dcbe..0000000 --- a/tests/manual/qamqp/pubsub/EmitLog.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef EMITLOG_H -#define EMITLOG_H - -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_exchange.h" - -namespace QAMQP -{ - -namespace samples -{ - -class EmitLog : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit EmitLog(const QString& address, QObject *parent) - : super(parent) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Create the "logs" fanout exchange - exchange_ = client->createExchange("logs"); - exchange_->declare("fanout"); - } - - void run() - { - QTimer* timer = new QTimer(this); - timer->setInterval(1000); - connect(timer, SIGNAL(timeout()), SLOT(emitlogMessage())); - timer->start(); - } - -protected slots: - void emitlogMessage() - { - static quint64 counter = 0; - - QString message(QString("[%1: %2] Hello World!") - .arg(++counter) - .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) - ); - qDebug() << "EmitLog::emitlogMessage " << message; - - exchange_->publish("receive-log", message); - } - -private: - QAMQP::Exchange* exchange_; -}; - -} - -} - -#endif // EMITLOG_H diff --git a/tests/manual/qamqp/pubsub/ReceiveLog.h b/tests/manual/qamqp/pubsub/ReceiveLog.h deleted file mode 100644 index daca514..0000000 --- a/tests/manual/qamqp/pubsub/ReceiveLog.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef RECEIVELOG_H -#define RECEIVELOG_H - -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_queue.h" - -namespace QAMQP -{ - -namespace samples -{ - -class ReceiveLog : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit ReceiveLog(const QString& address, QObject* parent) - : super(parent) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Create an exclusive queue - queue_ = client->createQueue("receive-log"); - queue_->declare(Queue::Exclusive); - - connect(queue_, SIGNAL(declared()), this, SLOT(declared())); - connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); - } - - void run() - { - } - -protected slots: - void declared() - { - // Bind the queue to the "logs" exchange (declared by publisher) - queue_->bind("logs", "receive-log"); - queue_->consume(QAMQP::Queue::coNoAck); - } - - void newMessage() - { - // Retrieve message - QAMQP::Message message = queue_->dequeue(); - qDebug() << "ReceiveLog::newMessage " << message.payload(); - } - -private: - QAMQP::Queue* queue_; -}; - -} - -} - -#endif // RECEIVELOG_H diff --git a/tests/manual/qamqp/qamqp.pro b/tests/manual/qamqp/qamqp.pro deleted file mode 100644 index 2b3de81..0000000 --- a/tests/manual/qamqp/qamqp.pro +++ /dev/null @@ -1,20 +0,0 @@ -DEPTH = ../../.. -include($${DEPTH}/qamqp.pri) -include($${DEPTH}/tests/tests.pri) -CONFIG -= testcase - -TEMPLATE = app -TARGET = qamqp -HEADERS += \ - QamqpApp.h \ - pubsub/EmitLog.h \ - pubsub/ReceiveLog.h \ - routing/EmitLogDirect.h \ - routing/ReceiveLogDirect.h \ - sendreceive/Receive.h \ - sendreceive/Send.h \ - workqueues/NewTask.h \ - workqueues/Worker.h - -SOURCES += \ - main.cpp diff --git a/tests/manual/qamqp/routing/EmitLogDirect.h b/tests/manual/qamqp/routing/EmitLogDirect.h deleted file mode 100644 index eb92c8d..0000000 --- a/tests/manual/qamqp/routing/EmitLogDirect.h +++ /dev/null @@ -1,77 +0,0 @@ -#ifndef EMITLOGDIRECT_H -#define EMITLOGDIRECT_H - -#include -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_exchange.h" - -namespace QAMQP -{ - -namespace samples -{ - -class EmitLogDirect : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit EmitLogDirect(const QString& address, const QString& list, QObject *parent) - : super(parent) - , list_(list.split(',', QString::SkipEmptyParts)) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Create the "direct_logs" fanout exchange - exchange_ = client->createExchange("direct_logs"); - exchange_->declare("direct"); - } - - void run() - { - QTimer* timer = new QTimer(this); - timer->setInterval(1000); - connect(timer, SIGNAL(timeout()), SLOT(emitLogMessage())); - timer->start(); - } - -protected slots: - void emitLogMessage() - { - static quint64 counter = 0; - - // Choose random key - QString key(list_.at(qrand() % list_.size())); - - // Create Message - QString message(QString("[%1: %2] %3") - .arg(++counter) - .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) - .arg(key) - ); - qDebug() << "EmitLogDirect::emitLogMessage " << message; - - // Publish - exchange_->publish(key, message); - } - -private: - QStringList list_; - QAMQP::Exchange* exchange_; -}; - -} - -} - -#endif // EMITLOGDIRECT_H diff --git a/tests/manual/qamqp/routing/ReceiveLogDirect.h b/tests/manual/qamqp/routing/ReceiveLogDirect.h deleted file mode 100644 index 82f7a20..0000000 --- a/tests/manual/qamqp/routing/ReceiveLogDirect.h +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef RECEIVELOGDIRECT_H -#define RECEIVELOGDIRECT_H - -#include -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_queue.h" - - -namespace QAMQP -{ - -namespace samples -{ - -class ReceiveLogDirect : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit ReceiveLogDirect(const QString& address, const QString& list, QObject* parent) - : super(parent) - , list_(list) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Create an exclusive queue - queue_ = client->createQueue("receive-log-direct"); - queue_->declare(Queue::Exclusive); - - connect(queue_, SIGNAL(declared()), this, SLOT(declared())); - connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); - } - - void run() - { - } - -protected slots: - void declared() - { - // Loop on the list to bind with the keys - QStringList split(list_.split(',', QString::SkipEmptyParts)); - for(int i = 0; i < split.size(); ++i) - queue_->bind("direct_logs", split.at(i)); - - // Start consuming - queue_->consume(QAMQP::Queue::coNoAck); - } - - void newMessage() - { - // Retrieve message - QAMQP::Message message = queue_->dequeue(); - qDebug() << "ReceiveLogDirect::newMessage " << message.payload(); - } - -private: - QString list_; - QAMQP::Queue* queue_; -}; - -} - -} - -#endif // RECEIVELOGDIRECT_H diff --git a/tests/manual/qamqp/sendreceive/Receive.h b/tests/manual/qamqp/sendreceive/Receive.h deleted file mode 100644 index ab17ccf..0000000 --- a/tests/manual/qamqp/sendreceive/Receive.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef RECEIVE_H -#define RECEIVE_H - -#include -#include -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_queue.h" - - -namespace QAMQP -{ - -namespace samples -{ - -class Receive : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit Receive(const QString& address, QObject* parent) - : super(parent) - { - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - queue_ = client->createQueue("hello"); - queue_->declare(); - connect(queue_, SIGNAL(declared()), this, SLOT(declared())); - connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); - } - - void run() - { - } - -protected slots: - void declared() - { - queue_->consume(); - } - - void newMessage() - { - while (!queue_->isEmpty()) { - QAMQP::Message message = queue_->dequeue(); - qDebug() << "Receive::newMessage " << message.payload(); - if (!queue_->noAck()) - queue_->ack(message); - } - } - -private: - QAMQP::Queue* queue_; -}; - -} - -} - -#endif // RECEIVE_H diff --git a/tests/manual/qamqp/sendreceive/Send.h b/tests/manual/qamqp/sendreceive/Send.h deleted file mode 100644 index 5b756c7..0000000 --- a/tests/manual/qamqp/sendreceive/Send.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef SEND_H -#define SEND_H - -#include -#include -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_exchange.h" -#include "amqp_queue.h" - -namespace QAMQP -{ - -namespace samples -{ - -class Send : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit Send(const QString& address, const QString& sendMsg, QObject *parent) - : super(parent), sendMsg_(sendMsg) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Retrieve the "Default" exchange - // No need to declare (i.e. to create), nor to bind to a queue - exchange_ = client->createExchange(); - - // Create the "hello" queue - // This isn't mandatory but if it doesn't exist, the messages are lost - client - ->createQueue("hello", exchange_->channelNumber()) - ->declare(); - } - - void run() - { - QTimer* timer = new QTimer(this); - timer->setInterval(2468); - connect(timer, SIGNAL(timeout()), SLOT(sendMessage())); - timer->start(); - } - -protected slots: - void sendMessage() - { - static quint64 counter = 0; - - QString message(QString("[%1: %2] %3") - .arg(++counter) - .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) - .arg(sendMsg_)); - qDebug() << "Send::sendMessage " << message; - - exchange_->publish("hello", message); - } - -private: - QString sendMsg_; - - QAMQP::Exchange* exchange_; -}; - -} - -} - -#endif // SEND_H diff --git a/tests/manual/qamqp/workqueues/NewTask.h b/tests/manual/qamqp/workqueues/NewTask.h deleted file mode 100644 index e2eae9a..0000000 --- a/tests/manual/qamqp/workqueues/NewTask.h +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef NEWTASK_H -#define NEWTASK_H - -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_exchange.h" -#include "amqp_queue.h" - -namespace QAMQP -{ - -namespace samples -{ - -class NewTask : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit NewTask(const QString& address, QObject *parent) - : super(parent) - { - // Create AMQP client - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - // Retrieve the "Default" exchange - exchange_ = client->createExchange(); - - // Create the "task_queue" queue, with the "durable" option set - queue_ = client->createQueue("task_queue", exchange_->channelNumber()); - queue_->declare(Queue::Durable); - } - - void run() - { - QTimer* timer = new QTimer(this); - timer->setInterval(1000); - connect(timer, SIGNAL(timeout()), SLOT(newtaskMessage())); - timer->start(); - } - -protected slots: - void newtaskMessage() - { - static quint64 counter = 0; - - QAMQP::MessageProperties properties; - properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent - - QString message(QString("[%1: %2] Hello World! %3") - .arg(++counter) - .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) - .arg(QString('.').repeated(qrand() % 10))); - qDebug() << "NewTask::newtaskMessage " << message; - - exchange_->publish(queue_->name(), message, properties); - } - -private: - QAMQP::Exchange* exchange_; - QAMQP::Queue* queue_; -}; - -} - -} - -#endif // NEWTASK_H diff --git a/tests/manual/qamqp/workqueues/Worker.h b/tests/manual/qamqp/workqueues/Worker.h deleted file mode 100644 index 4ac93c5..0000000 --- a/tests/manual/qamqp/workqueues/Worker.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef WORKER_H -#define WORKER_H - -#include -#include -#include -#include -#include - -#include "amqp_client.h" -#include "amqp_queue.h" - -namespace QAMQP -{ - -namespace samples -{ - -class Worker : public QObject, public QRunnable -{ - Q_OBJECT - - typedef QObject super; - -public: - explicit Worker(const QString& address, QObject* parent) - : super(parent) - { - QAMQP::Client* client = new QAMQP::Client(this); - client->connectToHost(address); - - queue_ = client->createQueue("task_queue"); - queue_->declare(Queue::Durable); - connect(queue_, SIGNAL(declared()), this, SLOT(declared())); - connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); - } - - void run() - { - } - -protected slots: - void declared() - { - queue_->setQOS(0,1); - queue_->consume(); - } - - void newMessage() - { - // Retrieve message - QAMQP::Message message = queue_->dequeue(); - qDebug() << "Worker::newMessage " << message.payload(); - - // Simulate long processing - int wait = message.payload().count('.'); - QTime dieTime = QTime::currentTime().addMSecs(400 * wait); - while( QTime::currentTime() < dieTime ); - - // Ack to server - queue_->ack(message); - } - -private: - QAMQP::Queue* queue_; -}; - -} - -} - -#endif // WORKER_H diff --git a/tests/tests.pro b/tests/tests.pro index f544ab9..b8445a7 100644 --- a/tests/tests.pro +++ b/tests/tests.pro @@ -1,4 +1,3 @@ TEMPLATE = subdirs SUBDIRS = \ - auto \ - manual + auto