Merge pull request #15 from mdhooge/master

The next 3 tutorials
This commit is contained in:
Alexey Shcherbakov 2013-02-22 23:36:07 -08:00
commit e56a6afa9d
8 changed files with 531 additions and 5 deletions

View File

@ -6,8 +6,14 @@ INCLUDEPATH += . ./src
HEADERS += \
src/QamqpApp.h \
src/pubsub/EmitLog.h \
src/pubsub/ReceiveLog.h \
src/routing/EmitLogDirect.h \
src/routing/ReceiveLogDirect.h \
src/sendreceive/Receive.h \
src/sendreceive/Send.h \
src/workqueues/NewTask.h \
src/workqueues/Worker.h \
SOURCES += \
src/main.cpp \

View File

@ -3,6 +3,7 @@
#include <stdexcept>
#include <QCoreApplication>
#include <QDateTime>
#include <QDebug>
#include <QStringList>
#include <QTextStream>
@ -12,8 +13,14 @@
#include "qamqp/amqp_exchange.h"
#include "qamqp/amqp_queue.h"
#include "pubsub/EmitLog.h"
#include "pubsub/ReceiveLog.h"
#include "routing/EmitLogDirect.h"
#include "routing/ReceiveLogDirect.h"
#include "sendreceive/Send.h"
#include "sendreceive/Receive.h"
#include "workqueues/NewTask.h"
#include "workqueues/Worker.h"
namespace QAMQP
{
@ -31,6 +38,7 @@ public:
explicit QamqpApp(int& argc, char** argv)
: super(argc, argv)
{
qsrand(QDateTime::currentMSecsSinceEpoch());
QTimer::singleShot(0, this, SLOT(run()));
}
@ -77,11 +85,61 @@ protected slots:
else if ("receive" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument(s) missing!");
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new Receive(url, this);
}
else if ("new_task" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new NewTask(url, this);
}
else if ("worker" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new Worker(url, this);
}
else if ("emit_log" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new EmitLog(url, this);
}
else if ("receive_log" == command)
{
if (args.size() < 3)
throw std::runtime_error("Mandatory argument missing!");
QString url = args[2];
commandImpl = new ReceiveLog(url, this);
}
else if ("emit_log_direct" == command)
{
if (args.size() < 4)
throw std::runtime_error("Mandatory argument(s) missing!");
QString url = args[2];
QString lst = args[3];
commandImpl = new EmitLogDirect(url, lst, this);
}
else if ("receive_log_direct" == command)
{
if (args.size() < 4)
throw std::runtime_error("Mandatory argument(s) missing!");
QString url = args[2];
QString lst = args[3];
commandImpl = new ReceiveLogDirect(url, lst, this);
}
else
{
throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString());
@ -103,14 +161,37 @@ protected:
QString executable = arguments().at(0);
out << QString(
"\n\
USAGE: %1 [-h|--help] -- Show this help message.\n\
USAGE: %1 [-h|--help] -- Show this help message.\n\
\n\
USAGE: %1 send <server-url> <message> -- Send a message.\n\
%1 receive <server-url> -- Receive messages.\n\
USAGE: %1 send <server-url> <message> -- Send messages.\n\
%1 receive <server-url> -- Receive messages.\n\
\n\
Send-Receive Sample:\n\
%1 new_task <server-url> -- Ask for long-running tasks.\n\
%1 worker <server-url> -- Execute tasks.\n\
\n\
%1 emit_log <server-url> -- Publish log messages.\n\
%1 receive_log <server-url> -- Subscribe to logs.\n\
\n\
%1 emit_log_direct <server-url> <comma separated list of categories>\n\
-- Publish messages by category.\n\
%1 receive_log_direct <server-url> <comma separated list of categories>\n\
-- Subscribe to chosen categories.\n\
\n\
Simple \"Hello World!\":\n\
* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\
* Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\
\n\
Work Queues:\n\
* Producer: %1 new_task amqp://guest:guest@127.0.0.1:5672/\n\
* Consumer: %1 worker amqp://guest:guest@127.0.0.1:5672/\n\
\n\
Publish/Subscribe:\n\
* Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\
* Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\
\n\
Routing:\n\
* Producer: %1 emit_log_direct amqp://guest:guest@127.0.0.1:5672/ red,blue,green\n\
* Consumer: %1 receive_log_direct amqp://guest:guest@127.0.0.1:5672/ blue,yellow\n\
\n").arg(executable);
}
};

