diff --git a/tutorials/topics/emit_log_topic/main.cpp b/tutorials/topics/emit_log_topic/main.cpp index 9c5f00c..90bb089 100644 --- a/tutorials/topics/emit_log_topic/main.cpp +++ b/tutorials/topics/emit_log_topic/main.cpp @@ -1,9 +1,65 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_exchange.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class TopicLogEmitter : public QObject +{ + Q_OBJECT +public: + TopicLogEmitter(QObject *parent = 0) : QObject(parent) {} + +public Q_SLOTS: + void start() { + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + Exchange *topic_logs = m_client.createExchange("topic_logs"); + connect(topic_logs, SIGNAL(declared()), this, SLOT(exchangeDeclared())); + topic_logs->declare(Exchange::Topic); + } + + void exchangeDeclared() { + Exchange *topic_logs = qobject_cast(sender()); + if (!topic_logs) + return; + + QStringList args = qApp->arguments(); + args.takeFirst(); // remove executable name + + QString routingKey = (args.isEmpty() ? "anonymous.info" : args.first()); + QString message; + if (args.size() > 1) { + args.takeFirst(); + message = args.join(" "); + } else { + message = "Hello World!"; + } + + topic_logs->publish(message, routingKey); + qDebug(" [x] Sent %s:%s", routingKey.toLatin1().constData(), message.toLatin1().constData()); + m_client.disconnectFromHost(); + } + +private: + Client m_client; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + TopicLogEmitter logEmitter; + logEmitter.start(); + return app.exec(); } + +#include "main.moc" diff --git a/tutorials/topics/receive_logs_topic/main.cpp b/tutorials/topics/receive_logs_topic/main.cpp index 9c5f00c..41e73fd 100644 --- a/tutorials/topics/receive_logs_topic/main.cpp +++ b/tutorials/topics/receive_logs_topic/main.cpp @@ -1,9 +1,84 @@ #include +#include #include +#include "amqp_client.h" +#include "amqp_exchange.h" +#include "amqp_queue.h" +using namespace QAMQP; + +class TopicLogReceiver : public QObject +{ + Q_OBJECT +public: + TopicLogReceiver(QObject *parent = 0) : QObject(parent) {} + +public Q_SLOTS: + void start(const QStringList &bindingKeys) { + m_bindingKeys = bindingKeys; + connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected())); + m_client.connectToHost(); + } + +private Q_SLOTS: + void clientConnected() { + Exchange *topic_logs = m_client.createExchange("topic_logs"); + connect(topic_logs, SIGNAL(declared()), this, SLOT(exchangeDeclared())); + topic_logs->declare(Exchange::Topic); + } + + void exchangeDeclared() { + Queue *temporaryQueue = m_client.createQueue(); + connect(temporaryQueue, SIGNAL(declared()), this, SLOT(queueDeclared())); + connect(temporaryQueue, SIGNAL(bound()), this, SLOT(queueBound())); + connect(temporaryQueue, SIGNAL(messageReceived()), this, SLOT(messageReceived())); + temporaryQueue->declare(Queue::Exclusive); + } + + void queueDeclared() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + + foreach (QString bindingKey, m_bindingKeys) + temporaryQueue->bind("topic_logs", bindingKey); + qDebug() << " [*] Waiting for logs. To exit press CTRL+C"; + } + + void queueBound() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + temporaryQueue->consume(Queue::coNoAck); + } + + void messageReceived() { + Queue *temporaryQueue = qobject_cast(sender()); + if (!temporaryQueue) + return; + + Message message = temporaryQueue->dequeue(); + qDebug() << " [x] " << message.routingKey() << ":" << message.payload(); + } + +private: + Client m_client; + QStringList m_bindingKeys; + +}; + int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + QStringList bindingKeys = app.arguments().mid(1); + if (bindingKeys.isEmpty()) { + qDebug("usage: %s [binding_key] ...", argv[0]); + return 1; + } + + TopicLogReceiver logReceiver; + logReceiver.start(bindingKeys); + return app.exec(); } + +#include "main.moc"