refactored QAMQP::Network to include a dptr, made whole class private as it is

not meant to be used externally (only used by Client internally)
This commit is contained in:
Matt Broadstone 2014-06-03 12:28:12 -04:00
parent 60bcfabe2b
commit 128f350cf8
8 changed files with 163 additions and 117 deletions

View File

@ -3,6 +3,7 @@
#include "amqp_client.h" #include "amqp_client.h"
#include "amqp_client_p.h" #include "amqp_client_p.h"
#include "amqp_connection_p.h" #include "amqp_connection_p.h"
#include "amqp_network_p.h"
#include <QDebug> #include <QDebug>
#include <QDataStream> #include <QDataStream>

View File

@ -3,6 +3,7 @@
#include "amqp_global.h" #include "amqp_global.h"
#include "amqp_exchange.h" #include "amqp_exchange.h"
#include "amqp_exchange_p.h" #include "amqp_exchange_p.h"
#include "amqp_network_p.h"
#include "amqp_queue.h" #include "amqp_queue.h"
#include "amqp_queue_p.h" #include "amqp_queue_p.h"
#include "amqp_connection_p.h" #include "amqp_connection_p.h"

View File

@ -2,16 +2,16 @@
#define amqp_client_p_h__ #define amqp_client_p_h__
#include <QSharedPointer> #include <QSharedPointer>
#include <QPointer>
#include "amqp_network.h"
#include "amqp_authenticator.h"
namespace QAMQP namespace QAMQP
{ {
class Queue; class Queue;
class Exchange; class Exchange;
class Network;
class Connection; class Connection;
class Authenticator;
class ClientPrivate class ClientPrivate
{ {
public: public:

View File

@ -3,6 +3,8 @@
#include "amqp_client_p.h" #include "amqp_client_p.h"
#include "amqp_frame.h" #include "amqp_frame.h"
#include "amqp_global.h" #include "amqp_global.h"
#include "amqp_network_p.h"
#include "amqp_authenticator.h"
#include <QDebug> #include <QDebug>
#include <QDataStream> #include <QDataStream>

View File

@ -1,7 +1,6 @@
#ifndef qamqp_global_h__ #ifndef qamqp_global_h__
#define qamqp_global_h__ #define qamqp_global_h__
#define QAMQP_P_INCLUDE
#define AMQPSCHEME "amqp" #define AMQPSCHEME "amqp"
#define AMQPSSCHEME "amqps" #define AMQPSSCHEME "amqps"
#define AMQPPORT 5672 #define AMQPPORT 5672

View File

@ -1,20 +1,88 @@
#include "amqp_network.h" #include "amqp_network_p.h"
#include <QDebug> #include <QDebug>
#include <QTimer> #include <QTimer>
#include <QtEndian> #include <QtEndian>
using namespace QAMQP; namespace QAMQP {
Network::Network(QObject *parent) class NetworkPrivate {
: QObject(parent) public:
NetworkPrivate(Network *qq);
void initSocket(bool ssl = false);
static int s_frameMethodMetaType;
QPointer<QTcpSocket> socket;
QByteArray buffer;
QString lastHost;
int lastPort;
bool autoReconnect;
int timeOut;
bool connect;
Frame::MethodHandler *connectionMethodHandler;
QHash<Network::Channel, QList<Frame::MethodHandler*> > methodHandlersByChannel;
QHash<Network::Channel, QList<Frame::ContentHandler*> > contentHandlerByChannel;
QHash<Network::Channel, QList<Frame::ContentBodyHandler*> > bodyHandlersByChannel;
Q_DECLARE_PUBLIC(Network)
Network * const q_ptr;
};
int NetworkPrivate::s_frameMethodMetaType = qRegisterMetaType<Frame::Method>("QAMQP::Frame::Method");
NetworkPrivate::NetworkPrivate(Network *qq)
: lastPort(0),
autoReconnect(false),
timeOut(1000),
connect(false),
q_ptr(qq)
{ {
qRegisterMetaType<Frame::Method>("QAMQP::Frame::Method"); buffer.reserve(Frame::HEADER_SIZE);
}
buffer_.reserve(Frame::HEADER_SIZE); void NetworkPrivate::initSocket(bool ssl)
timeOut_ = 1000; {
connect_ = false; Q_Q(Network);
if (socket) {
socket->deleteLater();
socket = 0;
}
initSocket(false); if (ssl) {
#ifndef QT_NO_SSL
socket = new QSslSocket(q);
QSslSocket *sslSocket = static_cast<QSslSocket*>(socket.data());
sslSocket->setProtocol(QSsl::AnyProtocol);
QObject::connect(socket, SIGNAL(sslErrors(const QList<QSslError> &)), q, SLOT(sslErrors()));
QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady()));
#else
qWarning("AMQP: You library has builded with QT_NO_SSL option.");
#endif
} else {
socket = new QTcpSocket(q);
QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady()));
}
if (socket) {
QObject::connect(socket, SIGNAL(disconnected()), q, SIGNAL(disconnected()));
QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(readyRead()));
QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
q, SLOT(error(QAbstractSocket::SocketError)));
}
}
} // namespace QAMQP
//////////////////////////////////////////////////////////////////////////
using namespace QAMQP;
Network::Network(QObject *parent)
: QObject(parent),
d_ptr(new NetworkPrivate(this))
{
Q_D(Network);
d->initSocket(false);
} }
Network::~Network() Network::~Network()
@ -24,47 +92,50 @@ Network::~Network()
void Network::connectTo(const QString &host, quint16 port) void Network::connectTo(const QString &host, quint16 port)
{ {
if (!socket_) { Q_D(Network);
if (!d->socket) {
qWarning("AMQP: Socket didn't create."); qWarning("AMQP: Socket didn't create.");
return; return;
} }
QString h(host); QString h(host);
int p(port); int p(port);
connect_ = true; d->connect = true;
if (host.isEmpty()) if (host.isEmpty())
h = lastHost_ ; h = d->lastHost;
if (port == 0) if (port == 0)
p = lastPort_; p = d->lastPort;
if (isSsl()) { if (isSsl()) {
#ifndef QT_NO_SSL #ifndef QT_NO_SSL
static_cast<QSslSocket*>(socket_.data())->connectToHostEncrypted(h, p); static_cast<QSslSocket*>(d->socket.data())->connectToHostEncrypted(h, p);
#else #else
qWarning("AMQP: You library has builded with QT_NO_SSL option."); qWarning("AMQP: You library has builded with QT_NO_SSL option.");
#endif #endif
} else { } else {
socket_->connectToHost(h, p); d->socket->connectToHost(h, p);
} }
lastHost_ = h; d->lastHost = h;
lastPort_ = p; d->lastPort = p;
} }
void Network::disconnect() void Network::disconnect()
{ {
connect_ = false; Q_D(Network);
if (socket_) d->connect = false;
socket_->close(); if (d->socket)
d->socket->close();
} }
void Network::error(QAbstractSocket::SocketError socketError) void Network::error(QAbstractSocket::SocketError socketError)
{ {
if (timeOut_ == 0) { Q_D(Network);
timeOut_ = 1000; if (d->timeOut == 0) {
d->timeOut = 1000;
} else { } else {
if (timeOut_ < 120000) if (d->timeOut < 120000)
timeOut_ *= 5; d->timeOut *= 5;
} }
switch (socketError) { switch (socketError) {
@ -77,56 +148,57 @@ void Network::error(QAbstractSocket::SocketError socketError)
case QAbstractSocket::ProxyConnectionTimeoutError: case QAbstractSocket::ProxyConnectionTimeoutError:
default: default:
qWarning() << "AMQP: Socket Error: " << socket_->errorString(); qWarning() << "AMQP: Socket Error: " << d->socket->errorString();
break; break;
} }
if (autoReconnect_ && connect_) if (d->autoReconnect && d->connect)
QTimer::singleShot(timeOut_, this, SLOT(connectTo())); QTimer::singleShot(d->timeOut, this, SLOT(connectTo()));
} }
void Network::readyRead() void Network::readyRead()
{ {
while (socket_->bytesAvailable() >= Frame::HEADER_SIZE) { Q_D(Network);
char *headerData = buffer_.data(); while (d->socket->bytesAvailable() >= Frame::HEADER_SIZE) {
socket_->peek(headerData, Frame::HEADER_SIZE); char *headerData = d->buffer.data();
d->socket->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]); const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE+payloadSize + Frame::FRAME_END_SIZE; const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE;
if (socket_->bytesAvailable() >= readSize) { if (d->socket->bytesAvailable() >= readSize) {
buffer_.resize(readSize); d->buffer.resize(readSize);
socket_->read(buffer_.data(), readSize); d->socket->read(d->buffer.data(), readSize);
const char *bufferData = buffer_.constData(); const char *bufferData = d->buffer.constData();
const quint8 type = *(quint8*)&bufferData[0]; const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize];
if (magic != Frame::FRAME_END) if (magic != Frame::FRAME_END)
qWarning() << "Wrong end frame"; qWarning() << "Wrong end frame";
QDataStream streamB(&buffer_, QIODevice::ReadOnly); QDataStream streamB(&d->buffer, QIODevice::ReadOnly);
switch(Frame::Type(type)) { switch(Frame::Type(type)) {
case Frame::ftMethod: case Frame::ftMethod:
{ {
Frame::Method frame(streamB); Frame::Method frame(streamB);
if (frame.methodClass() == Frame::fcConnection) { if (frame.methodClass() == Frame::fcConnection) {
m_pMethodHandlerConnection->_q_method(frame); d->connectionMethodHandler->_q_method(frame);
} else { } else {
foreach (Frame::MethodHandler *pMethodHandler, m_methodHandlersByChannel[frame.channel()]) foreach (Frame::MethodHandler *methodHandler, d->methodHandlersByChannel[frame.channel()])
pMethodHandler->_q_method(frame); methodHandler->_q_method(frame);
} }
} }
break; break;
case Frame::ftHeader: case Frame::ftHeader:
{ {
Frame::Content frame(streamB); Frame::Content frame(streamB);
foreach (Frame::ContentHandler *pMethodHandler, m_contentHandlerByChannel[frame.channel()]) foreach (Frame::ContentHandler *methodHandler, d->contentHandlerByChannel[frame.channel()])
pMethodHandler->_q_content(frame); methodHandler->_q_content(frame);
} }
break; break;
case Frame::ftBody: case Frame::ftBody:
{ {
Frame::ContentBody frame(streamB); Frame::ContentBody frame(streamB);
foreach (Frame::ContentBodyHandler *pMethodHandler, m_bodyHandlersByChannel[frame.channel()]) foreach (Frame::ContentBodyHandler *methodHandler, d->bodyHandlersByChannel[frame.channel()])
pMethodHandler->_q_body(frame); methodHandler->_q_body(frame);
} }
break; break;
case Frame::ftHeartbeat: case Frame::ftHeartbeat:
@ -143,105 +215,87 @@ void Network::readyRead()
void Network::sendFrame(const Frame::Base &frame) void Network::sendFrame(const Frame::Base &frame)
{ {
if (socket_->state() != QAbstractSocket::ConnectedState) { Q_D(Network);
qDebug() << Q_FUNC_INFO << "socket not connected: " << socket_->state(); if (d->socket->state() != QAbstractSocket::ConnectedState) {
qDebug() << Q_FUNC_INFO << "socket not connected: " << d->socket->state();
return; return;
} }
QDataStream stream(socket_); QDataStream stream(d->socket);
frame.toStream(stream); frame.toStream(stream);
} }
bool Network::isSsl() const bool Network::isSsl() const
{ {
if (socket_) Q_D(const Network);
return QString(socket_->metaObject()->className()).compare("QSslSocket", Qt::CaseInsensitive) == 0; if (d->socket)
return QString(d->socket->metaObject()->className()).compare("QSslSocket", Qt::CaseInsensitive) == 0;
return false; return false;
} }
void Network::setSsl(bool value) void Network::setSsl(bool value)
{ {
initSocket(value); Q_D(Network);
} d->initSocket(value);
void Network::initSocket(bool ssl)
{
if (socket_) {
socket_->deleteLater();
socket_ = 0;
}
if (ssl) {
#ifndef QT_NO_SSL
socket_ = new QSslSocket(this);
QSslSocket *ssl_= static_cast<QSslSocket*> (socket_.data());
ssl_->setProtocol(QSsl::AnyProtocol);
connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors()));
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
#else
qWarning("AMQP: You library has builded with QT_NO_SSL option.");
#endif
} else {
socket_ = new QTcpSocket(this);
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
}
if (socket_) {
connect(socket_, SIGNAL(disconnected()), this, SIGNAL(disconnected()));
connect(socket_, SIGNAL(readyRead()), this, SLOT(readyRead()));
connect(socket_, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(error(QAbstractSocket::SocketError)));
}
} }
void Network::sslErrors() void Network::sslErrors()
{ {
#ifndef QT_NO_SSL #ifndef QT_NO_SSL
static_cast<QSslSocket*>(socket_.data())->ignoreSslErrors(); Q_D(Network);
static_cast<QSslSocket*>(d->socket.data())->ignoreSslErrors();
#endif #endif
} }
void Network::conectionReady() void Network::conectionReady()
{ {
timeOut_ = 0; Q_D(Network);
d->timeOut = 0;
char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1}; char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
socket_->write(header, 8); d->socket->write(header, 8);
Q_EMIT connected(); Q_EMIT connected();
} }
bool Network::autoReconnect() const bool Network::autoReconnect() const
{ {
return autoReconnect_; Q_D(const Network);
return d->autoReconnect;
} }
void Network::setAutoReconnect(bool value) void Network::setAutoReconnect(bool value)
{ {
autoReconnect_ = value; Q_D(Network);
d->autoReconnect = value;
} }
QAbstractSocket::SocketState Network::state() const QAbstractSocket::SocketState Network::state() const
{ {
if (socket_) Q_D(const Network);
return socket_->state(); if (d->socket)
return d->socket->state();
return QAbstractSocket::UnconnectedState; return QAbstractSocket::UnconnectedState;
} }
void Network::setMethodHandlerConnection(Frame::MethodHandler *pMethodHandlerConnection) void Network::setMethodHandlerConnection(Frame::MethodHandler *connectionMethodHandler)
{ {
m_pMethodHandlerConnection = pMethodHandlerConnection; Q_D(Network);
d->connectionMethodHandler = connectionMethodHandler;
} }
void Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *pHandler) void Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *methodHandler)
{ {
m_methodHandlersByChannel[channel].append(pHandler); Q_D(Network);
d->methodHandlersByChannel[channel].append(methodHandler);
} }
void Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler) void Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler *methodHandler)
{ {
m_contentHandlerByChannel[channel].append(pHandler); Q_D(Network);
d->contentHandlerByChannel[channel].append(methodHandler);
} }
void Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler) void Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *methodHandler)
{ {
m_bodyHandlersByChannel[channel].append(pHandler); Q_D(Network);
d->bodyHandlersByChannel[channel].append(methodHandler);
} }

