added topics tutorial
This commit is contained in:
parent
b9a85b3051
commit
27d8329065
|
|
@ -1,9 +1,65 @@
|
|||
#include <QCoreApplication>
|
||||
#include <QStringList>
|
||||
#include <QDebug>
|
||||
|
||||
#include "amqp_client.h"
|
||||
#include "amqp_exchange.h"
|
||||
#include "amqp_queue.h"
|
||||
using namespace QAMQP;
|
||||
|
||||
class TopicLogEmitter : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
TopicLogEmitter(QObject *parent = 0) : QObject(parent) {}
|
||||
|
||||
public Q_SLOTS:
|
||||
void start() {
|
||||
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
|
||||
connect(&m_client, SIGNAL(disconnected()), qApp, SLOT(quit()));
|
||||
m_client.connectToHost();
|
||||
}
|
||||
|
||||
private Q_SLOTS:
|
||||
void clientConnected() {
|
||||
Exchange *topic_logs = m_client.createExchange("topic_logs");
|
||||
connect(topic_logs, SIGNAL(declared()), this, SLOT(exchangeDeclared()));
|
||||
topic_logs->declare(Exchange::Topic);
|
||||
}
|
||||
|
||||
void exchangeDeclared() {
|
||||
Exchange *topic_logs = qobject_cast<Exchange*>(sender());
|
||||
if (!topic_logs)
|
||||
return;
|
||||
|
||||
QStringList args = qApp->arguments();
|
||||
args.takeFirst(); // remove executable name
|
||||
|
||||
QString routingKey = (args.isEmpty() ? "anonymous.info" : args.first());
|
||||
QString message;
|
||||
if (args.size() > 1) {
|
||||
args.takeFirst();
|
||||
message = args.join(" ");
|
||||
} else {
|
||||
message = "Hello World!";
|
||||
}
|
||||
|
||||
topic_logs->publish(message, routingKey);
|
||||
qDebug(" [x] Sent %s:%s", routingKey.toLatin1().constData(), message.toLatin1().constData());
|
||||
m_client.disconnectFromHost();
|
||||
}
|
||||
|
||||
private:
|
||||
Client m_client;
|
||||
|
||||
};
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
QCoreApplication app(argc, argv);
|
||||
qDebug() << "testing";
|
||||
return EXIT_SUCCESS;
|
||||
TopicLogEmitter logEmitter;
|
||||
logEmitter.start();
|
||||
return app.exec();
|
||||
}
|
||||
|
||||
#include "main.moc"
|
||||
|
|
|
|||
|
|
@ -1,9 +1,84 @@
|
|||
#include <QCoreApplication>
|
||||
#include <QStringList>
|
||||
#include <QDebug>
|
||||
|
||||
#include "amqp_client.h"
|
||||
#include "amqp_exchange.h"
|
||||
#include "amqp_queue.h"
|
||||
using namespace QAMQP;
|
||||
|
||||
class TopicLogReceiver : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
TopicLogReceiver(QObject *parent = 0) : QObject(parent) {}
|
||||
|
||||
public Q_SLOTS:
|
||||
void start(const QStringList &bindingKeys) {
|
||||
m_bindingKeys = bindingKeys;
|
||||
connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
|
||||
m_client.connectToHost();
|
||||
}
|
||||
|
||||
private Q_SLOTS:
|
||||
void clientConnected() {
|
||||
Exchange *topic_logs = m_client.createExchange("topic_logs");
|
||||
connect(topic_logs, SIGNAL(declared()), this, SLOT(exchangeDeclared()));
|
||||
topic_logs->declare(Exchange::Topic);
|
||||
}
|
||||
|
||||
void exchangeDeclared() {
|
||||
Queue *temporaryQueue = m_client.createQueue();
|
||||
connect(temporaryQueue, SIGNAL(declared()), this, SLOT(queueDeclared()));
|
||||
connect(temporaryQueue, SIGNAL(bound()), this, SLOT(queueBound()));
|
||||
connect(temporaryQueue, SIGNAL(messageReceived()), this, SLOT(messageReceived()));
|
||||
temporaryQueue->declare(Queue::Exclusive);
|
||||
}
|
||||
|
||||
void queueDeclared() {
|
||||
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
||||
if (!temporaryQueue)
|
||||
return;
|
||||
|
||||
foreach (QString bindingKey, m_bindingKeys)
|
||||
temporaryQueue->bind("topic_logs", bindingKey);
|
||||
qDebug() << " [*] Waiting for logs. To exit press CTRL+C";
|
||||
}
|
||||
|
||||
void queueBound() {
|
||||
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
||||
if (!temporaryQueue)
|
||||
return;
|
||||
temporaryQueue->consume(Queue::coNoAck);
|
||||
}
|
||||
|
||||
void messageReceived() {
|
||||
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
||||
if (!temporaryQueue)
|
||||
return;
|
||||
|
||||
Message message = temporaryQueue->dequeue();
|
||||
qDebug() << " [x] " << message.routingKey() << ":" << message.payload();
|
||||
}
|
||||
|
||||
private:
|
||||
Client m_client;
|
||||
QStringList m_bindingKeys;
|
||||
|
||||
};
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
QCoreApplication app(argc, argv);
|
||||
qDebug() << "testing";
|
||||
return EXIT_SUCCESS;
|
||||
QStringList bindingKeys = app.arguments().mid(1);
|
||||
if (bindingKeys.isEmpty()) {
|
||||
qDebug("usage: %s [binding_key] ...", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
TopicLogReceiver logReceiver;
|
||||
logReceiver.start(bindingKeys);
|
||||
return app.exec();
|
||||
}
|
||||
|
||||
#include "main.moc"
|
||||
|
|
|
|||
Loading…
Reference in New Issue