From 2e4530c38258d63e21f69814a862aa8a41434403 Mon Sep 17 00:00:00 2001 From: Michel D'HOOGE Date: Wed, 20 Feb 2013 16:26:10 +0100 Subject: [PATCH] The "Work Queues" example --- qamqp.pro | 2 ++ src/QamqpApp.h | 28 +++++++++++++-- src/workqueues/NewTask.h | 76 ++++++++++++++++++++++++++++++++++++++++ src/workqueues/Worker.h | 73 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 src/workqueues/NewTask.h create mode 100644 src/workqueues/Worker.h diff --git a/qamqp.pro b/qamqp.pro index 8842e65..97ac9d7 100644 --- a/qamqp.pro +++ b/qamqp.pro @@ -8,6 +8,8 @@ HEADERS += \ src/QamqpApp.h \ src/sendreceive/Receive.h \ src/sendreceive/Send.h \ + src/workqueues/NewTask.h \ + src/workqueues/Worker.h \ SOURCES += \ src/main.cpp \ diff --git a/src/QamqpApp.h b/src/QamqpApp.h index 7e7dc9c..773c382 100644 --- a/src/QamqpApp.h +++ b/src/QamqpApp.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,8 @@ #include "sendreceive/Send.h" #include "sendreceive/Receive.h" +#include "workqueues/NewTask.h" +#include "workqueues/Worker.h" namespace QAMQP { @@ -31,6 +34,7 @@ public: explicit QamqpApp(int& argc, char** argv) : super(argc, argv) { + qsrand(QDateTime::currentMSecsSinceEpoch()); QTimer::singleShot(0, this, SLOT(run())); } @@ -77,11 +81,27 @@ protected slots: else if ("receive" == command) { if (args.size() < 3) - throw std::runtime_error("Mandatory argument(s) missing!"); + throw std::runtime_error("Mandatory argument missing!"); QString url = args[2]; commandImpl = new Receive(url, this); } + else if ("new_task" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument missing!"); + + QString url = args[2]; + commandImpl = new NewTask(url, this); + } + else if ("worker" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument missing!"); + + QString url = args[2]; + commandImpl = new Worker(url, this); + } else { throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); @@ -108,9 +128,13 @@ USAGE: %1 [-h|--help] -- Show this help message.\n\ USAGE: %1 send -- Send a message.\n\ %1 receive -- Receive messages.\n\ \n\ -Send-Receive Sample:\n\ +Simple \"Hello World!\":\n\ * Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\ * Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\ +\n\ +Work Queues:\n\ +* Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\ +* Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\ \n").arg(executable); } }; diff --git a/src/workqueues/NewTask.h b/src/workqueues/NewTask.h new file mode 100644 index 0000000..d6c6d4e --- /dev/null +++ b/src/workqueues/NewTask.h @@ -0,0 +1,76 @@ +#ifndef NEWTASK_H +#define NEWTASK_H + +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_exchange.h" +#include "qamqp/amqp_queue.h" + +namespace QAMQP +{ + +namespace samples +{ + +class NewTask : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit NewTask(const QString& address, QObject *parent) + : super(parent) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Retrieve the "Default" exchange + exchange_ = client->createExchange(); + + // Create the "task_queue" queue, with the "durable" option set + queue_ = client->createQueue(exchange_->channelNumber()); + queue_->declare("task_queue", Queue::Durable); + } + + void run() + { + QTimer* timer = new QTimer(this); + timer->setInterval(1000); + connect(timer, SIGNAL(timeout()), SLOT(newtaskMessage())); + timer->start(); + } + +protected slots: + void newtaskMessage() + { + static quint64 counter = 0; + + QAMQP::Exchange::MessageProperties properties; + properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent + + QString message(QString("[%1: %2] Hello World! %3") + .arg(++counter) + .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) + .arg(QString('.').repeated(qrand() % 10))); + qDebug() << "NewTask::newtaskMessage " << message; + + exchange_->publish(message, queue_->name(), properties); + } + +private: + QAMQP::Exchange* exchange_; + QAMQP::Queue* queue_; +}; + +} + +} + +#endif // NEWTASK_H diff --git a/src/workqueues/Worker.h b/src/workqueues/Worker.h new file mode 100644 index 0000000..eadd373 --- /dev/null +++ b/src/workqueues/Worker.h @@ -0,0 +1,73 @@ +#ifndef WORKER_H +#define WORKER_H + +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_queue.h" + + +namespace QAMQP +{ + +namespace samples +{ + +class Worker : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit Worker(const QString& address, QObject* parent) + : super(parent) + { + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + queue_ = client->createQueue(); + queue_->declare("task_queue", Queue::Durable); + connect(queue_, SIGNAL(declared()), this, SLOT(declared())); + connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); + } + + void run() + { + } + +protected slots: + void declared() + { + queue_->setQOS(0,1); + queue_->consume(); + } + + void newMessage() + { + // Retrieve message + QAMQP::MessagePtr message = queue_->getMessage(); + qDebug() << "Worker::newMessage " << message->payload; + + // Simulate long processing + int wait = message->payload.count('.'); + QTime dieTime = QTime::currentTime().addMSecs(400 * wait); + while( QTime::currentTime() < dieTime ); + + // Ack to server + queue_->ack(message); + } + +private: + QAMQP::Queue* queue_; +}; + +} + +} + +#endif // WORKER_H