From d4b1824b465538cacac9ba7c1e2ce7bc27d7cf68 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Tue, 3 Jun 2014 16:11:30 -0400 Subject: [PATCH] merged Client Connection and Network classes, fixed a bug allowing for proper connecting and disconnecting from a broker --- src/amqp_channel.cpp | 6 +- src/amqp_client.cpp | 441 ++++++++++++++++++++++++++++++++++------ src/amqp_client.h | 12 +- src/amqp_client_p.h | 69 ++++++- src/amqp_connection.cpp | 347 ------------------------------- src/amqp_connection_p.h | 67 ------ src/amqp_network.cpp | 301 --------------------------- src/amqp_network_p.h | 65 ------ src/src.pro | 4 - 9 files changed, 450 insertions(+), 862 deletions(-) delete mode 100644 src/amqp_connection.cpp delete mode 100644 src/amqp_connection_p.h delete mode 100644 src/amqp_network.cpp delete mode 100644 src/amqp_network_p.h diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 01134b4..1fc8c15 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -2,8 +2,6 @@ #include "amqp_channel_p.h" #include "amqp_client.h" #include "amqp_client_p.h" -#include "amqp_connection_p.h" -#include "amqp_network_p.h" #include #include @@ -89,7 +87,7 @@ void ChannelPrivate::_q_open() void ChannelPrivate::sendFrame(const Frame::Base &frame) { if (client) - client->d_func()->network_->sendFrame(frame); + client->d_func()->sendFrame(frame); } void ChannelPrivate::open() @@ -97,7 +95,7 @@ void ChannelPrivate::open() if (!needOpen || opened) return; - if (!client->d_func()->connection_->isConnected()) + if (!client->isConnected()) return; qDebug("Open channel #%d", number); diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index b8da66b..abbe0d7 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -3,13 +3,14 @@ #include "amqp_global.h" #include "amqp_exchange.h" #include "amqp_exchange_p.h" -#include "amqp_network_p.h" #include "amqp_queue.h" #include "amqp_queue_p.h" -#include "amqp_connection_p.h" #include "amqp_authenticator.h" +#include +#include #include +#include using namespace QAMQP; @@ -17,6 +18,9 @@ ClientPrivate::ClientPrivate(Client *q) : port(AMQPPORT), host(QString::fromLatin1(AMQPHOST)), virtualHost(QString::fromLatin1(AMQPVHOST)), + socket(0), + closed(false), + connected(false), q_ptr(q) { } @@ -28,21 +32,17 @@ ClientPrivate::~ClientPrivate() void ClientPrivate::init(const QUrl &connectionString) { Q_Q(Client); - if (!network_) { - network_ = new Network(q); - QObject::connect(network_.data(), SIGNAL(connected()), q, SIGNAL(connected())); - QObject::connect(network_.data(), SIGNAL(disconnected()), q, SIGNAL(disconnected())); - } + socket = new QTcpSocket(q); + QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected())); + QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead())); + QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), + q, SLOT(_q_socketError(QAbstractSocket::SocketError))); - if (!connection_) - connection_ = new Connection(network_, q); - network_->setMethodHandlerConnection(connection_); + heartbeatTimer = new QTimer(q); + QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat())); - auth_ = QSharedPointer( - new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); - - QObject::connect(connection_, SIGNAL(connected()), q, SIGNAL(connected())); - QObject::connect(connection_, SIGNAL(disconnected()), q, SIGNAL(disconnected())); + authenticator = QSharedPointer( + new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); if (connectionString.isValid()) { parseConnectionString(connectionString); @@ -50,13 +50,6 @@ void ClientPrivate::init(const QUrl &connectionString) } } -void ClientPrivate::connect() -{ - if (network_->state() != QAbstractSocket::UnconnectedState) - disconnect(); - network_->connectTo(host, port); -} - void ClientPrivate::parseConnectionString(const QUrl &connectionString) { Q_Q(Client); @@ -66,7 +59,6 @@ void ClientPrivate::parseConnectionString(const QUrl &connectionString) return; } - q->setSsl(connectionString.scheme() == AMQPSSCHEME); q->setPassword(connectionString.password()); q->setUser(connectionString.userName()); q->setPort(connectionString.port(AMQPPORT)); @@ -74,21 +66,354 @@ void ClientPrivate::parseConnectionString(const QUrl &connectionString) q->setVirtualHost(connectionString.path()); } +void ClientPrivate::connect() +{ + if (socket->state() != QAbstractSocket::UnconnectedState) { + qDebug() << Q_FUNC_INFO << "socket already connected, disconnecting.."; + disconnect(); + } + + socket->connectToHost(host, port); +} + void ClientPrivate::disconnect() { - if (network_->state() == QAbstractSocket::UnconnectedState) { + if (socket->state() == QAbstractSocket::UnconnectedState) { qDebug() << Q_FUNC_INFO << "already disconnected"; return; } - connection_->close(); - network_->disconnect(); + close(200, "client.disconnect"); // NOTE: this should be handled by signals, no need for dptr // access here. // connection_->d_func()->connected = false; } +// private slots +void ClientPrivate::_q_socketConnected() +{ + timeout = 0; + char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1}; + socket->write(header, 8); +} + +void ClientPrivate::_q_heartbeat() +{ + Frame::Heartbeat frame; + sendFrame(frame); +} + +void ClientPrivate::_q_socketError(QAbstractSocket::SocketError error) +{ + if (timeout == 0) { + timeout = 1000; + } else { + if (timeout < 120000) + timeout *= 5; + } + + switch (error) { + case QAbstractSocket::ConnectionRefusedError: + case QAbstractSocket::RemoteHostClosedError: + case QAbstractSocket::SocketTimeoutError: + case QAbstractSocket::NetworkError: + case QAbstractSocket::ProxyConnectionClosedError: + case QAbstractSocket::ProxyConnectionRefusedError: + case QAbstractSocket::ProxyConnectionTimeoutError: + + default: + qWarning() << "AMQP: Socket Error: " << socket->errorString(); + break; + } + +// if (autoReconnect && connect) +// QTimer::singleShot(timeout, this, SLOT(connectTo())); +} + +void ClientPrivate::_q_readyRead() +{ + while (socket->bytesAvailable() >= Frame::HEADER_SIZE) { + char *headerData = buffer.data(); + socket->peek(headerData, Frame::HEADER_SIZE); + const quint32 payloadSize = qFromBigEndian(*(quint32*)&headerData[3]); + const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE; + + if (socket->bytesAvailable() >= readSize) { + buffer.resize(readSize); + socket->read(buffer.data(), readSize); + const char *bufferData = buffer.constData(); + const quint8 type = *(quint8*)&bufferData[0]; + const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; + if (magic != Frame::FRAME_END) + qWarning() << "Wrong end frame"; + + QDataStream streamB(&buffer, QIODevice::ReadOnly); + switch(Frame::Type(type)) { + case Frame::ftMethod: + { + Frame::Method frame(streamB); + if (frame.methodClass() == Frame::fcConnection) { + _q_method(frame); + } else { + foreach (Frame::MethodHandler *methodHandler, methodHandlersByChannel[frame.channel()]) + methodHandler->_q_method(frame); + } + } + break; + case Frame::ftHeader: + { + Frame::Content frame(streamB); + foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()]) + methodHandler->_q_content(frame); + } + break; + case Frame::ftBody: + { + Frame::ContentBody frame(streamB); + foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()]) + methodHandler->_q_body(frame); + } + break; + case Frame::ftHeartbeat: + qDebug("AMQP: Heartbeat"); + break; + default: + qWarning() << "AMQP: Unknown frame type: " << type; + } + } else { + break; + } + } +} + +void ClientPrivate::sendFrame(const Frame::Base &frame) +{ + if (socket->state() != QAbstractSocket::ConnectedState) { + qDebug() << Q_FUNC_INFO << "socket not connected: " << socket->state(); + return; + } + + QDataStream stream(socket); + frame.toStream(stream); +} + +void ClientPrivate::_q_method(const Frame::Method &frame) +{ + Q_ASSERT(frame.methodClass() == Frame::fcConnection); + if (frame.methodClass() != Frame::fcConnection) + return; + + qDebug() << "Connection:"; + if (closed) { + if (frame.id() == ClientPrivate::miCloseOk) + closeOk(frame); + return; + } + + switch (ClientPrivate::MethodId(frame.id())) { + case ClientPrivate::miStart: + start(frame); + break; + case ClientPrivate::miSecure: + secure(frame); + break; + case ClientPrivate::miTune: + tune(frame); + break; + case ClientPrivate::miOpenOk: + openOk(frame); + break; + case ClientPrivate::miClose: + close(frame); + break; + case ClientPrivate::miCloseOk: + closeOk(frame); + break; + default: + qWarning("Unknown method-id %d", frame.id()); + } +} + +void ClientPrivate::start(const Frame::Method &frame) +{ + qDebug(">> Start"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + quint8 version_major = 0; + quint8 version_minor = 0; + + stream >> version_major >> version_minor; + + Frame::TableField table; + Frame::deserialize(stream, table); + + QString mechanisms = Frame::readField('S', stream).toString(); + QString locales = Frame::readField('S', stream).toString(); + + qDebug(">> version_major: %d", version_major); + qDebug(">> version_minor: %d", version_minor); + + Frame::print(table); + + qDebug(">> mechanisms: %s", qPrintable(mechanisms)); + qDebug(">> locales: %s", qPrintable(locales)); + + startOk(); +} + +void ClientPrivate::secure(const Frame::Method &frame) +{ + Q_UNUSED(frame) + qDebug() << Q_FUNC_INFO << "called!"; +} + +void ClientPrivate::tune(const Frame::Method &frame) +{ + qDebug(">> Tune"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + qint16 channel_max = 0, + heartbeat = 0; + qint32 frame_max = 0; + + stream >> channel_max; + stream >> frame_max; + stream >> heartbeat; + + qDebug(">> channel_max: %d", channel_max); + qDebug(">> frame_max: %d", frame_max); + qDebug(">> heartbeat: %d", heartbeat); + + if (heartbeatTimer) { + heartbeatTimer->setInterval(heartbeat * 1000); + if (heartbeatTimer->interval()) + heartbeatTimer->start(); + else + heartbeatTimer->stop(); + } + + tuneOk(); + open(); +} + +void ClientPrivate::openOk(const Frame::Method &frame) +{ + Q_Q(Client); + Q_UNUSED(frame) + qDebug(">> OpenOK"); + connected = true; + Q_EMIT q->connected(); +} + +void ClientPrivate::closeOk(const Frame::Method &frame) +{ + Q_Q(Client); + Q_UNUSED(frame) + qDebug() << Q_FUNC_INFO << "received"; + connected = false; + if (heartbeatTimer) + heartbeatTimer->stop(); + Q_EMIT q->disconnected(); +} + +void ClientPrivate::close(const Frame::Method &frame) +{ + Q_Q(Client); + qDebug(">> CLOSE"); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + qint16 code = 0, classId, methodId; + stream >> code; + QString text(Frame::readField('s', stream).toString()); + stream >> classId; + stream >> methodId; + + qDebug(">> code: %d", code); + qDebug(">> text: %s", qPrintable(text)); + qDebug(">> class-id: %d", classId); + qDebug(">> method-id: %d", methodId); + connected = false; + Q_EMIT q->disconnected(); +} + +void ClientPrivate::startOk() +{ + Frame::Method frame(Frame::fcConnection, ClientPrivate::miStartOk); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + Frame::TableField clientProperties; + clientProperties["version"] = QString(QAMQP_VERSION); + clientProperties["platform"] = QString("Qt %1").arg(qVersion()); + clientProperties["product"] = QString("QAMQP"); + clientProperties.unite(customProperty); + Frame::serialize(stream, clientProperties); + + authenticator->write(stream); + Frame::writeField('s', stream, "en_US"); + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ClientPrivate::secureOk() +{ + qDebug() << Q_FUNC_INFO; +} + +void ClientPrivate::tuneOk() +{ + Frame::Method frame(Frame::fcConnection, ClientPrivate::miTuneOk); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + stream << qint16(0); //channel_max + stream << qint32(FRAME_MAX); //frame_max + stream << qint16(heartbeatTimer->interval() / 1000); //heartbeat + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ClientPrivate::open() +{ + Frame::Method frame(Frame::fcConnection, ClientPrivate::miOpen); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + Frame::writeField('s',stream, virtualHost); + + stream << qint8(0); + stream << qint8(0); + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ClientPrivate::close(int code, const QString &text, int classId, int methodId) +{ + Frame::Method frame(Frame::fcConnection, ClientPrivate::miClose); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + stream << qint16(code); + Frame::writeField('s', stream, text); + stream << qint16(classId); + stream << qint16(methodId); + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ClientPrivate::closeOk() +{ + Frame::Method frame(Frame::fcConnection, ClientPrivate::miCloseOk); + connected = false; + sendFrame(frame); +} + ////////////////////////////////////////////////////////////////////////// Client::Client(QObject *parent) @@ -109,6 +434,15 @@ Client::Client(const QUrl &connectionString, QObject *parent) Client::~Client() { + Q_D(Client); + if (d->connected) + d->disconnect(); +} + +bool Client::isConnected() const +{ + Q_D(const Client); + return d->connected; } quint16 Client::port() const @@ -150,7 +484,7 @@ void Client::setVirtualHost(const QString &virtualHost) QString Client::user() const { Q_D(const Client); - const Authenticator *auth = d->auth_.data(); + const Authenticator *auth = d->authenticator.data(); if (auth && auth->type() == QLatin1String("AMQPLAIN")) { const AMQPlainAuthenticator *a = static_cast(auth); return a->login(); @@ -162,7 +496,7 @@ QString Client::user() const void Client::setUser(const QString &user) { Q_D(const Client); - Authenticator *auth = d->auth_.data(); + Authenticator *auth = d->authenticator.data(); if (auth && auth->type() == QLatin1String("AMQPLAIN")) { AMQPlainAuthenticator *a = static_cast(auth); a->setLogin(user); @@ -172,7 +506,7 @@ void Client::setUser(const QString &user) QString Client::password() const { Q_D(const Client); - const Authenticator *auth = d->auth_.data(); + const Authenticator *auth = d->authenticator.data(); if (auth && auth->type() == "AMQPLAIN") { const AMQPlainAuthenticator *a = static_cast(auth); return a->password(); @@ -184,7 +518,7 @@ QString Client::password() const void Client::setPassword(const QString &password) { Q_D(Client); - Authenticator *auth = d->auth_.data(); + Authenticator *auth = d->authenticator.data(); if (auth && auth->type() == QLatin1String("AMQPLAIN")) { AMQPlainAuthenticator *a = static_cast(auth); a->setPassword(password); @@ -200,8 +534,8 @@ Exchange *Client::createExchange(const QString &name, int channelNumber) { Q_D(Client); Exchange *exchange = new Exchange(channelNumber, this); - d->network_->addMethodHandlerForChannel(exchange->channelNumber(), exchange); - connect(d->connection_, SIGNAL(connected()), exchange, SLOT(_q_open())); + d->methodHandlersByChannel[exchange->channelNumber()].append(exchange); + connect(this, SIGNAL(connected()), exchange, SLOT(_q_open())); exchange->d_func()->open(); connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected())); if (!name.isEmpty()) @@ -218,11 +552,10 @@ Queue *Client::createQueue(const QString &name, int channelNumber) { Q_D(Client); Queue *queue = new Queue(channelNumber, this); - d->network_->addMethodHandlerForChannel(queue->channelNumber(), queue); - d->network_->addContentHandlerForChannel(queue->channelNumber(), queue); - d->network_->addContentBodyHandlerForChannel(queue->channelNumber(), queue); - - connect(d->connection_, SIGNAL(connected()), queue, SLOT(_q_open())); + d->methodHandlersByChannel[queue->channelNumber()].append(queue); + d->contentHandlerByChannel[queue->channelNumber()].append(queue); + d->bodyHandlersByChannel[queue->channelNumber()].append(queue); + connect(this, SIGNAL(connected()), queue, SLOT(_q_open())); queue->d_func()->open(); connect(this, SIGNAL(disconnected()), queue, SLOT(_q_disconnected())); @@ -231,58 +564,40 @@ Queue *Client::createQueue(const QString &name, int channelNumber) return queue; } -void Client::setAuth(Authenticator *auth) +void Client::setAuth(Authenticator *authenticator) { Q_D(Client); - d->auth_ = QSharedPointer(auth); + d->authenticator = QSharedPointer(authenticator); } Authenticator *Client::auth() const { Q_D(const Client); - return d->auth_.data(); -} - -bool Client::isSsl() const -{ - Q_D(const Client); - return d->network_->isSsl(); -} - -void Client::setSsl(bool value) -{ - Q_D(Client); - d->network_->setSsl(value); + return d->authenticator.data(); } bool Client::autoReconnect() const { Q_D(const Client); - return d->network_->autoReconnect(); + return d->autoReconnect; } void Client::setAutoReconnect(bool value) { Q_D(Client); - d->network_->setAutoReconnect(value); -} - -bool Client::isConnected() const -{ - Q_D(const Client); - return d->connection_->isConnected(); + d->autoReconnect = value; } void Client::addCustomProperty(const QString &name, const QString &value) { Q_D(Client); - return d->connection_->addCustomProperty(name, value); + d->customProperty.insert(name, value); } QString Client::customProperty(const QString &name) const { Q_D(const Client); - return d->connection_->customProperty(name); + return d->customProperty.value(name).toString(); } void Client::connectToHost(const QString &connectionString) @@ -310,3 +625,5 @@ void Client::disconnectFromHost() Q_D(Client); d->disconnect(); } + +#include "moc_amqp_client.cpp" diff --git a/src/amqp_client.h b/src/amqp_client.h index 4dc75d1..b95b60a 100644 --- a/src/amqp_client.h +++ b/src/amqp_client.h @@ -22,9 +22,7 @@ class QAMQP_EXPORT Client : public QObject Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost) Q_PROPERTY(QString user READ user WRITE setUser) Q_PROPERTY(QString password READ password WRITE setPassword) - Q_PROPERTY(bool ssl READ isSsl WRITE setSsl) Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect) - Q_PROPERTY(bool connected READ isConnected) public: Client(QObject *parent = 0); @@ -58,16 +56,13 @@ public: void setAuth(Authenticator *auth); Authenticator *auth() const; - bool isSsl() const; - void setSsl(bool value); - bool autoReconnect() const; void setAutoReconnect(bool value); bool isConnected() const; void connectToHost(const QString &connectionString = QString()); - void connectToHost(const QHostAddress &address, quint16 port); + void connectToHost(const QHostAddress &address, quint16 port = AMQPPORT); void disconnectFromHost(); Q_SIGNALS: @@ -79,6 +74,11 @@ private: Q_DECLARE_PRIVATE(Client) QScopedPointer d_ptr; + Q_PRIVATE_SLOT(d_func(), void _q_socketConnected()) + Q_PRIVATE_SLOT(d_func(), void _q_readyRead()) + Q_PRIVATE_SLOT(d_func(), void _q_socketError(QAbstractSocket::SocketError error)) + Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) + friend class ChannelPrivate; }; diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 2727474..cbe48ec 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -1,37 +1,94 @@ #ifndef amqp_client_p_h__ #define amqp_client_p_h__ +#include #include #include +#include +#include "amqp_frame.h" + +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok + +class QTimer; +class QTcpSocket; namespace QAMQP { +class Client; class Queue; class Exchange; class Network; class Connection; class Authenticator; -class ClientPrivate +class ClientPrivate : public Frame::MethodHandler { public: + enum MethodId { + METHOD_ID_ENUM(miStart, 10), + METHOD_ID_ENUM(miSecure, 20), + METHOD_ID_ENUM(miTune, 30), + METHOD_ID_ENUM(miOpen, 40), + METHOD_ID_ENUM(miClose, 50) + }; + ClientPrivate(Client *q); - ~ClientPrivate(); + virtual ~ClientPrivate(); void init(const QUrl &connectionString = QUrl()); void connect(); void disconnect(); void parseConnectionString(const QUrl &connectionString); + void sendFrame(const Frame::Base &frame); + + // private slots + void _q_socketConnected(); + void _q_readyRead(); + void _q_socketError(QAbstractSocket::SocketError error); + void _q_heartbeat(); + + virtual void _q_method(const Frame::Method &frame); + + // method handlers, FROM server + void start(const Frame::Method &frame); + void secure(const Frame::Method &frame); + void tune(const Frame::Method &frame); + void openOk(const Frame::Method &frame); + void closeOk(const Frame::Method &frame); + + // method handlers, TO server + void startOk(); + void secureOk(); + void tuneOk(); + void open(); + + // method handlers, BOTH ways + void close(int code, const QString &text, int classId = 0, int methodId = 0); + void close(const Frame::Method &frame); + void closeOk(); quint32 port; QString host; QString virtualHost; - QPointer network_; - QPointer connection_; - QSharedPointer auth_; + QSharedPointer authenticator; - bool isSSl() const; + // Network + QByteArray buffer; + bool autoReconnect; + int timeout; + bool connecting; + QTcpSocket *socket; + + QHash > methodHandlersByChannel; + QHash > contentHandlerByChannel; + QHash > bodyHandlersByChannel; + + // Connection + bool closed; + bool connected; + QPointer heartbeatTimer; + Frame::TableField customProperty; Client * const q_ptr; Q_DECLARE_PUBLIC(Client) diff --git a/src/amqp_connection.cpp b/src/amqp_connection.cpp deleted file mode 100644 index 1457ebe..0000000 --- a/src/amqp_connection.cpp +++ /dev/null @@ -1,347 +0,0 @@ -#include "amqp_connection_p.h" -#include "amqp_client.h" -#include "amqp_client_p.h" -#include "amqp_frame.h" -#include "amqp_global.h" -#include "amqp_network_p.h" -#include "amqp_authenticator.h" - -#include -#include -#include - -#define METHOD_ID_ENUM(name, id) name = id, name ## Ok - -namespace QAMQP { - -class ConnectionPrivate -{ -public: - enum MethodId { - METHOD_ID_ENUM(miStart, 10), - METHOD_ID_ENUM(miSecure, 20), - METHOD_ID_ENUM(miTune, 30), - METHOD_ID_ENUM(miOpen, 40), - METHOD_ID_ENUM(miClose, 50) - }; - - ConnectionPrivate(Connection *q); - - // private slots - void _q_heartbeat(); - - QPointer client; - QPointer network; - bool closed; - bool connected; - QPointer heartbeatTimer; - Frame::TableField customProperty; - - Q_DECLARE_PUBLIC(Connection) - Connection * const q_ptr; -}; - -ConnectionPrivate::ConnectionPrivate(Connection *q) - : closed(false), - connected(false), - q_ptr(q) -{ -} - -void ConnectionPrivate::_q_heartbeat() -{ - Frame::Heartbeat frame; - network->sendFrame(frame); -} - -} // namespace QAMQP - -////////////////////////////////////////////////////////////////////////// - -using namespace QAMQP; -Connection::Connection(Network *network, Client *client) - : QObject(client), - d_ptr(new ConnectionPrivate(this)) -{ - Q_D(Connection); - d->client = client; - d->network = network; - d->heartbeatTimer = new QTimer(this); - connect(d->heartbeatTimer, SIGNAL(timeout()), this, SLOT(_q_heartbeat())); -} - -Connection::~Connection() -{ -} - -void Connection::start(const Frame::Method &frame) -{ - qDebug(">> Start"); - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - quint8 version_major = 0; - quint8 version_minor = 0; - - stream >> version_major >> version_minor; - - Frame::TableField table; - Frame::deserialize(stream, table); - - QString mechanisms = Frame::readField('S', stream).toString(); - QString locales = Frame::readField('S', stream).toString(); - - qDebug(">> version_major: %d", version_major); - qDebug(">> version_minor: %d", version_minor); - - Frame::print(table); - - qDebug(">> mechanisms: %s", qPrintable(mechanisms)); - qDebug(">> locales: %s", qPrintable(locales)); - - startOk(); -} - -void Connection::secure(const Frame::Method &frame) -{ - Q_UNUSED(frame) - qDebug() << Q_FUNC_INFO << "called!"; -} - -void Connection::tune(const Frame::Method &frame) -{ - Q_D(Connection); - qDebug(">> Tune"); - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - - qint16 channel_max = 0, - heartbeat = 0; - qint32 frame_max = 0; - - stream >> channel_max; - stream >> frame_max; - stream >> heartbeat; - - qDebug(">> channel_max: %d", channel_max); - qDebug(">> frame_max: %d", frame_max); - qDebug(">> heartbeat: %d", heartbeat); - - if (d->heartbeatTimer) { - d->heartbeatTimer->setInterval(heartbeat * 1000); - if (d->heartbeatTimer->interval()) - d->heartbeatTimer->start(); - else - d->heartbeatTimer->stop(); - } - - tuneOk(); - open(); -} - -void Connection::openOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - Q_D(Connection); - qDebug(">> OpenOK"); - d->connected = true; - Q_EMIT connected(); -} - -void Connection::closeOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - Q_D(Connection); - - qDebug() << Q_FUNC_INFO << "received"; - - d->connected = false; - if (d->heartbeatTimer) - d->heartbeatTimer->stop(); - Q_EMIT disconnected(); -} - -void Connection::close(const Frame::Method &frame) -{ - Q_D(Connection); - qDebug(">> CLOSE"); - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - qint16 code_ = 0, classId, methodId; - stream >> code_; - QString text(Frame::readField('s', stream).toString()); - stream >> classId; - stream >> methodId; - - qDebug(">> code: %d", code_); - qDebug(">> text: %s", qPrintable(text)); - qDebug(">> class-id: %d", classId); - qDebug(">> method-id: %d", methodId); - d->connected = false; - d->network->error(QAbstractSocket::RemoteHostClosedError); - Q_EMIT disconnected(); -} - -void Connection::startOk() -{ - Q_D(Connection); - Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miStartOk); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - Frame::TableField clientProperties; - clientProperties["version"] = QString(QAMQP_VERSION); - clientProperties["platform"] = QString("Qt %1").arg(qVersion()); - clientProperties["product"] = QString("QAMQP"); - clientProperties.unite(d->customProperty); - Frame::serialize(stream, clientProperties); - - d->client->auth()->write(stream); - Frame::writeField('s', stream, "en_US"); - - frame.setArguments(arguments); - d->network->sendFrame(frame); -} - -void Connection::secureOk() -{ - qDebug() << Q_FUNC_INFO; -} - -void Connection::tuneOk() -{ - Q_D(Connection); - Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miTuneOk); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - stream << qint16(0); //channel_max - stream << qint32(FRAME_MAX); //frame_max - stream << qint16(d->heartbeatTimer->interval() / 1000); //heartbeat - - frame.setArguments(arguments); - d->network->sendFrame(frame); -} - -void Connection::open() -{ - Q_D(Connection); - Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miOpen); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - Frame::writeField('s',stream, d->client->virtualHost()); - - stream << qint8(0); - stream << qint8(0); - - frame.setArguments(arguments); - d->network->sendFrame(frame); -} - -void Connection::close(int code, const QString &text, int classId, int methodId) -{ - Q_D(Connection); - Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miClose); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - Frame::writeField('s',stream, d->client->virtualHost()); - - stream << qint16(code); - Frame::writeField('s', stream, text); - stream << qint16(classId); - stream << qint16(methodId); - - frame.setArguments(arguments); - d->network->sendFrame(frame); -} - -void Connection::closeOk() -{ - Q_D(Connection); - Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miCloseOk); - d->connected = false; - d->network->sendFrame(frame); -} - -void Connection::_q_method(const Frame::Method &frame) -{ - Q_D(Connection); - Q_ASSERT(frame.methodClass() == Frame::fcConnection); - if (frame.methodClass() != Frame::fcConnection) - return; - - qDebug() << "Connection:"; - - if (d->closed) { - if (frame.id() == ConnectionPrivate::miCloseOk) - closeOk(frame); - return; - } - - switch (ConnectionPrivate::MethodId(frame.id())) { - case ConnectionPrivate::miStart: - start(frame); - break; - case ConnectionPrivate::miSecure: - secure(frame); - break; - case ConnectionPrivate::miTune: - tune(frame); - break; - case ConnectionPrivate::miOpenOk: - openOk(frame); - break; - case ConnectionPrivate::miClose: - close(frame); - break; - case ConnectionPrivate::miCloseOk: - closeOk(frame); - break; - default: - qWarning("Unknown method-id %d", frame.id()); - } -} - -bool Connection::isConnected() const -{ - Q_D(const Connection); - return d->connected; -} - -void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount) -{ - Q_D(Connection); - - // NOTE: these were hardcoded values, could be bad - int channel = 0; - bool global = true; - - Frame::Method frame(Frame::fcBasic, 10); - frame.setChannel(channel); - QByteArray arguments; - QDataStream out(&arguments, QIODevice::WriteOnly); - - out << prefetchSize; - out << prefetchCount; - out << qint8(global ? 1 : 0); - - frame.setArguments(arguments); - d->network->sendFrame(frame); -} - -void Connection::addCustomProperty(const QString &name, const QString &value) -{ - Q_D(Connection); - d->customProperty[name] = value; -} - -QString Connection::customProperty(const QString &name) const -{ - Q_D(const Connection); - if (d->customProperty.contains(name)) - return d->customProperty.value(name).toString(); - return QString(); -} - -#include "moc_amqp_connection_p.cpp" diff --git a/src/amqp_connection_p.h b/src/amqp_connection_p.h deleted file mode 100644 index 145bbcf..0000000 --- a/src/amqp_connection_p.h +++ /dev/null @@ -1,67 +0,0 @@ -#ifndef amqp_connection_p_h__ -#define amqp_connection_p_h__ - -#include -#include "amqp_frame.h" - -namespace QAMQP -{ - -class Client; -class Network; -class ClientPrivate; -class ConnectionPrivate; -class QAMQP_EXPORT Connection : public QObject, public Frame::MethodHandler -{ - Q_OBJECT - Q_PROPERTY(bool connected READ isConnected CONSTANT) - -public: - virtual ~Connection(); - - void addCustomProperty(const QString &name, const QString &value); - QString customProperty(const QString &name) const; - - bool isConnected() const; - void setQOS(qint32 prefetchSize, quint16 prefetchCount); - - // method handlers, FROM server - void start(const Frame::Method &frame); - void secure(const Frame::Method &frame); - void tune(const Frame::Method &frame); - void openOk(const Frame::Method &frame); - void closeOk(const Frame::Method &frame); - - // method handlers, TO server - void startOk(); - void secureOk(); - void tuneOk(); - void open(); - - // method handlers, BOTH ways - void close(int code, const QString &text, int classId = 0, int methodId = 0); - void close(const Frame::Method &frame); - void closeOk(); - -Q_SIGNALS: - void disconnected(); - void connected(); - -private: - explicit Connection(Network *network, Client *parent); - - Q_DISABLE_COPY(Connection) - Q_DECLARE_PRIVATE(Connection) - QScopedPointer d_ptr; - - Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) - friend class ClientPrivate; - - // should be moved to private - void openOk(); - void _q_method(const Frame::Method &frame); -}; - -} // namespace QAMQP - -#endif // amqp_connection_h__ diff --git a/src/amqp_network.cpp b/src/amqp_network.cpp deleted file mode 100644 index 68f8b07..0000000 --- a/src/amqp_network.cpp +++ /dev/null @@ -1,301 +0,0 @@ -#include "amqp_network_p.h" - -#include -#include -#include - -namespace QAMQP { - -class NetworkPrivate { -public: - NetworkPrivate(Network *qq); - void initSocket(bool ssl = false); - - static int s_frameMethodMetaType; - - QPointer socket; - QByteArray buffer; - QString lastHost; - int lastPort; - bool autoReconnect; - int timeOut; - bool connect; - - Frame::MethodHandler *connectionMethodHandler; - QHash > methodHandlersByChannel; - QHash > contentHandlerByChannel; - QHash > bodyHandlersByChannel; - - Q_DECLARE_PUBLIC(Network) - Network * const q_ptr; -}; - -int NetworkPrivate::s_frameMethodMetaType = qRegisterMetaType("QAMQP::Frame::Method"); -NetworkPrivate::NetworkPrivate(Network *qq) - : lastPort(0), - autoReconnect(false), - timeOut(1000), - connect(false), - q_ptr(qq) -{ - buffer.reserve(Frame::HEADER_SIZE); -} - -void NetworkPrivate::initSocket(bool ssl) -{ - Q_Q(Network); - if (socket) { - socket->deleteLater(); - socket = 0; - } - - if (ssl) { -#ifndef QT_NO_SSL - socket = new QSslSocket(q); - QSslSocket *sslSocket = static_cast(socket.data()); - sslSocket->setProtocol(QSsl::AnyProtocol); - QObject::connect(socket, SIGNAL(sslErrors(const QList &)), q, SLOT(sslErrors())); - QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady())); -#else - qWarning("AMQP: You library has builded with QT_NO_SSL option."); -#endif - } else { - socket = new QTcpSocket(q); - QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady())); - } - - if (socket) { - QObject::connect(socket, SIGNAL(disconnected()), q, SIGNAL(disconnected())); - QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(readyRead())); - QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), - q, SLOT(error(QAbstractSocket::SocketError))); - } -} - -} // namespace QAMQP - -////////////////////////////////////////////////////////////////////////// - -using namespace QAMQP; -Network::Network(QObject *parent) - : QObject(parent), - d_ptr(new NetworkPrivate(this)) -{ - Q_D(Network); - d->initSocket(false); -} - -Network::~Network() -{ - disconnect(); -} - -void Network::connectTo(const QString &host, quint16 port) -{ - Q_D(Network); - if (!d->socket) { - qWarning("AMQP: Socket didn't create."); - return; - } - - QString h(host); - int p(port); - d->connect = true; - if (host.isEmpty()) - h = d->lastHost; - if (port == 0) - p = d->lastPort; - - if (isSsl()) { -#ifndef QT_NO_SSL - static_cast(d->socket.data())->connectToHostEncrypted(h, p); -#else - qWarning("AMQP: You library has builded with QT_NO_SSL option."); -#endif - } else { - d->socket->connectToHost(h, p); - } - - d->lastHost = h; - d->lastPort = p; -} - -void Network::disconnect() -{ - Q_D(Network); - d->connect = false; - if (d->socket) - d->socket->close(); -} - -void Network::error(QAbstractSocket::SocketError socketError) -{ - Q_D(Network); - if (d->timeOut == 0) { - d->timeOut = 1000; - } else { - if (d->timeOut < 120000) - d->timeOut *= 5; - } - - switch (socketError) { - case QAbstractSocket::ConnectionRefusedError: - case QAbstractSocket::RemoteHostClosedError: - case QAbstractSocket::SocketTimeoutError: - case QAbstractSocket::NetworkError: - case QAbstractSocket::ProxyConnectionClosedError: - case QAbstractSocket::ProxyConnectionRefusedError: - case QAbstractSocket::ProxyConnectionTimeoutError: - - default: - qWarning() << "AMQP: Socket Error: " << d->socket->errorString(); - break; - } - - if (d->autoReconnect && d->connect) - QTimer::singleShot(d->timeOut, this, SLOT(connectTo())); -} - -void Network::readyRead() -{ - Q_D(Network); - while (d->socket->bytesAvailable() >= Frame::HEADER_SIZE) { - char *headerData = d->buffer.data(); - d->socket->peek(headerData, Frame::HEADER_SIZE); - const quint32 payloadSize = qFromBigEndian(*(quint32*)&headerData[3]); - const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE; - - if (d->socket->bytesAvailable() >= readSize) { - d->buffer.resize(readSize); - d->socket->read(d->buffer.data(), readSize); - const char *bufferData = d->buffer.constData(); - const quint8 type = *(quint8*)&bufferData[0]; - const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; - if (magic != Frame::FRAME_END) - qWarning() << "Wrong end frame"; - - QDataStream streamB(&d->buffer, QIODevice::ReadOnly); - switch(Frame::Type(type)) { - case Frame::ftMethod: - { - Frame::Method frame(streamB); - if (frame.methodClass() == Frame::fcConnection) { - d->connectionMethodHandler->_q_method(frame); - } else { - foreach (Frame::MethodHandler *methodHandler, d->methodHandlersByChannel[frame.channel()]) - methodHandler->_q_method(frame); - } - } - break; - case Frame::ftHeader: - { - Frame::Content frame(streamB); - foreach (Frame::ContentHandler *methodHandler, d->contentHandlerByChannel[frame.channel()]) - methodHandler->_q_content(frame); - } - break; - case Frame::ftBody: - { - Frame::ContentBody frame(streamB); - foreach (Frame::ContentBodyHandler *methodHandler, d->bodyHandlersByChannel[frame.channel()]) - methodHandler->_q_body(frame); - } - break; - case Frame::ftHeartbeat: - qDebug("AMQP: Heartbeat"); - break; - default: - qWarning() << "AMQP: Unknown frame type: " << type; - } - } else { - break; - } - } -} - -void Network::sendFrame(const Frame::Base &frame) -{ - Q_D(Network); - if (d->socket->state() != QAbstractSocket::ConnectedState) { - qDebug() << Q_FUNC_INFO << "socket not connected: " << d->socket->state(); - return; - } - - QDataStream stream(d->socket); - frame.toStream(stream); -} - -bool Network::isSsl() const -{ - Q_D(const Network); - if (d->socket) - return QString(d->socket->metaObject()->className()).compare("QSslSocket", Qt::CaseInsensitive) == 0; - return false; -} - -void Network::setSsl(bool value) -{ - Q_D(Network); - d->initSocket(value); -} - -void Network::sslErrors() -{ -#ifndef QT_NO_SSL - Q_D(Network); - static_cast(d->socket.data())->ignoreSslErrors(); -#endif -} - -void Network::conectionReady() -{ - Q_D(Network); - d->timeOut = 0; - char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1}; - d->socket->write(header, 8); - Q_EMIT connected(); -} - -bool Network::autoReconnect() const -{ - Q_D(const Network); - return d->autoReconnect; -} - -void Network::setAutoReconnect(bool value) -{ - Q_D(Network); - d->autoReconnect = value; -} - -QAbstractSocket::SocketState Network::state() const -{ - Q_D(const Network); - if (d->socket) - return d->socket->state(); - return QAbstractSocket::UnconnectedState; -} - -void Network::setMethodHandlerConnection(Frame::MethodHandler *connectionMethodHandler) -{ - Q_D(Network); - d->connectionMethodHandler = connectionMethodHandler; -} - -void Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *methodHandler) -{ - Q_D(Network); - d->methodHandlersByChannel[channel].append(methodHandler); -} - -void Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler *methodHandler) -{ - Q_D(Network); - d->contentHandlerByChannel[channel].append(methodHandler); -} - -void Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *methodHandler) -{ - Q_D(Network); - d->bodyHandlersByChannel[channel].append(methodHandler); -} diff --git a/src/amqp_network_p.h b/src/amqp_network_p.h deleted file mode 100644 index f9fd0cd..0000000 --- a/src/amqp_network_p.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef amqp_network_h__ -#define amqp_network_h__ - -#include -#include -#ifndef QT_NO_SSL -# include -#endif -#include -#include - -#include "amqp_frame.h" -#include "amqp_global.h" - -namespace QAMQP -{ - -class NetworkPrivate; -class QAMQP_EXPORT Network : public QObject -{ - Q_OBJECT -public: - Network(QObject *parent = 0); - ~Network(); - - void disconnect(); - void sendFrame(const Frame::Base &frame); - - bool isSsl() const; - void setSsl(bool value); - - bool autoReconnect() const; - void setAutoReconnect(bool value); - - QAbstractSocket::SocketState state() const; - - typedef qint16 Channel; - void setMethodHandlerConnection(Frame::MethodHandler *pMethodHandlerConnection); - void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *pHandler); - void addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler); - void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler); - -Q_SIGNALS: - void connected(); - void disconnected(); - -public Q_SLOTS: - void connectTo(const QString &host = QString(), quint16 port = 0); - void error(QAbstractSocket::SocketError socketError); - -private Q_SLOTS: - void readyRead(); - void sslErrors(); - void conectionReady(); - -private: - Q_DISABLE_COPY(Network) - Q_DECLARE_PRIVATE(Network) - QScopedPointer d_ptr; - -}; - -} // namespace QAMQP - -#endif // amqp_network_h__ diff --git a/src/src.pro b/src/src.pro index 8fc8c1c..29879f5 100644 --- a/src/src.pro +++ b/src/src.pro @@ -13,9 +13,7 @@ win32:DESTDIR = $$OUT_PWD PRIVATE_HEADERS += \ amqp_channel_p.h \ amqp_client_p.h \ - amqp_connection_p.h \ amqp_exchange_p.h \ - amqp_network_p.h \ amqp_queue_p.h INSTALL_HEADERS += \ @@ -36,11 +34,9 @@ SOURCES += \ amqp_authenticator.cpp \ amqp_channel.cpp \ amqp_client.cpp \ - amqp_connection.cpp \ amqp_exchange.cpp \ amqp_frame.cpp \ amqp_message.cpp \ - amqp_network.cpp \ amqp_queue.cpp # install