The "Publish/Subscribe" Tutorial

This commit is contained in:
Michel D'HOOGE 2013-02-20 17:06:23 +01:00
parent 2e4530c382
commit 1d9e43357b
4 changed files with 161 additions and 0 deletions

View File

@ -6,6 +6,8 @@ INCLUDEPATH += . ./src
HEADERS += \
src/QamqpApp.h \
src/pubsub/EmitLog.h \
src/pubsub/ReceiveLog.h \
src/sendreceive/Receive.h \
src/sendreceive/Send.h \
src/workqueues/NewTask.h \

View File

@ -13,6 +13,8 @@
#include "qamqp/amqp_exchange.h"
#include "qamqp/amqp_queue.h"
#include "pubsub/EmitLog.h"
#include "pubsub/ReceiveLog.h"
#include "sendreceive/Send.h"
#include "sendreceive/Receive.h"
#include "workqueues/NewTask.h"
@ -102,6 +104,22 @@ protected slots:
QString url = args[2];
commandImpl = new Worker(url, this);
}
else if ("emit_log" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new EmitLog(url, this);
}
else if ("receive_log" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new ReceiveLog(url, this);
}
else
{
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
@ -135,6 +153,10 @@ Simple \"Hello World!\":\n\
Work Queues:\n\
* Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\
* Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\
\n\
Publish/Subscribe:\n\
* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\
* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\
\n").arg(executable);
}
};

68
src/pubsub/EmitLog.h Normal file
View File

@ -0,0 +1,68 @@
#ifndef EMITLOG_H
#define EMITLOG_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QTimer>
#include <QDateTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_exchange.h"
namespace QAMQP
{
namespace samples
{
class EmitLog : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit EmitLog(const QString& address, QObject *parent)
: super(parent)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create the "logs" fanout exchange
exchange_ = client->createExchange("logs");
exchange_->declare("fanout");
}
void run()
{
QTimer* timer = new QTimer(this);
timer->setInterval(1000);
connect(timer, SIGNAL(timeout()), SLOT(emitlogMessage()));
timer->start();
}
protected slots:
void emitlogMessage()
{
static quint64 counter = 0;
QString message(QString("[%1: %2] Hello World!")
.arg(++counter)
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
);
qDebug() << "EmitLog::emitlogMessage " << message;
exchange_->publish(message, "");
}
private:
QAMQP::Exchange* exchange_;
};
}
}
#endif // EMITLOG_H

69
src/pubsub/ReceiveLog.h Normal file
View File

@ -0,0 +1,69 @@
#ifndef RECEIVELOG_H
#define RECEIVELOG_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QThread>
#include <QTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_queue.h"
namespace QAMQP
{
namespace samples
{
class ReceiveLog : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit ReceiveLog(const QString& address, QObject* parent)
: super(parent)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create an exclusive queue
queue_ = client->createQueue();
queue_->declare("", Queue::Exclusive);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
}
void run()
{
}
protected slots:
void declared()
{
// Bind the queue to the "logs" exchange (declared by publisher)
queue_->bind("logs", "");
queue_->consume(QAMQP::Queue::coNoAck);
}
void newMessage()
{
// Retrieve message
QAMQP::MessagePtr message = queue_->getMessage();
qDebug() << "ReceiveLog::newMessage " << message->payload;
}
private:
QAMQP::Queue* queue_;
};
}
}
#endif // RECEIVELOG_H