From 7ff7719518e02d0d02221d900e4d833336776b7e Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 18 Jun 2014 11:06:49 -0400 Subject: [PATCH] add the workqueues tutorial still incomplete because qos is temporarily disabled --- tutorials/helloworld/send/main.cpp | 3 +- tutorials/workqueues/new_task/main.cpp | 58 +++++++++++++++++++++++++- tutorials/workqueues/worker/main.cpp | 58 +++++++++++++++++++++++++- 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/tutorials/helloworld/send/main.cpp b/tutorials/helloworld/send/main.cpp index 73f3661..387ddcf 100644 --- a/tutorials/helloworld/send/main.cpp +++ b/tutorials/helloworld/send/main.cpp @@ -16,6 +16,7 @@ public: public Q_SLOTS: void start() { connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit())); m_client.connectToHost(); } @@ -33,7 +34,7 @@ private Q_SLOTS: Exchange *defaultExchange = m_client.createExchange(); defaultExchange->publish("Hello World!", "hello"); qDebug() << " [x] Sent 'Hello World!'"; - QTimer::singleShot(25, qApp, SLOT(quit())); + m_client.disconnectFromHost(); } private: diff --git a/tutorials/workqueues/new_task/main.cpp b/tutorials/workqueues/new_task/main.cpp index 9c5f00c..2a66d0d 100644 --- a/tutorials/workqueues/new_task/main.cpp +++ b/tutorials/workqueues/new_task/main.cpp @@ -1,9 +1,63 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_exchange.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class TaskCreator : public QObject +{ + Q_OBJECT +public: + TaskCreator(QObject *parent = 0) : QObject(parent) {} + +public Q_SLOTS: + void start() { + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + Queue *queue = m_client.createQueue("task_queue"); + connect(queue, SIGNAL(declared()), this, SLOT(queueDeclared())); + queue->declare(); + } + + void queueDeclared() { + Queue *queue = qobject_cast(sender()); + if (!queue) + return; + + Exchange *defaultExchange = m_client.createExchange(); + MessageProperties properties; + properties[Frame::Content::cpDeliveryMode] = "2"; // make message persistent + + QString message; + if (qApp->arguments().size() < 2) + message = "Hello World!"; + else + message = qApp->arguments().at(1); + + defaultExchange->publish(message, "task_queue", properties); + qDebug(" [x] Sent '%s'", message.toLatin1().constData()); + m_client.disconnectFromHost(); + } + +private: + Client m_client; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + TaskCreator taskCreator; + taskCreator.start(); + return app.exec(); } + +#include "main.moc" diff --git a/tutorials/workqueues/worker/main.cpp b/tutorials/workqueues/worker/main.cpp index 9c5f00c..07c33a9 100644 --- a/tutorials/workqueues/worker/main.cpp +++ b/tutorials/workqueues/worker/main.cpp @@ -1,9 +1,63 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class Worker : public QObject +{ + Q_OBJECT +public: + Worker(QObject *parent = 0) : QObject(parent), m_queue(0) {} + +public Q_SLOTS: + void start() { + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + m_queue = m_client.createQueue("task_queue"); + connect(m_queue, SIGNAL(declared()), this, SLOT(queueDeclared())); + m_queue->declare(); + qDebug() << " [*] Waiting for messages. To exit press CTRL+C"; + } + + void queueDeclared() { + // m_queue->setPrefetchCount(1); + m_queue->consume(); + connect(m_queue, SIGNAL(messageReceived()), this, SLOT(messageReceived())); + } + + void messageReceived() { + m_currentMessage = m_queue->dequeue(); + qDebug() << " [x] Received " << m_currentMessage.payload(); + + int delay = m_currentMessage.payload().count(".") * 1000; + QTimer::singleShot(delay, this, SLOT(ackMessage())); + } + + void ackMessage() { + qDebug() << " [x] Done"; + m_queue->ack(m_currentMessage); + } + +private: + Client m_client; + Queue *m_queue; + Message m_currentMessage; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + Worker worker; + worker.start(); + return app.exec(); } + +#include "main.moc"