From 07ad9d8e4f9de214d57918bb0c1b2945a8b825d5 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sun, 15 Feb 2015 16:38:12 -0500 Subject: [PATCH] implement rpc example This is a bit of a contrived example, since I explcitly have left out a synchronous API for QAMQP, however, it still shows how one can use the CorrelationID and ReplyTo properties to implement RPC behavior --- .../rpc/rpc_client/fibonaccirpcclient.cpp | 72 +++++++++++++++++++ tutorials/rpc/rpc_client/fibonaccirpcclient.h | 36 ++++++++++ tutorials/rpc/rpc_client/main.cpp | 10 ++- tutorials/rpc/rpc_client/rpc_client.pro | 6 +- tutorials/rpc/rpc_server/main.cpp | 7 +- tutorials/rpc/rpc_server/rpc_server.pro | 6 +- tutorials/rpc/rpc_server/server.cpp | 71 ++++++++++++++++++ tutorials/rpc/rpc_server/server.h | 33 +++++++++ 8 files changed, 233 insertions(+), 8 deletions(-) create mode 100644 tutorials/rpc/rpc_client/fibonaccirpcclient.cpp create mode 100644 tutorials/rpc/rpc_client/fibonaccirpcclient.h create mode 100644 tutorials/rpc/rpc_server/server.cpp create mode 100644 tutorials/rpc/rpc_server/server.h diff --git a/tutorials/rpc/rpc_client/fibonaccirpcclient.cpp b/tutorials/rpc/rpc_client/fibonaccirpcclient.cpp new file mode 100644 index 0000000..a2dd07b --- /dev/null +++ b/tutorials/rpc/rpc_client/fibonaccirpcclient.cpp @@ -0,0 +1,72 @@ +#include +#include +#include + +#include "qamqpclient.h" +#include "qamqpexchange.h" +#include "qamqpqueue.h" + +#include "fibonaccirpcclient.h" + +FibonacciRpcClient::FibonacciRpcClient(QObject *parent) + : QObject(parent), + m_client(0), + m_responseQueue(0), + m_defaultExchange(0) +{ + m_client = new QAmqpClient(this); + connect(m_client, SIGNAL(connected()), this, SLOT(clientConnected())); +} + +FibonacciRpcClient::~FibonacciRpcClient() +{ +} + +bool FibonacciRpcClient::connectToServer() +{ + QEventLoop loop; + connect(this, SIGNAL(connected()), &loop, SLOT(quit())); + m_client->connectToHost(); + loop.exec(); + + return m_client->isConnected(); +} + +void FibonacciRpcClient::call(int number) +{ + qDebug() << " [x] Requesting fib(" << number << ")"; + m_correlationId = QUuid::createUuid().toString(); + QAmqpMessage::PropertyHash properties; + properties.insert(QAmqpMessage::ReplyTo, m_responseQueue->name()); + properties.insert(QAmqpMessage::CorrelationId, m_correlationId); + + m_defaultExchange->publish(QByteArray::number(number), "rpc_queue", properties); +} + +void FibonacciRpcClient::clientConnected() +{ + m_responseQueue = m_client->createQueue(); + connect(m_responseQueue, SIGNAL(declared()), this, SLOT(queueDeclared())); + connect(m_responseQueue, SIGNAL(messageReceived()), this, SLOT(responseReceived())); + m_responseQueue->declare(QAmqpQueue::Exclusive | QAmqpQueue::AutoDelete); + m_defaultExchange = m_client->createExchange(); +} + +void FibonacciRpcClient::queueDeclared() +{ + m_responseQueue->consume(); + Q_EMIT connected(); +} + +void FibonacciRpcClient::responseReceived() +{ + QAmqpMessage message = m_responseQueue->dequeue(); + if (message.property(QAmqpMessage::CorrelationId).toString() != m_correlationId) { + // requeue message, it wasn't meant for us + m_responseQueue->reject(message, true); + return; + } + + qDebug() << " [.] Got " << message.payload(); + qApp->quit(); +} diff --git a/tutorials/rpc/rpc_client/fibonaccirpcclient.h b/tutorials/rpc/rpc_client/fibonaccirpcclient.h new file mode 100644 index 0000000..0b6511a --- /dev/null +++ b/tutorials/rpc/rpc_client/fibonaccirpcclient.h @@ -0,0 +1,36 @@ +#ifndef FIBONACCIRPCCLIENT_H +#define FIBONACCIRPCCLIENT_H + +#include + +class QAmqpQueue; +class QAmqpExchange; +class QAmqpClient; +class FibonacciRpcClient : public QObject +{ + Q_OBJECT +public: + explicit FibonacciRpcClient(QObject *parent = 0); + ~FibonacciRpcClient(); + +Q_SIGNALS: + void connected(); + +public Q_SLOTS: + bool connectToServer(); + void call(int number); + +private Q_SLOTS: + void clientConnected(); + void queueDeclared(); + void responseReceived(); + +private: + QAmqpClient *m_client; + QAmqpQueue *m_responseQueue; + QAmqpExchange *m_defaultExchange; + QString m_correlationId; + +}; + +#endif // FIBONACCIRPCCLIENT_H diff --git a/tutorials/rpc/rpc_client/main.cpp b/tutorials/rpc/rpc_client/main.cpp index 9c5f00c..de28735 100644 --- a/tutorials/rpc/rpc_client/main.cpp +++ b/tutorials/rpc/rpc_client/main.cpp @@ -1,9 +1,13 @@ #include -#include +#include "fibonaccirpcclient.h" int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + FibonacciRpcClient client; + if (!client.connectToServer()) + return EXIT_FAILURE; + + client.call(30); + return app.exec(); } diff --git a/tutorials/rpc/rpc_client/rpc_client.pro b/tutorials/rpc/rpc_client/rpc_client.pro index 6222eec..fe243b8 100644 --- a/tutorials/rpc/rpc_client/rpc_client.pro +++ b/tutorials/rpc/rpc_client/rpc_client.pro @@ -6,4 +6,8 @@ INCLUDEPATH += $${QAMQP_INCLUDEPATH} LIBS += -L$${DEPTH}/src $${QAMQP_LIBS} macx:CONFIG -= app_bundle -SOURCES += main.cpp +HEADERS += \ + fibonaccirpcclient.h +SOURCES += \ + fibonaccirpcclient.cpp \ + main.cpp diff --git a/tutorials/rpc/rpc_server/main.cpp b/tutorials/rpc/rpc_server/main.cpp index 9c5f00c..324e4ff 100644 --- a/tutorials/rpc/rpc_server/main.cpp +++ b/tutorials/rpc/rpc_server/main.cpp @@ -1,9 +1,10 @@ #include -#include +#include "server.h" int main(int argc, char **argv) { QCoreApplication app(argc, argv); - qDebug() << "testing"; - return EXIT_SUCCESS; + Server server; + server.listen(); + return app.exec(); } diff --git a/tutorials/rpc/rpc_server/rpc_server.pro b/tutorials/rpc/rpc_server/rpc_server.pro index 6222eec..7a164c3 100644 --- a/tutorials/rpc/rpc_server/rpc_server.pro +++ b/tutorials/rpc/rpc_server/rpc_server.pro @@ -6,4 +6,8 @@ INCLUDEPATH += $${QAMQP_INCLUDEPATH} LIBS += -L$${DEPTH}/src $${QAMQP_LIBS} macx:CONFIG -= app_bundle -SOURCES += main.cpp +HEADERS += \ + server.h +SOURCES += \ + server.cpp \ + main.cpp diff --git a/tutorials/rpc/rpc_server/server.cpp b/tutorials/rpc/rpc_server/server.cpp new file mode 100644 index 0000000..1c45161 --- /dev/null +++ b/tutorials/rpc/rpc_server/server.cpp @@ -0,0 +1,71 @@ +#include "qamqpclient.h" +#include "qamqpqueue.h" +#include "qamqpexchange.h" + +#include "server.h" + +Server::Server(QObject *parent) + : QObject(parent), + m_client(0), + m_rpcQueue(0), + m_defaultExchange(0) +{ + m_client = new QAmqpClient(this); + connect(m_client, SIGNAL(connected()), this, SLOT(clientConnected())); +} + +Server::~Server() +{ +} + +void Server::listen() +{ + m_client->connectToHost(); +} + +int Server::fib(int n) +{ + if (n == 0) + return 0; + + if (n == 1) + return 1; + + return fib(n - 1) + fib(n - 2); +} + +void Server::clientConnected() +{ + m_rpcQueue = m_client->createQueue("rpc_queue"); + connect(m_rpcQueue, SIGNAL(declared()), this, SLOT(queueDeclared())); + connect(m_rpcQueue, SIGNAL(qosDefined()), this, SLOT(qosDefined())); + connect(m_rpcQueue, SIGNAL(messageReceived()), this, SLOT(processRpcMessage())); + m_rpcQueue->declare(); + + m_defaultExchange = m_client->createExchange(); +} + +void Server::queueDeclared() +{ + m_rpcQueue->qos(1); +} + +void Server::qosDefined() +{ + m_rpcQueue->consume(); + qDebug() << " [x] Awaiting RPC requests"; +} + +void Server::processRpcMessage() +{ + QAmqpMessage rpcMessage = m_rpcQueue->dequeue(); + int n = rpcMessage.payload().toInt(); + + int response = fib(n); + m_rpcQueue->ack(rpcMessage); + + QString replyTo = rpcMessage.property(QAmqpMessage::ReplyTo).toString(); + QAmqpMessage::PropertyHash properties; + properties.insert(QAmqpMessage::CorrelationId, rpcMessage.property(QAmqpMessage::CorrelationId)); + m_defaultExchange->publish(QByteArray::number(response), replyTo, properties); +} diff --git a/tutorials/rpc/rpc_server/server.h b/tutorials/rpc/rpc_server/server.h new file mode 100644 index 0000000..d7fa221 --- /dev/null +++ b/tutorials/rpc/rpc_server/server.h @@ -0,0 +1,33 @@ +#ifndef SERVER_H +#define SERVER_H + +#include + +class QAmqpQueue; +class QAmqpExchange; +class QAmqpClient; +class Server : public QObject +{ + Q_OBJECT +public: + explicit Server(QObject *parent = 0); + ~Server(); + +public Q_SLOTS: + void listen(); + int fib(int n); + +private Q_SLOTS: + void clientConnected(); + void queueDeclared(); + void qosDefined(); + void processRpcMessage(); + +private: + QAmqpClient *m_client; + QAmqpQueue *m_rpcQueue; + QAmqpExchange *m_defaultExchange; + +}; + +#endif // SERVER_H