From 0aa1d8d36bdcb049d8d58a88e7aafa7cf64569ce Mon Sep 17 00:00:00 2001 From: Michel D'HOOGE Date: Tue, 19 Feb 2013 18:56:16 +0100 Subject: [PATCH] The "Hello World!" tutorial --- qamqp.pro | 19 +++--- src/QamqpApp.h | 122 ++++++++++++++++++++++++++++++++++++++ src/main.cpp | 34 +---------- src/sendreceive/Receive.h | 72 ++++++++++++++++++++++ src/sendreceive/Send.h | 79 ++++++++++++++++++++++++ 5 files changed, 286 insertions(+), 40 deletions(-) create mode 100644 src/QamqpApp.h create mode 100644 src/sendreceive/Receive.h create mode 100644 src/sendreceive/Send.h diff --git a/qamqp.pro b/qamqp.pro index 49ec61a..8842e65 100644 --- a/qamqp.pro +++ b/qamqp.pro @@ -1,14 +1,15 @@ TEMPLATE = app TARGET = qamqp -DEPENDPATH += . \ - src - +DEPENDPATH += . src + INCLUDEPATH += . ./src -# Input -HEADERS += src/test.h +HEADERS += \ + src/QamqpApp.h \ + src/sendreceive/Receive.h \ + src/sendreceive/Send.h \ -SOURCES += src/main.cpp \ - src/test.cpp - -include(src/qamqp/qamqp.pri) \ No newline at end of file +SOURCES += \ + src/main.cpp \ + +include(src/qamqp/qamqp.pri) diff --git a/src/QamqpApp.h b/src/QamqpApp.h new file mode 100644 index 0000000..7e7dc9c --- /dev/null +++ b/src/QamqpApp.h @@ -0,0 +1,122 @@ +#ifndef QAMQPAPP_H +#define QAMQPAPP_H + +#include +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_exchange.h" +#include "qamqp/amqp_queue.h" + +#include "sendreceive/Send.h" +#include "sendreceive/Receive.h" + +namespace QAMQP +{ + +namespace samples +{ + +class QamqpApp : public QCoreApplication +{ + Q_OBJECT + + typedef QCoreApplication super; + +public: + explicit QamqpApp(int& argc, char** argv) + : super(argc, argv) + { + QTimer::singleShot(0, this, SLOT(run())); + } + + bool notify(QObject *obj, QEvent *event) + { + try + { + return super::notify(obj, event); + } + catch (std::exception& ex) + { + qWarning() << ex.what(); + return false; + } + } + +protected slots: + void run() + { + QTextStream cout(stdout); + try + { + QStringList args = arguments(); + + if (args.size() == 1 || "-h" == args[1] || "--help" == args[1]) + { + printUsage(cout); + quit(); + return; + } + + QString command = args[1]; + QRunnable* commandImpl = 0; + + if ("send" == command) + { + if (args.size() < 4) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString url = args[2]; + QString msg = args[3]; + commandImpl = new Send(url, msg, this); + } + else if ("receive" == command) + { + if (args.size() < 3) + throw std::runtime_error("Mandatory argument(s) missing!"); + + QString url = args[2]; + commandImpl = new Receive(url, this); + } + else + { + throw std::runtime_error(QString("Unknown command: '%1'").arg(command).toStdString()); + } + + // Run command. + commandImpl->run(); + } + catch (std::exception& ex) + { + qWarning() << ex.what(); + exit(-1); + } + } + +protected: + void printUsage(QTextStream& out) + { + QString executable = arguments().at(0); + out << QString( +"\n\ +USAGE: %1 [-h|--help] -- Show this help message.\n\ +\n\ +USAGE: %1 send -- Send a message.\n\ + %1 receive -- Receive messages.\n\ +\n\ +Send-Receive Sample:\n\ +* Producer: %1 send amqp://guest:guest@127.0.0.1:5672/ \"Hello World\"\n\ +* Consumer: %1 receive amqp://guest:guest@127.0.0.1:5672/\n\ +\n").arg(executable); + } +}; + +} + +} + +#endif // QAMQPAPP_H diff --git a/src/main.cpp b/src/main.cpp index 63109ee..a204a39 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,39 +1,11 @@ - -#include - #include -#include "test.h" -void myMessageOutput(QtMsgType type, const char *msg) -{ - switch (type) { - case QtDebugMsg: +#include "QamqpApp.h" - fprintf(stderr, "# %s\n", msg); - - break; - case QtWarningMsg: - fprintf(stderr, "%s\n", msg); - break; - case QtCriticalMsg: - fprintf(stderr, "Critical: %s\n", msg); - break; - case QtFatalMsg: - fprintf(stderr, "Fatal: %s\n", msg); - abort(); - default: - break; - } -} int main(int argc, char *argv[]) { - qInstallMsgHandler(myMessageOutput); - QCoreApplication a(argc, argv); + QAMQP::samples::QamqpApp qamqpApp(argc, argv); - - Test test[1]; - Q_UNUSED(test); - - return a.exec(); + return qamqpApp.exec(); } diff --git a/src/sendreceive/Receive.h b/src/sendreceive/Receive.h new file mode 100644 index 0000000..7b99e8c --- /dev/null +++ b/src/sendreceive/Receive.h @@ -0,0 +1,72 @@ +#ifndef RECEIVE_H +#define RECEIVE_H + +#include +#include +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_queue.h" + + +namespace QAMQP +{ + +namespace samples +{ + +class Receive : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit Receive(const QString& address, QObject* parent) + : super(parent) + { + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + queue_ = client->createQueue("hello"); + queue_->declare(); + connect(queue_, SIGNAL(declared()), this, SLOT(declared())); + connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); + } + + void run() + { + } + +protected slots: + void declared() + { + queue_->consume(); + } + + void newMessage() + { + while (queue_->hasMessage()) + { + QAMQP::MessagePtr message = queue_->getMessage(); + qDebug() << "Receive::newMessage " << message->payload; + if(!queue_->noAck()) + { + queue_->ack(message); + } + } + } + +private: + QAMQP::Queue* queue_; +}; + +} + +} + +#endif // RECEIVE_H diff --git a/src/sendreceive/Send.h b/src/sendreceive/Send.h new file mode 100644 index 0000000..d015663 --- /dev/null +++ b/src/sendreceive/Send.h @@ -0,0 +1,79 @@ +#ifndef SEND_H +#define SEND_H + +#include +#include +#include +#include +#include +#include +#include + +#include "qamqp/amqp.h" +#include "qamqp/amqp_exchange.h" +#include "qamqp/amqp_queue.h" + +namespace QAMQP +{ + +namespace samples +{ + +class Send : public QObject, public QRunnable +{ + Q_OBJECT + + typedef QObject super; + +public: + explicit Send(const QString& address, const QString& sendMsg, QObject *parent) + : super(parent), sendMsg_(sendMsg) + { + // Create AMQP client + QAMQP::Client* client = new QAMQP::Client(this); + client->open(QUrl(address)); + + // Retrieve the "Default" exchange + // No need to declare (i.e. to create), nor to bind to a queue + exchange_ = client->createExchange(); + + // Create the "hello" queue + // This isn't mandatory but if it doesn't exist, the messages are lost + client + ->createQueue("hello", exchange_->channelNumber()) + ->declare(); + } + + void run() + { + QTimer* timer = new QTimer(this); + timer->setInterval(2468); + connect(timer, SIGNAL(timeout()), SLOT(sendMessage())); + timer->start(); + } + +protected slots: + void sendMessage() + { + static quint64 counter = 0; + + QString message(QString("[%1: %2] %3") + .arg(++counter) + .arg(QDateTime::currentDateTime().toString(Qt::ISODate)) + .arg(sendMsg_)); + qDebug() << "Send::sendMessage " << message; + + exchange_->publish(message, "hello"); + } + +private: + QString sendMsg_; + + QAMQP::Exchange* exchange_; +}; + +} + +} + +#endif // SEND_H