implement rpc example

This is a bit of a contrived example, since I explcitly have left
out a synchronous API for QAMQP, however, it still shows how one can
use the CorrelationID and ReplyTo properties to implement RPC behavior
This commit is contained in:
Matt Broadstone 2015-02-15 16:38:12 -05:00
parent 3c2a039c08
commit 07ad9d8e4f
8 changed files with 233 additions and 8 deletions

View File

@ -0,0 +1,72 @@
#include <QCoreApplication>
#include <QEventLoop>
#include <QUuid>
#include "qamqpclient.h"
#include "qamqpexchange.h"
#include "qamqpqueue.h"
#include "fibonaccirpcclient.h"
FibonacciRpcClient::FibonacciRpcClient(QObject *parent)
: QObject(parent),
m_client(0),
m_responseQueue(0),
m_defaultExchange(0)
{
m_client = new QAmqpClient(this);
connect(m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
}
FibonacciRpcClient::~FibonacciRpcClient()
{
}
bool FibonacciRpcClient::connectToServer()
{
QEventLoop loop;
connect(this, SIGNAL(connected()), &loop, SLOT(quit()));
m_client->connectToHost();
loop.exec();
return m_client->isConnected();
}
void FibonacciRpcClient::call(int number)
{
qDebug() << " [x] Requesting fib(" << number << ")";
m_correlationId = QUuid::createUuid().toString();
QAmqpMessage::PropertyHash properties;
properties.insert(QAmqpMessage::ReplyTo, m_responseQueue->name());
properties.insert(QAmqpMessage::CorrelationId, m_correlationId);
m_defaultExchange->publish(QByteArray::number(number), "rpc_queue", properties);
}
void FibonacciRpcClient::clientConnected()
{
m_responseQueue = m_client->createQueue();
connect(m_responseQueue, SIGNAL(declared()), this, SLOT(queueDeclared()));
connect(m_responseQueue, SIGNAL(messageReceived()), this, SLOT(responseReceived()));
m_responseQueue->declare(QAmqpQueue::Exclusive | QAmqpQueue::AutoDelete);
m_defaultExchange = m_client->createExchange();
}
void FibonacciRpcClient::queueDeclared()
{
m_responseQueue->consume();
Q_EMIT connected();
}
void FibonacciRpcClient::responseReceived()
{
QAmqpMessage message = m_responseQueue->dequeue();
if (message.property(QAmqpMessage::CorrelationId).toString() != m_correlationId) {
// requeue message, it wasn't meant for us
m_responseQueue->reject(message, true);
return;
}
qDebug() << " [.] Got " << message.payload();
qApp->quit();
}

View File

@ -0,0 +1,36 @@
#ifndef FIBONACCIRPCCLIENT_H
#define FIBONACCIRPCCLIENT_H
#include <QObject>
class QAmqpQueue;
class QAmqpExchange;
class QAmqpClient;
class FibonacciRpcClient : public QObject
{
Q_OBJECT
public:
explicit FibonacciRpcClient(QObject *parent = 0);
~FibonacciRpcClient();
Q_SIGNALS:
void connected();
public Q_SLOTS:
bool connectToServer();
void call(int number);
private Q_SLOTS:
void clientConnected();
void queueDeclared();
void responseReceived();
private:
QAmqpClient *m_client;
QAmqpQueue *m_responseQueue;
QAmqpExchange *m_defaultExchange;
QString m_correlationId;
};
#endif // FIBONACCIRPCCLIENT_H

View File

@ -1,9 +1,13 @@
#include <QCoreApplication> #include <QCoreApplication>
#include <QDebug> #include "fibonaccirpcclient.h"
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
QCoreApplication app(argc, argv); QCoreApplication app(argc, argv);
qDebug() << "testing"; FibonacciRpcClient client;
return EXIT_SUCCESS; if (!client.connectToServer())
return EXIT_FAILURE;
client.call(30);
return app.exec();
} }

View File

@ -6,4 +6,8 @@ INCLUDEPATH += $${QAMQP_INCLUDEPATH}
LIBS += -L$${DEPTH}/src $${QAMQP_LIBS} LIBS += -L$${DEPTH}/src $${QAMQP_LIBS}
macx:CONFIG -= app_bundle macx:CONFIG -= app_bundle
SOURCES += main.cpp HEADERS += \
fibonaccirpcclient.h
SOURCES += \
fibonaccirpcclient.cpp \
main.cpp

View File

@ -1,9 +1,10 @@
#include <QCoreApplication> #include <QCoreApplication>
#include <QDebug> #include "server.h"
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
QCoreApplication app(argc, argv); QCoreApplication app(argc, argv);
qDebug() << "testing"; Server server;
return EXIT_SUCCESS; server.listen();
return app.exec();
} }

View File

@ -6,4 +6,8 @@ INCLUDEPATH += $${QAMQP_INCLUDEPATH}
LIBS += -L$${DEPTH}/src $${QAMQP_LIBS} LIBS += -L$${DEPTH}/src $${QAMQP_LIBS}
macx:CONFIG -= app_bundle macx:CONFIG -= app_bundle
SOURCES += main.cpp HEADERS += \
server.h
SOURCES += \
server.cpp \
main.cpp

View File

@ -0,0 +1,71 @@
#include "qamqpclient.h"
#include "qamqpqueue.h"
#include "qamqpexchange.h"
#include "server.h"
Server::Server(QObject *parent)
: QObject(parent),
m_client(0),
m_rpcQueue(0),
m_defaultExchange(0)
{
m_client = new QAmqpClient(this);
connect(m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
}
Server::~Server()
{
}
void Server::listen()
{
m_client->connectToHost();
}
int Server::fib(int n)
{
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
void Server::clientConnected()
{
m_rpcQueue = m_client->createQueue("rpc_queue");
connect(m_rpcQueue, SIGNAL(declared()), this, SLOT(queueDeclared()));
connect(m_rpcQueue, SIGNAL(qosDefined()), this, SLOT(qosDefined()));
connect(m_rpcQueue, SIGNAL(messageReceived()), this, SLOT(processRpcMessage()));
m_rpcQueue->declare();
m_defaultExchange = m_client->createExchange();
}
void Server::queueDeclared()
{
m_rpcQueue->qos(1);
}
void Server::qosDefined()
{
m_rpcQueue->consume();
qDebug() << " [x] Awaiting RPC requests";
}
void Server::processRpcMessage()
{
QAmqpMessage rpcMessage = m_rpcQueue->dequeue();
int n = rpcMessage.payload().toInt();
int response = fib(n);
m_rpcQueue->ack(rpcMessage);
QString replyTo = rpcMessage.property(QAmqpMessage::ReplyTo).toString();
QAmqpMessage::PropertyHash properties;
properties.insert(QAmqpMessage::CorrelationId, rpcMessage.property(QAmqpMessage::CorrelationId));
m_defaultExchange->publish(QByteArray::number(response), replyTo, properties);
}

View File

@ -0,0 +1,33 @@
#ifndef SERVER_H
#define SERVER_H
#include <QObject>
class QAmqpQueue;
class QAmqpExchange;
class QAmqpClient;
class Server : public QObject
{
Q_OBJECT
public:
explicit Server(QObject *parent = 0);
~Server();
public Q_SLOTS:
void listen();
int fib(int n);
private Q_SLOTS:
void clientConnected();
void queueDeclared();
void qosDefined();
void processRpcMessage();
private:
QAmqpClient *m_client;
QAmqpQueue *m_rpcQueue;
QAmqpExchange *m_defaultExchange;
};
#endif // SERVER_H