From 1d9e43357bff370f4856ea2a35f497bee796376a Mon Sep 17 00:00:00 2001 From: Michel D'HOOGE Date: Wed, 20 Feb 2013 17:06:23 +0100 Subject: [PATCH] The "Publish/Subscribe" Tutorial --- qamqp.pro | 2 ++ src/QamqpApp.h | 22 +++++++++++++ src/pubsub/EmitLog.h | 68 ++++++++++++++++++++++++++++++++++++++++ src/pubsub/ReceiveLog.h | 69 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+) create mode 100644 src/pubsub/EmitLog.h create mode 100644 src/pubsub/ReceiveLog.h diff --git a/qamqp.pro b/qamqp.pro index 97ac9d7..3e5521b 100644 --- a/qamqp.pro +++ b/qamqp.pro @@ -6,6 +6,8 @@ INCLUDEPATH += . ./src HEADERS += \ src/QamqpApp.h \ + src/pubsub/EmitLog.h \ + src/pubsub/ReceiveLog.h \ src/sendreceive/Receive.h \ src/sendreceive/Send.h \ src/workqueues/NewTask.h \ diff --git a/src/QamqpApp.h b/src/QamqpApp.h index 773c382..7d05be5 100644 --- a/src/QamqpApp.h +++ b/src/QamqpApp.h @@ -13,6 +13,8 @@ #include "qamqp/amqp_exchange.h" #include "qamqp/amqp_queue.h" +#include "pubsub/EmitLog.h" +#include "pubsub/ReceiveLog.h" #include "sendreceive/Send.h" #include "sendreceive/Receive.h" #include "workqueues/NewTask.h" @@ -102,6 +104,22 @@ protected slots: QString url = args[2]; commandImpl = new Worker(url, this); } + else if ("emit_log" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument missing!"); + + QString url = args[2]; + commandImpl = new EmitLog(url, this); + } + else if ("receive_log" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument missing!"); + + QString url = args[2]; + commandImpl = new ReceiveLog(url, this); + } else { throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); @@ -135,6 +153,10 @@ Simple \"Hello World!\":\n\ Work Queues:\n\ * Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\ * Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\ +\n\ +Publish/Subscribe:\n\ +* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\ +* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\ \n").arg(executable); } }; diff --git a/src/pubsub/EmitLog.h b/src/pubsub/EmitLog.h new file mode 100644 index 0000000..6d4dde6 --- /dev/null +++ b/src/pubsub/EmitLog.h @@ -0,0 +1,68 @@ +#ifndef EMITLOG_H +#define EMITLOG_H + +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_exchange.h" + +namespace QAMQP +{ + +namespace samples +{ + +class EmitLog : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit EmitLog(const QString& address, QObject *parent) + : super(parent) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Create the "logs" fanout exchange + exchange_ = client->createExchange("logs"); + exchange_->declare("fanout"); + } + + void run() + { + QTimer* timer = new QTimer(this); + timer->setInterval(1000); + connect(timer, SIGNAL(timeout()), SLOT(emitlogMessage())); + timer->start(); + } + +protected slots: + void emitlogMessage() + { + static quint64 counter = 0; + + QString message(QString("[%1: %2] Hello World!") + .arg(++counter) + .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) + ); + qDebug() << "EmitLog::emitlogMessage " << message; + + exchange_->publish(message, ""); + } + +private: + QAMQP::Exchange* exchange_; +}; + +} + +} + +#endif // EMITLOG_H diff --git a/src/pubsub/ReceiveLog.h b/src/pubsub/ReceiveLog.h new file mode 100644 index 0000000..5993f68 --- /dev/null +++ b/src/pubsub/ReceiveLog.h @@ -0,0 +1,69 @@ +#ifndef RECEIVELOG_H +#define RECEIVELOG_H + +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_queue.h" + + +namespace QAMQP +{ + +namespace samples +{ + +class ReceiveLog : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit ReceiveLog(const QString& address, QObject* parent) + : super(parent) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Create an exclusive queue + queue_ = client->createQueue(); + queue_->declare("", Queue::Exclusive); + + connect(queue_, SIGNAL(declared()), this, SLOT(declared())); + connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); + } + + void run() + { + } + +protected slots: + void declared() + { + // Bind the queue to the "logs" exchange (declared by publisher) + queue_->bind("logs", ""); + queue_->consume(QAMQP::Queue::coNoAck); + } + + void newMessage() + { + // Retrieve message + QAMQP::MessagePtr message = queue_->getMessage(); + qDebug() << "ReceiveLog::newMessage " << message->payload; + } + +private: + QAMQP::Queue* queue_; +}; + +} + +} + +#endif // RECEIVELOG_H