68
src/pubsub/EmitLog.h Normal file
View File

@ -0,0 +1,68 @@
#ifndef EMITLOG_H
#define EMITLOG_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QTimer>
#include <QDateTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_exchange.h"
namespace QAMQP
{
namespace samples
{
class EmitLog : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit EmitLog(const QString& address, QObject *parent)
: super(parent)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create the "logs" fanout exchange
exchange_ = client->createExchange("logs");
exchange_->declare("fanout");
}
void run()
{
QTimer* timer = new QTimer(this);
timer->setInterval(1000);
connect(timer, SIGNAL(timeout()), SLOT(emitlogMessage()));
timer->start();
}
protected slots:
void emitlogMessage()
{
static quint64 counter = 0;
QString message(QString("[%1: %2] Hello World!")
.arg(++counter)
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
);
qDebug() << "EmitLog::emitlogMessage " << message;
exchange_->publish(message, "");
}
private:
QAMQP::Exchange* exchange_;
};
}
}
#endif // EMITLOG_H

69
src/pubsub/ReceiveLog.h Normal file
View File

@ -0,0 +1,69 @@
#ifndef RECEIVELOG_H
#define RECEIVELOG_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QThread>
#include <QTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_queue.h"
namespace QAMQP
{
namespace samples
{
class ReceiveLog : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit ReceiveLog(const QString& address, QObject* parent)
: super(parent)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create an exclusive queue
queue_ = client->createQueue();
queue_->declare("", Queue::Exclusive);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
}
void run()
{
}
protected slots:
void declared()
{
// Bind the queue to the "logs" exchange (declared by publisher)
queue_->bind("logs", "");
queue_->consume(QAMQP::Queue::coNoAck);
}
void newMessage()
{
// Retrieve message
QAMQP::MessagePtr message = queue_->getMessage();
qDebug() << "ReceiveLog::newMessage " << message->payload;
}
private:
QAMQP::Queue* queue_;
};
}
}
#endif // RECEIVELOG_H

View File

@ -0,0 +1,77 @@
#ifndef EMITLOGDIRECT_H
#define EMITLOGDIRECT_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QStringList>
#include <QTimer>
#include <QDateTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_exchange.h"
namespace QAMQP
{
namespace samples
{
class EmitLogDirect : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit EmitLogDirect(const QString& address, const QString& list, QObject *parent)
: super(parent)
, list_(list.split(',', QString::SkipEmptyParts))
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create the "direct_logs" fanout exchange
exchange_ = client->createExchange("direct_logs");
exchange_->declare("direct");
}
void run()
{
QTimer* timer = new QTimer(this);
timer->setInterval(1000);
connect(timer, SIGNAL(timeout()), SLOT(emitLogMessage()));
timer->start();
}
protected slots:
void emitLogMessage()
{
static quint64 counter = 0;
// Choose random key
QString key(list_.at(qrand() % list_.size()));
// Create Message
QString message(QString("[%1: %2] %3")
.arg(++counter)
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
.arg(key)
);
qDebug() << "EmitLogDirect::emitLogMessage " << message;
// Publish
exchange_->publish(message, key);
}
private:
QStringList list_;
QAMQP::Exchange* exchange_;
};
}
}
#endif // EMITLOGDIRECT_H

View File

