diff --git a/qamqp.pro b/qamqp.pro index 3e5521b..0f22db4 100644 --- a/qamqp.pro +++ b/qamqp.pro @@ -8,6 +8,8 @@ HEADERS += \ src/QamqpApp.h \ src/pubsub/EmitLog.h \ src/pubsub/ReceiveLog.h \ + src/routing/EmitLogDirect.h \ + src/routing/ReceiveLogDirect.h \ src/sendreceive/Receive.h \ src/sendreceive/Send.h \ src/workqueues/NewTask.h \ diff --git a/src/QamqpApp.h b/src/QamqpApp.h index 7d05be5..2651e35 100644 --- a/src/QamqpApp.h +++ b/src/QamqpApp.h @@ -15,6 +15,8 @@ #include "pubsub/EmitLog.h" #include "pubsub/ReceiveLog.h" +#include "routing/EmitLogDirect.h" +#include "routing/ReceiveLogDirect.h" #include "sendreceive/Send.h" #include "sendreceive/Receive.h" #include "workqueues/NewTask.h" @@ -120,6 +122,24 @@ protected slots: QString url = args[2]; commandImpl = new ReceiveLog(url, this); } + else if ("emit_log_direct" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString url = args[2]; + QString lst = args[3]; + commandImpl = new EmitLogDirect(url, lst, this); + } + else if ("receive_log_direct" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString url = args[2]; + QString lst = args[3]; + commandImpl = new ReceiveLogDirect(url, lst, this); + } else { throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); @@ -141,10 +161,21 @@ protected: QString executable = arguments().at(0); out << QString( "\n\ -USAGE: %1 [-h|--help] -- Show this help message.\n\ +USAGE: %1 [-h|--help] -- Show this help message.\n\ \n\ -USAGE: %1 send -- Send a message.\n\ - %1 receive -- Receive messages.\n\ +USAGE: %1 send -- Send messages.\n\ + %1 receive -- Receive messages.\n\ +\n\ + %1 new_task -- Ask for long-running tasks.\n\ + %1 worker -- Execute tasks.\n\ +\n\ + %1 emit_log -- Publish log messages.\n\ + %1 receive_log -- Subscribe to logs.\n\ +\n\ + %1 emit_log_direct \n\ + -- Publish messages by category.\n\ + %1 receive_log_direct \n\ + -- Subscribe to chosen categories.\n\ \n\ Simple \"Hello World!\":\n\ * Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\ @@ -157,6 +188,10 @@ Work Queues:\n\ Publish/Subscribe:\n\ * Producer: %1 emit_log amqp://guest:guest@127.0.0.1:5672/\n\ * Consumer: %1 receive_log amqp://guest:guest@127.0.0.1:5672/\n\ +\n\ +Routing:\n\ +* Producer: %1 emit_log_direct amqp://guest:guest@127.0.0.1:5672/ red,blue,green\n\ +* Consumer: %1 receive_log_direct amqp://guest:guest@127.0.0.1:5672/ blue,yellow\n\ \n").arg(executable); } }; diff --git a/src/routing/EmitLogDirect.h b/src/routing/EmitLogDirect.h new file mode 100644 index 0000000..6346430 --- /dev/null +++ b/src/routing/EmitLogDirect.h @@ -0,0 +1,77 @@ +#ifndef EMITLOGDIRECT_H +#define EMITLOGDIRECT_H + +#include +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_exchange.h" + +namespace QAMQP +{ + +namespace samples +{ + +class EmitLogDirect : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit EmitLogDirect(const QString& address, const QString& list, QObject *parent) + : super(parent) + , list_(list.split(',', QString::SkipEmptyParts)) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Create the "direct_logs" fanout exchange + exchange_ = client->createExchange("direct_logs"); + exchange_->declare("direct"); + } + + void run() + { + QTimer* timer = new QTimer(this); + timer->setInterval(1000); + connect(timer, SIGNAL(timeout()), SLOT(emitLogMessage())); + timer->start(); + } + +protected slots: + void emitLogMessage() + { + static quint64 counter = 0; + + // Choose random key + QString key(list_.at(qrand() % list_.size())); + + // Create Message + QString message(QString("[%1: %2] %3") + .arg(++counter) + .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) + .arg(key) + ); + qDebug() << "EmitLogDirect::emitLogMessage " << message; + + // Publish + exchange_->publish(message, key); + } + +private: + QStringList list_; + QAMQP::Exchange* exchange_; +}; + +} + +} + +#endif // EMITLOGDIRECT_H diff --git a/src/routing/ReceiveLogDirect.h b/src/routing/ReceiveLogDirect.h new file mode 100644 index 0000000..cedbabf --- /dev/null +++ b/src/routing/ReceiveLogDirect.h @@ -0,0 +1,76 @@ +#ifndef RECEIVELOGDIRECT_H +#define RECEIVELOGDIRECT_H + +#include +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_queue.h" + + +namespace QAMQP +{ + +namespace samples +{ + +class ReceiveLogDirect : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit ReceiveLogDirect(const QString& address, const QString& list, QObject* parent) + : super(parent) + , list_(list) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Create an exclusive queue + queue_ = client->createQueue(); + queue_->declare("", Queue::Exclusive); + + connect(queue_, SIGNAL(declared()), this, SLOT(declared())); + connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); + } + + void run() + { + } + +protected slots: + void declared() + { + // Loop on the list to bind with the keys + QStringList split(list_.split(',', QString::SkipEmptyParts)); + for(int i = 0; i < split.size(); ++i) + queue_->bind("direct_logs", split.at(i)); + + // Start consuming + queue_->consume(QAMQP::Queue::coNoAck); + } + + void newMessage() + { + // Retrieve message + QAMQP::MessagePtr message = queue_->getMessage(); + qDebug() << "ReceiveLogDirect::newMessage " << message->payload; + } + +private: + QString list_; + QAMQP::Queue* queue_; +}; + +} + +} + +#endif // RECEIVELOGDIRECT_H