add the workqueues tutorial

still incomplete because qos is temporarily disabled
This commit is contained in:
Matt Broadstone 2014-06-18 11:06:49 -04:00
parent f5763ac9bb
commit 7ff7719518
3 changed files with 114 additions and 5 deletions

View File

@ -16,6 +16,7 @@ public:
public Q_SLOTS: public Q_SLOTS:
void start() { void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit()));
m_client.connectToHost(); m_client.connectToHost();
} }
@ -33,7 +34,7 @@ private Q_SLOTS:
Exchange *defaultExchange = m_client.createExchange(); Exchange *defaultExchange = m_client.createExchange();
defaultExchange->publish("Hello World!", "hello"); defaultExchange->publish("Hello World!", "hello");
qDebug() << " [x] Sent 'Hello World!'"; qDebug() << " [x] Sent 'Hello World!'";
QTimer::singleShot(25, qApp, SLOT(quit())); m_client.disconnectFromHost();
} }
private: private:

View File

@ -1,9 +1,63 @@
#include <QCoreApplication> #include <QCoreApplication>
#include <QStringList>
#include <QDebug> #include <QDebug>
#include "amqp_client.h"
#include "amqp_exchange.h"
#include "amqp_queue.h"
using namespace QAMQP;
class TaskCreator : public QObject
{
Q_OBJECT
public:
TaskCreator(QObject *parent = 0) : QObject(parent) {}
public Q_SLOTS:
void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit()));
m_client.connectToHost();
}
private Q_SLOTS:
void clientConnected() {
Queue *queue = m_client.createQueue("task_queue");
connect(queue, SIGNAL(declared()), this, SLOT(queueDeclared()));
queue->declare();
}
void queueDeclared() {
Queue *queue = qobject_cast<Queue*>(sender());
if (!queue)
return;
Exchange *defaultExchange = m_client.createExchange();
MessageProperties properties;
properties[Frame::Content::cpDeliveryMode] = "2"; // make message persistent
QString message;
if (qApp->arguments().size() < 2)
message = "Hello World!";
else
message = qApp->arguments().at(1);
defaultExchange->publish(message, "task_queue", properties);
qDebug(" [x] Sent '%s'", message.toLatin1().constData());
m_client.disconnectFromHost();
}
private:
Client m_client;
};
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
QCoreApplication app(argc, argv); QCoreApplication app(argc, argv);
qDebug() << "testing"; TaskCreator taskCreator;
return EXIT_SUCCESS; taskCreator.start();
return app.exec();
} }
#include "main.moc"

View File

@ -1,9 +1,63 @@
#include <QCoreApplication> #include <QCoreApplication>
#include <QTimer>
#include <QDebug> #include <QDebug>
#include "amqp_client.h"
#include "amqp_queue.h"
using namespace QAMQP;
class Worker : public QObject
{
Q_OBJECT
public:
Worker(QObject *parent = 0) : QObject(parent), m_queue(0) {}
public Q_SLOTS:
void start() {
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
m_client.connectToHost();
}
private Q_SLOTS:
void clientConnected() {
m_queue = m_client.createQueue("task_queue");
connect(m_queue, SIGNAL(declared()), this, SLOT(queueDeclared()));
m_queue->declare();
qDebug() << " [*] Waiting for messages. To exit press CTRL+C";
}
void queueDeclared() {
// m_queue->setPrefetchCount(1);
m_queue->consume();
connect(m_queue, SIGNAL(messageReceived()), this, SLOT(messageReceived()));
}
void messageReceived() {
m_currentMessage = m_queue->dequeue();
qDebug() << " [x] Received " << m_currentMessage.payload();
int delay = m_currentMessage.payload().count(".") * 1000;
QTimer::singleShot(delay, this, SLOT(ackMessage()));
}
void ackMessage() {
qDebug() << " [x] Done";
m_queue->ack(m_currentMessage);
}
private:
Client m_client;
Queue *m_queue;
Message m_currentMessage;
};
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
QCoreApplication app(argc, argv); QCoreApplication app(argc, argv);
qDebug() << "testing"; Worker worker;
return EXIT_SUCCESS; worker.start();
return app.exec();
} }
#include "main.moc"