The "Work Queues" example
This commit is contained in:
parent
2a2dc67f16
commit
2e4530c382
|
|
@ -8,6 +8,8 @@ HEADERS += \
|
||||||
src/QamqpApp.h \
|
src/QamqpApp.h \
|
||||||
src/sendreceive/Receive.h \
|
src/sendreceive/Receive.h \
|
||||||
src/sendreceive/Send.h \
|
src/sendreceive/Send.h \
|
||||||
|
src/workqueues/NewTask.h \
|
||||||
|
src/workqueues/Worker.h \
|
||||||
|
|
||||||
SOURCES += \
|
SOURCES += \
|
||||||
src/main.cpp \
|
src/main.cpp \
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <QCoreApplication>
|
#include <QCoreApplication>
|
||||||
|
#include <QDateTime>
|
||||||
#include <QDebug>
|
#include <QDebug>
|
||||||
#include <QStringList>
|
#include <QStringList>
|
||||||
#include <QTextStream>
|
#include <QTextStream>
|
||||||
|
|
@ -14,6 +15,8 @@
|
||||||
|
|
||||||
#include "sendreceive/Send.h"
|
#include "sendreceive/Send.h"
|
||||||
#include "sendreceive/Receive.h"
|
#include "sendreceive/Receive.h"
|
||||||
|
#include "workqueues/NewTask.h"
|
||||||
|
#include "workqueues/Worker.h"
|
||||||
|
|
||||||
namespace QAMQP
|
namespace QAMQP
|
||||||
{
|
{
|
||||||
|
|
@ -31,6 +34,7 @@ public:
|
||||||
explicit QamqpApp(int& argc, char** argv)
|
explicit QamqpApp(int& argc, char** argv)
|
||||||
: super(argc, argv)
|
: super(argc, argv)
|
||||||
{
|
{
|
||||||
|
qsrand(QDateTime::currentMSecsSinceEpoch());
|
||||||
QTimer::singleShot(0, this, SLOT(run()));
|
QTimer::singleShot(0, this, SLOT(run()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,11 +81,27 @@ protected slots:
|
||||||
else if ("receive" == command)
|
else if ("receive" == command)
|
||||||
{
|
{
|
||||||
if (args.size() < 3)
|
if (args.size() < 3)
|
||||||
throw std::runtime_error("Mandatory argument(s) missing!");
|
throw std::runtime_error("Mandatory argument missing!");
|
||||||
|
|
||||||
QString url = args[2];
|
QString url = args[2];
|
||||||
commandImpl = new Receive(url, this);
|
commandImpl = new Receive(url, this);
|
||||||
}
|
}
|
||||||
|
else if ("new_task" == command)
|
||||||
|
{
|
||||||
|
if (args.size() < 3)
|
||||||
|
throw std::runtime_error("Mandatory argument missing!");
|
||||||
|
|
||||||
|
QString url = args[2];
|
||||||
|
commandImpl = new NewTask(url, this);
|
||||||
|
}
|
||||||
|
else if ("worker" == command)
|
||||||
|
{
|
||||||
|
if (args.size() < 3)
|
||||||
|
throw std::runtime_error("Mandatory argument missing!");
|
||||||
|
|
||||||
|
QString url = args[2];
|
||||||
|
commandImpl = new Worker(url, this);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
|
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
|
||||||
|
|
@ -108,9 +128,13 @@ USAGE: %1 [-h|--help] -- Show this help message.\n\
|
||||||
USAGE: %1 send <server-url> <message> -- Send a message.\n\
|
USAGE: %1 send <server-url> <message> -- Send a message.\n\
|
||||||
%1 receive <server-url> -- Receive messages.\n\
|
%1 receive <server-url> -- Receive messages.\n\
|
||||||
\n\
|
\n\
|
||||||
Send-Receive Sample:\n\
|
Simple \"Hello World!\":\n\
|
||||||
* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\
|
* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\
|
||||||
* Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\
|
* Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\
|
||||||
|
\n\
|
||||||
|
Work Queues:\n\
|
||||||
|
* Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\
|
||||||
|
* Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\
|
||||||
\n").arg(executable);
|
\n").arg(executable);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,76 @@
|
||||||
|
#ifndef NEWTASK_H
|
||||||
|
#define NEWTASK_H
|
||||||
|
|
||||||
|
#include <QObject>
|
||||||
|
#include <QRunnable>
|
||||||
|
#include <QDebug>
|
||||||
|
#include <QTimer>
|
||||||
|
#include <QDateTime>
|
||||||
|
|
||||||
|
#include "qamqp/amqp.h"
|
||||||
|
#include "qamqp/amqp_exchange.h"
|
||||||
|
#include "qamqp/amqp_queue.h"
|
||||||
|
|
||||||
|
namespace QAMQP
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace samples
|
||||||
|
{
|
||||||
|
|
||||||
|
class NewTask : public QObject, public QRunnable
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
typedef QObject super;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit NewTask(const QString& address, QObject *parent)
|
||||||
|
: super(parent)
|
||||||
|
{
|
||||||
|
// Create AMQP client
|
||||||
|
QAMQP::Client* client = new QAMQP::Client(this);
|
||||||
|
client->open(QUrl(address));
|
||||||
|
|
||||||
|
// Retrieve the "Default" exchange
|
||||||
|
exchange_ = client->createExchange();
|
||||||
|
|
||||||
|
// Create the "task_queue" queue, with the "durable" option set
|
||||||
|
queue_ = client->createQueue(exchange_->channelNumber());
|
||||||
|
queue_->declare("task_queue", Queue::Durable);
|
||||||
|
}
|
||||||
|
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
QTimer* timer = new QTimer(this);
|
||||||
|
timer->setInterval(1000);
|
||||||
|
connect(timer, SIGNAL(timeout()), SLOT(newtaskMessage()));
|
||||||
|
timer->start();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected slots:
|
||||||
|
void newtaskMessage()
|
||||||
|
{
|
||||||
|
static quint64 counter = 0;
|
||||||
|
|
||||||
|
QAMQP::Exchange::MessageProperties properties;
|
||||||
|
properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent
|
||||||
|
|
||||||
|
QString message(QString("[%1: %2] Hello World! %3")
|
||||||
|
.arg(++counter)
|
||||||
|
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
|
||||||
|
.arg(QString('.').repeated(qrand() % 10)));
|
||||||
|
qDebug() << "NewTask::newtaskMessage " << message;
|
||||||
|
|
||||||
|
exchange_->publish(message, queue_->name(), properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
QAMQP::Exchange* exchange_;
|
||||||
|
QAMQP::Queue* queue_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // NEWTASK_H
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
#ifndef WORKER_H
|
||||||
|
#define WORKER_H
|
||||||
|
|
||||||
|
#include <QObject>
|
||||||
|
#include <QRunnable>
|
||||||
|
#include <QDebug>
|
||||||
|
#include <QThread>
|
||||||
|
#include <QTime>
|
||||||
|
|
||||||
|
#include "qamqp/amqp.h"
|
||||||
|
#include "qamqp/amqp_queue.h"
|
||||||
|
|
||||||
|
|
||||||
|
namespace QAMQP
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace samples
|
||||||
|
{
|
||||||
|
|
||||||
|
class Worker : public QObject, public QRunnable
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
typedef QObject super;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit Worker(const QString& address, QObject* parent)
|
||||||
|
: super(parent)
|
||||||
|
{
|
||||||
|
QAMQP::Client* client = new QAMQP::Client(this);
|
||||||
|
client->open(QUrl(address));
|
||||||
|
|
||||||
|
queue_ = client->createQueue();
|
||||||
|
queue_->declare("task_queue", Queue::Durable);
|
||||||
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
|
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
protected slots:
|
||||||
|
void declared()
|
||||||
|
{
|
||||||
|
queue_->setQOS(0,1);
|
||||||
|
queue_->consume();
|
||||||
|
}
|
||||||
|
|
||||||
|
void newMessage()
|
||||||
|
{
|
||||||
|
// Retrieve message
|
||||||
|
QAMQP::MessagePtr message = queue_->getMessage();
|
||||||
|
qDebug() << "Worker::newMessage " << message->payload;
|
||||||
|
|
||||||
|
// Simulate long processing
|
||||||
|
int wait = message->payload.count('.');
|
||||||
|
QTime dieTime = QTime::currentTime().addMSecs(400 * wait);
|
||||||
|
while( QTime::currentTime() < dieTime );
|
||||||
|
|
||||||
|
// Ack to server
|
||||||
|
queue_->ack(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
QAMQP::Queue* queue_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // WORKER_H
|
||||||
Loading…
Reference in New Issue