From b9a85b30517a17d8d442422265def70dfb9c981c Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 19 Jun 2014 10:50:40 -0400 Subject: [PATCH] added the routing tutorial --- tutorials/routing/emit_log_direct/main.cpp | 60 +++++++++++++- .../routing/receive_logs_direct/main.cpp | 79 ++++++++++++++++++- 2 files changed, 135 insertions(+), 4 deletions(-) diff --git a/tutorials/routing/emit_log_direct/main.cpp b/tutorials/routing/emit_log_direct/main.cpp index 9c5f00c..07b30ed 100644 --- a/tutorials/routing/emit_log_direct/main.cpp +++ b/tutorials/routing/emit_log_direct/main.cpp @@ -1,9 +1,65 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_exchange.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class DirectLogEmitter : public QObject +{ + Q_OBJECT +public: + DirectLogEmitter(QObject *parent = 0) : QObject(parent) {} + +public Q_SLOTS: + void start() { + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + Exchange *exchange = m_client.createExchange("direct_logs"); + connect(exchange, SIGNAL(declared()), this, SLOT(exchangeDeclared())); + exchange->declare(Exchange::Direct); + } + + void exchangeDeclared() { + Exchange *exchange = qobject_cast(sender()); + if (!exchange) + return; + + QStringList args = qApp->arguments(); + args.takeFirst(); // remove executable name + + QString severity = (args.isEmpty() ? "info" : args.first()); + QString message; + if (args.size() > 1) { + args.takeFirst(); + message = args.join(" "); + } else { + message = "Hello World!"; + } + + exchange->publish(message, severity); + qDebug(" [x] Sent %s:%s", severity.toLatin1().constData(), message.toLatin1().constData()); + m_client.disconnectFromHost(); + } + +private: + Client m_client; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + DirectLogEmitter logEmitter; + logEmitter.start(); + return app.exec(); } + +#include "main.moc" diff --git a/tutorials/routing/receive_logs_direct/main.cpp b/tutorials/routing/receive_logs_direct/main.cpp index 9c5f00c..8fedab6 100644 --- a/tutorials/routing/receive_logs_direct/main.cpp +++ b/tutorials/routing/receive_logs_direct/main.cpp @@ -1,9 +1,84 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_exchange.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class LogReceiver : public QObject +{ + Q_OBJECT +public: + LogReceiver(QObject *parent = 0) : QObject(parent) {} + +public Q_SLOTS: + void start(const QStringList &severities) { + m_severities = severities; + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + Exchange *exchange = m_client.createExchange("direct_logs"); + connect(exchange, SIGNAL(declared()), this, SLOT(exchangeDeclared())); + exchange->declare(Exchange::Direct); + } + + void exchangeDeclared() { + Queue *temporaryQueue = m_client.createQueue(); + connect(temporaryQueue, SIGNAL(declared()), this, SLOT(queueDeclared())); + connect(temporaryQueue, SIGNAL(bound()), this, SLOT(queueBound())); + connect(temporaryQueue, SIGNAL(messageReceived()), this, SLOT(messageReceived())); + temporaryQueue->declare(Queue::Exclusive); + } + + void queueDeclared() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + + foreach (QString severity, m_severities) + temporaryQueue->bind("direct_logs", severity); + qDebug() << " [*] Waiting for logs. To exit press CTRL+C"; + } + + void queueBound() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + temporaryQueue->consume(Queue::coNoAck); + } + + void messageReceived() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + + Message message = temporaryQueue->dequeue(); + qDebug() << " [x] " << message.routingKey() << ":" << message.payload(); + } + +private: + Client m_client; + QStringList m_severities; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + QStringList severities = app.arguments().mid(1); + if (severities.isEmpty()) { + qDebug("usage: %s [info] [warning] [error]", argv[0]); + return 1; + } + + LogReceiver logReceiver; + logReceiver.start(severities); + return app.exec(); } + +#include "main.moc"