The "Routing" Tutorial
This commit is contained in:
parent
1d9e43357b
commit
d4edcb7008
|
|
@ -8,6 +8,8 @@ HEADERS += \
|
||||||
src/QamqpApp.h \
|
src/QamqpApp.h \
|
||||||
src/pubsub/EmitLog.h \
|
src/pubsub/EmitLog.h \
|
||||||
src/pubsub/ReceiveLog.h \
|
src/pubsub/ReceiveLog.h \
|
||||||
|
src/routing/EmitLogDirect.h \
|
||||||
|
src/routing/ReceiveLogDirect.h \
|
||||||
src/sendreceive/Receive.h \
|
src/sendreceive/Receive.h \
|
||||||
src/sendreceive/Send.h \
|
src/sendreceive/Send.h \
|
||||||
src/workqueues/NewTask.h \
|
src/workqueues/NewTask.h \
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
#include "pubsub/EmitLog.h"
|
#include "pubsub/EmitLog.h"
|
||||||
#include "pubsub/ReceiveLog.h"
|
#include "pubsub/ReceiveLog.h"
|
||||||
|
#include "routing/EmitLogDirect.h"
|
||||||
|
#include "routing/ReceiveLogDirect.h"
|
||||||
#include "sendreceive/Send.h"
|
#include "sendreceive/Send.h"
|
||||||
#include "sendreceive/Receive.h"
|
#include "sendreceive/Receive.h"
|
||||||
#include "workqueues/NewTask.h"
|
#include "workqueues/NewTask.h"
|
||||||
|
|
@ -120,6 +122,24 @@ protected slots:
|
||||||
QString url = args[2];
|
QString url = args[2];
|
||||||
commandImpl = new ReceiveLog(url, this);
|
commandImpl = new ReceiveLog(url, this);
|
||||||
}
|
}
|
||||||
|
else if ("emit_log_direct" == command)
|
||||||
|
{
|
||||||
|
if (args.size() < 4)
|
||||||
|
throw std::runtime_error("Mandatory argument(s) missing!");
|
||||||
|
|
||||||
|
QString url = args[2];
|
||||||
|
QString lst = args[3];
|
||||||
|
commandImpl = new EmitLogDirect(url, lst, this);
|
||||||
|
}
|
||||||
|
else if ("receive_log_direct" == command)
|
||||||
|
{
|
||||||
|
if (args.size() < 4)
|
||||||
|
throw std::runtime_error("Mandatory argument(s) missing!");
|
||||||
|
|
||||||
|
QString url = args[2];
|
||||||
|
QString lst = args[3];
|
||||||
|
commandImpl = new ReceiveLogDirect(url, lst, this);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
|
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
|
||||||
|
|
@ -141,10 +161,21 @@ protected:
|
||||||
QString executable = arguments().at(0);
|
QString executable = arguments().at(0);
|
||||||
out << QString(
|
out << QString(
|
||||||
"\n\
|
"\n\
|
||||||
USAGE: %1 [-h|--help] -- Show this help message.\n\
|
USAGE: %1 [-h|--help] -- Show this help message.\n\
|
||||||
\n\
|
\n\
|
||||||
USAGE: %1 send <server-url> <message> -- Send a message.\n\
|
USAGE: %1 send <server-url> <message> -- Send messages.\n\
|
||||||
%1 receive <server-url> -- Receive messages.\n\
|
%1 receive <server-url> -- Receive messages.\n\
|
||||||
|
\n\
|
||||||
|
%1 new_task <server-url> -- Ask for long-running tasks.\n\
|
||||||
|
%1 worker <server-url> -- Execute tasks.\n\
|
||||||
|
\n\
|
||||||
|
%1 emit_log <server-url> -- Publish log messages.\n\
|
||||||
|
%1 receive_log <server-url> -- Subscribe to logs.\n\
|
||||||
|
\n\
|
||||||
|
%1 emit_log_direct <server-url> <comma separated list of categories>\n\
|
||||||
|
-- Publish messages by category.\n\
|
||||||
|
%1 receive_log_direct <server-url> <comma separated list of categories>\n\
|
||||||
|
-- Subscribe to chosen categories.\n\
|
||||||
\n\
|
\n\
|
||||||
Simple \"Hello World!\":\n\
|
Simple \"Hello World!\":\n\
|
||||||
* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\
|
* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\
|
||||||
|
|
@ -157,6 +188,10 @@ Work Queues:\n\
|
||||||
Publish/Subscribe:\n\
|
Publish/Subscribe:\n\
|
||||||
* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\
|
* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\
|
||||||
* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\
|
* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\
|
||||||
|
\n\
|
||||||
|
Routing:\n\
|
||||||
|
* Producer: %1 emit_log_direct amqp://guest:guest@127.0.0.1:5672/ red,blue,green\n\
|
||||||
|
* Consumer: %1 receive_log_direct amqp://guest:guest@127.0.0.1:5672/ blue,yellow\n\
|
||||||
\n").arg(executable);
|
\n").arg(executable);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,77 @@
|
||||||
|
#ifndef EMITLOGDIRECT_H
|
||||||
|
#define EMITLOGDIRECT_H
|
||||||
|
|
||||||
|
#include <QObject>
|
||||||
|
#include <QRunnable>
|
||||||
|
#include <QDebug>
|
||||||
|
#include <QStringList>
|
||||||
|
#include <QTimer>
|
||||||
|
#include <QDateTime>
|
||||||
|
|
||||||
|
#include "qamqp/amqp.h"
|
||||||
|
#include "qamqp/amqp_exchange.h"
|
||||||
|
|
||||||
|
namespace QAMQP
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace samples
|
||||||
|
{
|
||||||
|
|
||||||
|
class EmitLogDirect : public QObject, public QRunnable
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
typedef QObject super;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit EmitLogDirect(const QString& address, const QString& list, QObject *parent)
|
||||||
|
: super(parent)
|
||||||
|
, list_(list.split(',', QString::SkipEmptyParts))
|
||||||
|
{
|
||||||
|
// Create AMQP client
|
||||||
|
QAMQP::Client* client = new QAMQP::Client(this);
|
||||||
|
client->open(QUrl(address));
|
||||||
|
|
||||||
|
// Create the "direct_logs" fanout exchange
|
||||||
|
exchange_ = client->createExchange("direct_logs");
|
||||||
|
exchange_->declare("direct");
|
||||||
|
}
|
||||||
|
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
QTimer* timer = new QTimer(this);
|
||||||
|
timer->setInterval(1000);
|
||||||
|
connect(timer, SIGNAL(timeout()), SLOT(emitLogMessage()));
|
||||||
|
timer->start();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected slots:
|
||||||
|
void emitLogMessage()
|
||||||
|
{
|
||||||
|
static quint64 counter = 0;
|
||||||
|
|
||||||
|
// Choose random key
|
||||||
|
QString key(list_.at(qrand() % list_.size()));
|
||||||
|
|
||||||
|
// Create Message
|
||||||
|
QString message(QString("[%1: %2] %3")
|
||||||
|
.arg(++counter)
|
||||||
|
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
|
||||||
|
.arg(key)
|
||||||
|
);
|
||||||
|
qDebug() << "EmitLogDirect::emitLogMessage " << message;
|
||||||
|
|
||||||
|
// Publish
|
||||||
|
exchange_->publish(message, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
QStringList list_;
|
||||||
|
QAMQP::Exchange* exchange_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // EMITLOGDIRECT_H
|
||||||
|
|
@ -0,0 +1,76 @@
|
||||||
|
#ifndef RECEIVELOGDIRECT_H
|
||||||
|
#define RECEIVELOGDIRECT_H
|
||||||
|
|
||||||
|
#include <QObject>
|
||||||
|
#include <QRunnable>
|
||||||
|
#include <QDebug>
|
||||||
|
#include <QStringList>
|
||||||
|
#include <QThread>
|
||||||
|
#include <QTime>
|
||||||
|
|
||||||
|
#include "qamqp/amqp.h"
|
||||||
|
#include "qamqp/amqp_queue.h"
|
||||||
|
|
||||||
|
|
||||||
|
namespace QAMQP
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace samples
|
||||||
|
{
|
||||||
|
|
||||||
|
class ReceiveLogDirect : public QObject, public QRunnable
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
typedef QObject super;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ReceiveLogDirect(const QString& address, const QString& list, QObject* parent)
|
||||||
|
: super(parent)
|
||||||
|
, list_(list)
|
||||||
|
{
|
||||||
|
// Create AMQP client
|
||||||
|
QAMQP::Client* client = new QAMQP::Client(this);
|
||||||
|
client->open(QUrl(address));
|
||||||
|
|
||||||
|
// Create an exclusive queue
|
||||||
|
queue_ = client->createQueue();
|
||||||
|
queue_->declare("", Queue::Exclusive);
|
||||||
|
|
||||||
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
|
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
protected slots:
|
||||||
|
void declared()
|
||||||
|
{
|
||||||
|
// Loop on the list to bind with the keys
|
||||||
|
QStringList split(list_.split(',', QString::SkipEmptyParts));
|
||||||
|
for(int i = 0; i < split.size(); ++i)
|
||||||
|
queue_->bind("direct_logs", split.at(i));
|
||||||
|
|
||||||
|
// Start consuming
|
||||||
|
queue_->consume(QAMQP::Queue::coNoAck);
|
||||||
|
}
|
||||||
|
|
||||||
|
void newMessage()
|
||||||
|
{
|
||||||
|
// Retrieve message
|
||||||
|
QAMQP::MessagePtr message = queue_->getMessage();
|
||||||
|
qDebug() << "ReceiveLogDirect::newMessage " << message->payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
QString list_;
|
||||||
|
QAMQP::Queue* queue_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // RECEIVELOGDIRECT_H
|
||||||
Loading…
Reference in New Issue