@ -0,0 +1,76 @@
#ifndef RECEIVELOGDIRECT_H
#define RECEIVELOGDIRECT_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QStringList>
#include <QThread>
#include <QTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_queue.h"
namespace QAMQP
{
namespace samples
{
class ReceiveLogDirect : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit ReceiveLogDirect(const QString& address, const QString& list, QObject* parent)
: super(parent)
, list_(list)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Create an exclusive queue
queue_ = client->createQueue();
queue_->declare("", Queue::Exclusive);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
}
void run()
{
}
protected slots:
void declared()
{
// Loop on the list to bind with the keys
QStringList split(list_.split(',', QString::SkipEmptyParts));
for(int i = 0; i < split.size(); ++i)
queue_->bind("direct_logs", split.at(i));
// Start consuming
queue_->consume(QAMQP::Queue::coNoAck);
}
void newMessage()
{
// Retrieve message
QAMQP::MessagePtr message = queue_->getMessage();
qDebug() << "ReceiveLogDirect::newMessage " << message->payload;
}
private:
QString list_;
QAMQP::Queue* queue_;
};
}
}
#endif // RECEIVELOGDIRECT_H

76
src/workqueues/NewTask.h Normal file
View File

@ -0,0 +1,76 @@
#ifndef NEWTASK_H
#define NEWTASK_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QTimer>
#include <QDateTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_exchange.h"
#include "qamqp/amqp_queue.h"
namespace QAMQP
{
namespace samples
{
class NewTask : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit NewTask(const QString& address, QObject *parent)
: super(parent)
{
// Create AMQP client
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
// Retrieve the "Default" exchange
exchange_ = client->createExchange();
// Create the "task_queue" queue, with the "durable" option set
queue_ = client->createQueue(exchange_->channelNumber());
queue_->declare("task_queue", Queue::Durable);
}
void run()
{
QTimer* timer = new QTimer(this);
timer->setInterval(1000);
connect(timer, SIGNAL(timeout()), SLOT(newtaskMessage()));
timer->start();
}
protected slots:
void newtaskMessage()
{
static quint64 counter = 0;
QAMQP::Exchange::MessageProperties properties;
properties[QAMQP::Frame::Content::cpDeliveryMode] = 2; // Make message persistent
QString message(QString("[%1: %2] Hello World! %3")
.arg(++counter)
.arg(QDateTime::currentDateTime().toString(Qt::ISODate))
.arg(QString('.').repeated(qrand() % 10)));
qDebug() << "NewTask::newtaskMessage " << message;
exchange_->publish(message, queue_->name(), properties);
}
private:
QAMQP::Exchange* exchange_;
QAMQP::Queue* queue_;
};
}
}
#endif // NEWTASK_H

73
src/workqueues/Worker.h Normal file
View File

@ -0,0 +1,73 @@
#ifndef WORKER_H
#define WORKER_H
#include <QObject>
#include <QRunnable>
#include <QDebug>
#include <QThread>
#include <QTime>
#include "qamqp/amqp.h"
#include "qamqp/amqp_queue.h"
namespace QAMQP
{
namespace samples
{
class Worker : public QObject, public QRunnable
{
Q_OBJECT
typedef QObject super;
public:
explicit Worker(const QString& address, QObject* parent)
: super(parent)
{
QAMQP::Client* client = new QAMQP::Client(this);
client->open(QUrl(address));
queue_ = client->createQueue();
queue_->declare("task_queue", Queue::Durable);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
}
void run()
{
}
protected slots:
void declared()
{
queue_->setQOS(0,1);
queue_->consume();
}
void newMessage()
{
// Retrieve message
QAMQP::MessagePtr message = queue_->getMessage();
qDebug() << "Worker::newMessage " << message->payload;
// Simulate long processing
int wait = message->payload.count('.');
QTime dieTime = QTime::currentTime().addMSecs(400 * wait);
while( QTime::currentTime() < dieTime );
// Ack to server
queue_->ack(message);
}
private:
QAMQP::Queue* queue_;
};
}
}
#endif // WORKER_H