View File

@ -15,6 +15,7 @@
namespace QAMQP namespace QAMQP
{ {
class NetworkPrivate;
class QAMQP_EXPORT Network : public QObject class QAMQP_EXPORT Network : public QObject
{ {
Q_OBJECT Q_OBJECT
@ -39,14 +40,14 @@ public:
void addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler); void addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler);
void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler); void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler);
public Q_SLOTS:
void connectTo(const QString &host = QString(), quint16 port = 0);
void error(QAbstractSocket::SocketError socketError);
Q_SIGNALS: Q_SIGNALS:
void connected(); void connected();
void disconnected(); void disconnected();
public Q_SLOTS:
void connectTo(const QString &host = QString(), quint16 port = 0);
void error(QAbstractSocket::SocketError socketError);
private Q_SLOTS: private Q_SLOTS:
void readyRead(); void readyRead();
void sslErrors(); void sslErrors();
@ -54,21 +55,9 @@ private Q_SLOTS:
private: private:
Q_DISABLE_COPY(Network) Q_DISABLE_COPY(Network)
Q_DECLARE_PRIVATE(Network)
QScopedPointer<NetworkPrivate> d_ptr;
void initSocket(bool ssl = false);
QPointer<QTcpSocket> socket_;
QByteArray buffer_;
QString lastHost_;
int lastPort_;
bool autoReconnect_;
int timeOut_;
bool connect_;
Frame::MethodHandler *m_pMethodHandlerConnection;
QHash<Channel, QList<Frame::MethodHandler*> > m_methodHandlersByChannel;
QHash<Channel, QList<Frame::ContentHandler*> > m_contentHandlerByChannel;
QHash<Channel, QList<Frame::ContentBodyHandler*> > m_bodyHandlersByChannel;
}; };
} // namespace QAMQP } // namespace QAMQP

View File

@ -15,6 +15,7 @@ PRIVATE_HEADERS += \
amqp_client_p.h \ amqp_client_p.h \
amqp_connection_p.h \ amqp_connection_p.h \
amqp_exchange_p.h \ amqp_exchange_p.h \
amqp_network_p.h \
amqp_queue_p.h amqp_queue_p.h
INSTALL_HEADERS += \ INSTALL_HEADERS += \
@ -25,7 +26,6 @@ INSTALL_HEADERS += \
amqp_frame.h \ amqp_frame.h \
amqp_global.h \ amqp_global.h \
amqp_message.h \ amqp_message.h \
amqp_network.h \
amqp_queue.h amqp_queue.h
HEADERS += \ HEADERS += \