merged Client Connection and Network classes, fixed a bug allowing for proper

connecting and disconnecting from a broker
This commit is contained in:
Matt Broadstone 2014-06-03 16:11:30 -04:00
parent fde4bcf39a
commit d4b1824b46
9 changed files with 450 additions and 862 deletions

View File

@ -2,8 +2,6 @@
#include "amqp_channel_p.h" #include "amqp_channel_p.h"
#include "amqp_client.h" #include "amqp_client.h"
#include "amqp_client_p.h" #include "amqp_client_p.h"
#include "amqp_connection_p.h"
#include "amqp_network_p.h"
#include <QDebug> #include <QDebug>
#include <QDataStream> #include <QDataStream>
@ -89,7 +87,7 @@ void ChannelPrivate::_q_open()
void ChannelPrivate::sendFrame(const Frame::Base &frame) void ChannelPrivate::sendFrame(const Frame::Base &frame)
{ {
if (client) if (client)
client->d_func()->network_->sendFrame(frame); client->d_func()->sendFrame(frame);
} }
void ChannelPrivate::open() void ChannelPrivate::open()
@ -97,7 +95,7 @@ void ChannelPrivate::open()
if (!needOpen || opened) if (!needOpen || opened)
return; return;
if (!client->d_func()->connection_->isConnected()) if (!client->isConnected())
return; return;
qDebug("Open channel #%d", number); qDebug("Open channel #%d", number);

View File

@ -3,13 +3,14 @@
#include "amqp_global.h" #include "amqp_global.h"
#include "amqp_exchange.h" #include "amqp_exchange.h"
#include "amqp_exchange_p.h" #include "amqp_exchange_p.h"
#include "amqp_network_p.h"
#include "amqp_queue.h" #include "amqp_queue.h"
#include "amqp_queue_p.h" #include "amqp_queue_p.h"
#include "amqp_connection_p.h"
#include "amqp_authenticator.h" #include "amqp_authenticator.h"
#include <QTimer>
#include <QTcpSocket>
#include <QTextStream> #include <QTextStream>
#include <QtEndian>
using namespace QAMQP; using namespace QAMQP;
@ -17,6 +18,9 @@ ClientPrivate::ClientPrivate(Client *q)
: port(AMQPPORT), : port(AMQPPORT),
host(QString::fromLatin1(AMQPHOST)), host(QString::fromLatin1(AMQPHOST)),
virtualHost(QString::fromLatin1(AMQPVHOST)), virtualHost(QString::fromLatin1(AMQPVHOST)),
socket(0),
closed(false),
connected(false),
q_ptr(q) q_ptr(q)
{ {
} }
@ -28,21 +32,17 @@ ClientPrivate::~ClientPrivate()
void ClientPrivate::init(const QUrl &connectionString) void ClientPrivate::init(const QUrl &connectionString)
{ {
Q_Q(Client); Q_Q(Client);
if (!network_) { socket = new QTcpSocket(q);
network_ = new Network(q); QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected()));
QObject::connect(network_.data(), SIGNAL(connected()), q, SIGNAL(connected())); QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead()));
QObject::connect(network_.data(), SIGNAL(disconnected()), q, SIGNAL(disconnected())); QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
} q, SLOT(_q_socketError(QAbstractSocket::SocketError)));
if (!connection_) heartbeatTimer = new QTimer(q);
connection_ = new Connection(network_, q); QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat()));
network_->setMethodHandlerConnection(connection_);
auth_ = QSharedPointer<Authenticator>( authenticator = QSharedPointer<Authenticator>(
new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
QObject::connect(connection_, SIGNAL(connected()), q, SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), q, SIGNAL(disconnected()));
if (connectionString.isValid()) { if (connectionString.isValid()) {
parseConnectionString(connectionString); parseConnectionString(connectionString);
@ -50,13 +50,6 @@ void ClientPrivate::init(const QUrl &connectionString)
} }
} }
void ClientPrivate::connect()
{
if (network_->state() != QAbstractSocket::UnconnectedState)
disconnect();
network_->connectTo(host, port);
}
void ClientPrivate::parseConnectionString(const QUrl &connectionString) void ClientPrivate::parseConnectionString(const QUrl &connectionString)
{ {
Q_Q(Client); Q_Q(Client);
@ -66,7 +59,6 @@ void ClientPrivate::parseConnectionString(const QUrl &connectionString)
return; return;
} }
q->setSsl(connectionString.scheme() == AMQPSSCHEME);
q->setPassword(connectionString.password()); q->setPassword(connectionString.password());
q->setUser(connectionString.userName()); q->setUser(connectionString.userName());
q->setPort(connectionString.port(AMQPPORT)); q->setPort(connectionString.port(AMQPPORT));
@ -74,21 +66,354 @@ void ClientPrivate::parseConnectionString(const QUrl &connectionString)
q->setVirtualHost(connectionString.path()); q->setVirtualHost(connectionString.path());
} }
void ClientPrivate::connect()
{
if (socket->state() != QAbstractSocket::UnconnectedState) {
qDebug() << Q_FUNC_INFO << "socket already connected, disconnecting..";
disconnect();
}
socket->connectToHost(host, port);
}
void ClientPrivate::disconnect() void ClientPrivate::disconnect()
{ {
if (network_->state() == QAbstractSocket::UnconnectedState) { if (socket->state() == QAbstractSocket::UnconnectedState) {
qDebug() << Q_FUNC_INFO << "already disconnected"; qDebug() << Q_FUNC_INFO << "already disconnected";
return; return;
} }
connection_->close(); close(200, "client.disconnect");
network_->disconnect();
// NOTE: this should be handled by signals, no need for dptr // NOTE: this should be handled by signals, no need for dptr
// access here. // access here.
// connection_->d_func()->connected = false; // connection_->d_func()->connected = false;
} }
// private slots
void ClientPrivate::_q_socketConnected()
{
timeout = 0;
char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
socket->write(header, 8);
}
void ClientPrivate::_q_heartbeat()
{
Frame::Heartbeat frame;
sendFrame(frame);
}
void ClientPrivate::_q_socketError(QAbstractSocket::SocketError error)
{
if (timeout == 0) {
timeout = 1000;
} else {
if (timeout < 120000)
timeout *= 5;
}
switch (error) {
case QAbstractSocket::ConnectionRefusedError:
case QAbstractSocket::RemoteHostClosedError:
case QAbstractSocket::SocketTimeoutError:
case QAbstractSocket::NetworkError:
case QAbstractSocket::ProxyConnectionClosedError:
case QAbstractSocket::ProxyConnectionRefusedError:
case QAbstractSocket::ProxyConnectionTimeoutError:
default:
qWarning() << "AMQP: Socket Error: " << socket->errorString();
break;
}
// if (autoReconnect && connect)
// QTimer::singleShot(timeout, this, SLOT(connectTo()));
}
void ClientPrivate::_q_readyRead()
{
while (socket->bytesAvailable() >= Frame::HEADER_SIZE) {
char *headerData = buffer.data();
socket->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE;
if (socket->bytesAvailable() >= readSize) {
buffer.resize(readSize);
socket->read(buffer.data(), readSize);
const char *bufferData = buffer.constData();
const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize];
if (magic != Frame::FRAME_END)
qWarning() << "Wrong end frame";
QDataStream streamB(&buffer, QIODevice::ReadOnly);
switch(Frame::Type(type)) {
case Frame::ftMethod:
{
Frame::Method frame(streamB);
if (frame.methodClass() == Frame::fcConnection) {
_q_method(frame);
} else {
foreach (Frame::MethodHandler *methodHandler, methodHandlersByChannel[frame.channel()])
methodHandler->_q_method(frame);
}
}
break;
case Frame::ftHeader:
{
Frame::Content frame(streamB);
foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()])
methodHandler->_q_content(frame);
}
break;
case Frame::ftBody:
{
Frame::ContentBody frame(streamB);
foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()])
methodHandler->_q_body(frame);
}
break;
case Frame::ftHeartbeat:
qDebug("AMQP: Heartbeat");
break;
default:
qWarning() << "AMQP: Unknown frame type: " << type;
}
} else {
break;
}
}
}
void ClientPrivate::sendFrame(const Frame::Base &frame)
{
if (socket->state() != QAbstractSocket::ConnectedState) {
qDebug() << Q_FUNC_INFO << "socket not connected: " << socket->state();
return;
}
QDataStream stream(socket);
frame.toStream(stream);
}
void ClientPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.methodClass() == Frame::fcConnection);
if (frame.methodClass() != Frame::fcConnection)
return;
qDebug() << "Connection:";
if (closed) {
if (frame.id() == ClientPrivate::miCloseOk)
closeOk(frame);
return;
}
switch (ClientPrivate::MethodId(frame.id())) {
case ClientPrivate::miStart:
start(frame);
break;
case ClientPrivate::miSecure:
secure(frame);
break;
case ClientPrivate::miTune:
tune(frame);
break;
case ClientPrivate::miOpenOk:
openOk(frame);
break;
case ClientPrivate::miClose:
close(frame);
break;
case ClientPrivate::miCloseOk:
closeOk(frame);
break;
default:
qWarning("Unknown method-id %d", frame.id());
}
}
void ClientPrivate::start(const Frame::Method &frame)
{
qDebug(">> Start");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
quint8 version_major = 0;
quint8 version_minor = 0;
stream >> version_major >> version_minor;
Frame::TableField table;
Frame::deserialize(stream, table);
QString mechanisms = Frame::readField('S', stream).toString();
QString locales = Frame::readField('S', stream).toString();
qDebug(">> version_major: %d", version_major);
qDebug(">> version_minor: %d", version_minor);
Frame::print(table);
qDebug(">> mechanisms: %s", qPrintable(mechanisms));
qDebug(">> locales: %s", qPrintable(locales));
startOk();
}
void ClientPrivate::secure(const Frame::Method &frame)
{
Q_UNUSED(frame)
qDebug() << Q_FUNC_INFO << "called!";
}
void ClientPrivate::tune(const Frame::Method &frame)
{
qDebug(">> Tune");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 channel_max = 0,
heartbeat = 0;
qint32 frame_max = 0;
stream >> channel_max;
stream >> frame_max;
stream >> heartbeat;
qDebug(">> channel_max: %d", channel_max);
qDebug(">> frame_max: %d", frame_max);
qDebug(">> heartbeat: %d", heartbeat);
if (heartbeatTimer) {
heartbeatTimer->setInterval(heartbeat * 1000);
if (heartbeatTimer->interval())
heartbeatTimer->start();
else
heartbeatTimer->stop();
}
tuneOk();
open();
}
void ClientPrivate::openOk(const Frame::Method &frame)
{
Q_Q(Client);
Q_UNUSED(frame)
qDebug(">> OpenOK");
connected = true;
Q_EMIT q->connected();
}
void ClientPrivate::closeOk(const Frame::Method &frame)
{
Q_Q(Client);
Q_UNUSED(frame)
qDebug() << Q_FUNC_INFO << "received";
connected = false;
if (heartbeatTimer)
heartbeatTimer->stop();
Q_EMIT q->disconnected();
}
void ClientPrivate::close(const Frame::Method &frame)
{
Q_Q(Client);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code = 0, classId, methodId;
stream >> code;
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
qDebug(">> code: %d", code);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
connected = false;
Q_EMIT q->disconnected();
}
void ClientPrivate::startOk()
{
Frame::Method frame(Frame::fcConnection, ClientPrivate::miStartOk);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::TableField clientProperties;
clientProperties["version"] = QString(QAMQP_VERSION);
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = QString("QAMQP");
clientProperties.unite(customProperty);
Frame::serialize(stream, clientProperties);
authenticator->write(stream);
Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments);
sendFrame(frame);
}
void ClientPrivate::secureOk()
{
qDebug() << Q_FUNC_INFO;
}
void ClientPrivate::tuneOk()
{
Frame::Method frame(Frame::fcConnection, ClientPrivate::miTuneOk);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(0); //channel_max
stream << qint32(FRAME_MAX); //frame_max
stream << qint16(heartbeatTimer->interval() / 1000); //heartbeat
frame.setArguments(arguments);
sendFrame(frame);
}
void ClientPrivate::open()
{
Frame::Method frame(Frame::fcConnection, ClientPrivate::miOpen);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, virtualHost);
stream << qint8(0);
stream << qint8(0);
frame.setArguments(arguments);
sendFrame(frame);
}
void ClientPrivate::close(int code, const QString &text, int classId, int methodId)
{
Frame::Method frame(Frame::fcConnection, ClientPrivate::miClose);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(code);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
frame.setArguments(arguments);
sendFrame(frame);
}
void ClientPrivate::closeOk()
{
Frame::Method frame(Frame::fcConnection, ClientPrivate::miCloseOk);
connected = false;
sendFrame(frame);
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
Client::Client(QObject *parent) Client::Client(QObject *parent)
@ -109,6 +434,15 @@ Client::Client(const QUrl &connectionString, QObject *parent)
Client::~Client() Client::~Client()
{ {
Q_D(Client);
if (d->connected)
d->disconnect();
}
bool Client::isConnected() const
{
Q_D(const Client);
return d->connected;
} }
quint16 Client::port() const quint16 Client::port() const
@ -150,7 +484,7 @@ void Client::setVirtualHost(const QString &virtualHost)
QString Client::user() const QString Client::user() const
{ {
Q_D(const Client); Q_D(const Client);
const Authenticator *auth = d->auth_.data(); const Authenticator *auth = d->authenticator.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) { if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
const AMQPlainAuthenticator *a = static_cast<const AMQPlainAuthenticator*>(auth); const AMQPlainAuthenticator *a = static_cast<const AMQPlainAuthenticator*>(auth);
return a->login(); return a->login();
@ -162,7 +496,7 @@ QString Client::user() const
void Client::setUser(const QString &user) void Client::setUser(const QString &user)
{ {
Q_D(const Client); Q_D(const Client);
Authenticator *auth = d->auth_.data(); Authenticator *auth = d->authenticator.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) { if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth); AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth);
a->setLogin(user); a->setLogin(user);
@ -172,7 +506,7 @@ void Client::setUser(const QString &user)
QString Client::password() const QString Client::password() const
{ {
Q_D(const Client); Q_D(const Client);
const Authenticator *auth = d->auth_.data(); const Authenticator *auth = d->authenticator.data();
if (auth && auth->type() == "AMQPLAIN") { if (auth && auth->type() == "AMQPLAIN") {
const AMQPlainAuthenticator *a = static_cast<const AMQPlainAuthenticator*>(auth); const AMQPlainAuthenticator *a = static_cast<const AMQPlainAuthenticator*>(auth);
return a->password(); return a->password();
@ -184,7 +518,7 @@ QString Client::password() const
void Client::setPassword(const QString &password) void Client::setPassword(const QString &password)
{ {
Q_D(Client); Q_D(Client);
Authenticator *auth = d->auth_.data(); Authenticator *auth = d->authenticator.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) { if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth); AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth);
a->setPassword(password); a->setPassword(password);
@ -200,8 +534,8 @@ Exchange *Client::createExchange(const QString &name, int channelNumber)
{ {
Q_D(Client); Q_D(Client);
Exchange *exchange = new Exchange(channelNumber, this); Exchange *exchange = new Exchange(channelNumber, this);
d->network_->addMethodHandlerForChannel(exchange->channelNumber(), exchange); d->methodHandlersByChannel[exchange->channelNumber()].append(exchange);
connect(d->connection_, SIGNAL(connected()), exchange, SLOT(_q_open())); connect(this, SIGNAL(connected()), exchange, SLOT(_q_open()));
exchange->d_func()->open(); exchange->d_func()->open();
connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected())); connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected()));
if (!name.isEmpty()) if (!name.isEmpty())
@ -218,11 +552,10 @@ Queue *Client::createQueue(const QString &name, int channelNumber)
{ {
Q_D(Client); Q_D(Client);
Queue *queue = new Queue(channelNumber, this); Queue *queue = new Queue(channelNumber, this);
d->network_->addMethodHandlerForChannel(queue->channelNumber(), queue); d->methodHandlersByChannel[queue->channelNumber()].append(queue);
d->network_->addContentHandlerForChannel(queue->channelNumber(), queue); d->contentHandlerByChannel[queue->channelNumber()].append(queue);
d->network_->addContentBodyHandlerForChannel(queue->channelNumber(), queue); d->bodyHandlersByChannel[queue->channelNumber()].append(queue);
connect(this, SIGNAL(connected()), queue, SLOT(_q_open()));
connect(d->connection_, SIGNAL(connected()), queue, SLOT(_q_open()));
queue->d_func()->open(); queue->d_func()->open();
connect(this, SIGNAL(disconnected()), queue, SLOT(_q_disconnected())); connect(this, SIGNAL(disconnected()), queue, SLOT(_q_disconnected()));
@ -231,58 +564,40 @@ Queue *Client::createQueue(const QString &name, int channelNumber)
return queue; return queue;
} }
void Client::setAuth(Authenticator *auth) void Client::setAuth(Authenticator *authenticator)
{ {
Q_D(Client); Q_D(Client);
d->auth_ = QSharedPointer<Authenticator>(auth); d->authenticator = QSharedPointer<Authenticator>(authenticator);
} }
Authenticator *Client::auth() const Authenticator *Client::auth() const
{ {
Q_D(const Client); Q_D(const Client);
return d->auth_.data(); return d->authenticator.data();
}
bool Client::isSsl() const
{
Q_D(const Client);
return d->network_->isSsl();
}
void Client::setSsl(bool value)
{
Q_D(Client);
d->network_->setSsl(value);
} }
bool Client::autoReconnect() const bool Client::autoReconnect() const
{ {
Q_D(const Client); Q_D(const Client);
return d->network_->autoReconnect(); return d->autoReconnect;
} }
void Client::setAutoReconnect(bool value) void Client::setAutoReconnect(bool value)
{ {
Q_D(Client); Q_D(Client);
d->network_->setAutoReconnect(value); d->autoReconnect = value;
}
bool Client::isConnected() const
{
Q_D(const Client);
return d->connection_->isConnected();
} }
void Client::addCustomProperty(const QString &name, const QString &value) void Client::addCustomProperty(const QString &name, const QString &value)
{ {
Q_D(Client); Q_D(Client);
return d->connection_->addCustomProperty(name, value); d->customProperty.insert(name, value);
} }
QString Client::customProperty(const QString &name) const QString Client::customProperty(const QString &name) const
{ {
Q_D(const Client); Q_D(const Client);
return d->connection_->customProperty(name); return d->customProperty.value(name).toString();
} }
void Client::connectToHost(const QString &connectionString) void Client::connectToHost(const QString &connectionString)
@ -310,3 +625,5 @@ void Client::disconnectFromHost()
Q_D(Client); Q_D(Client);
d->disconnect(); d->disconnect();
} }
#include "moc_amqp_client.cpp"

View File

@ -22,9 +22,7 @@ class QAMQP_EXPORT Client : public QObject
Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost) Q_PROPERTY(QString virtualHost READ virtualHost WRITE setVirtualHost)
Q_PROPERTY(QString user READ user WRITE setUser) Q_PROPERTY(QString user READ user WRITE setUser)
Q_PROPERTY(QString password READ password WRITE setPassword) Q_PROPERTY(QString password READ password WRITE setPassword)
Q_PROPERTY(bool ssl READ isSsl WRITE setSsl)
Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect) Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect)
Q_PROPERTY(bool connected READ isConnected)
public: public:
Client(QObject *parent = 0); Client(QObject *parent = 0);
@ -58,16 +56,13 @@ public:
void setAuth(Authenticator *auth); void setAuth(Authenticator *auth);
Authenticator *auth() const; Authenticator *auth() const;
bool isSsl() const;
void setSsl(bool value);
bool autoReconnect() const; bool autoReconnect() const;
void setAutoReconnect(bool value); void setAutoReconnect(bool value);
bool isConnected() const; bool isConnected() const;
void connectToHost(const QString &connectionString = QString()); void connectToHost(const QString &connectionString = QString());
void connectToHost(const QHostAddress &address, quint16 port); void connectToHost(const QHostAddress &address, quint16 port = AMQPPORT);
void disconnectFromHost(); void disconnectFromHost();
Q_SIGNALS: Q_SIGNALS:
@ -79,6 +74,11 @@ private:
Q_DECLARE_PRIVATE(Client) Q_DECLARE_PRIVATE(Client)
QScopedPointer<ClientPrivate> d_ptr; QScopedPointer<ClientPrivate> d_ptr;
Q_PRIVATE_SLOT(d_func(), void _q_socketConnected())
Q_PRIVATE_SLOT(d_func(), void _q_readyRead())
Q_PRIVATE_SLOT(d_func(), void _q_socketError(QAbstractSocket::SocketError error))
Q_PRIVATE_SLOT(d_func(), void _q_heartbeat())
friend class ChannelPrivate; friend class ChannelPrivate;
}; };

View File

@ -1,37 +1,94 @@
#ifndef amqp_client_p_h__ #ifndef amqp_client_p_h__
#define amqp_client_p_h__ #define amqp_client_p_h__
#include <QHash>
#include <QSharedPointer> #include <QSharedPointer>
#include <QPointer> #include <QPointer>
#include <QAbstractSocket>
#include "amqp_frame.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
class QTimer;
class QTcpSocket;
namespace QAMQP namespace QAMQP
{ {
class Client;
class Queue; class Queue;
class Exchange; class Exchange;
class Network; class Network;
class Connection; class Connection;
class Authenticator; class Authenticator;
class ClientPrivate class ClientPrivate : public Frame::MethodHandler
{ {
public: public:
enum MethodId {
METHOD_ID_ENUM(miStart, 10),
METHOD_ID_ENUM(miSecure, 20),
METHOD_ID_ENUM(miTune, 30),
METHOD_ID_ENUM(miOpen, 40),
METHOD_ID_ENUM(miClose, 50)
};
ClientPrivate(Client *q); ClientPrivate(Client *q);
~ClientPrivate(); virtual ~ClientPrivate();
void init(const QUrl &connectionString = QUrl()); void init(const QUrl &connectionString = QUrl());
void connect(); void connect();
void disconnect(); void disconnect();
void parseConnectionString(const QUrl &connectionString); void parseConnectionString(const QUrl &connectionString);
void sendFrame(const Frame::Base &frame);
// private slots
void _q_socketConnected();
void _q_readyRead();
void _q_socketError(QAbstractSocket::SocketError error);
void _q_heartbeat();
virtual void _q_method(const Frame::Method &frame);
// method handlers, FROM server
void start(const Frame::Method &frame);
void secure(const Frame::Method &frame);
void tune(const Frame::Method &frame);
void openOk(const Frame::Method &frame);
void closeOk(const Frame::Method &frame);
// method handlers, TO server
void startOk();
void secureOk();
void tuneOk();
void open();
// method handlers, BOTH ways
void close(int code, const QString &text, int classId = 0, int methodId = 0);
void close(const Frame::Method &frame);
void closeOk();
quint32 port; quint32 port;
QString host; QString host;
QString virtualHost; QString virtualHost;
QPointer<Network> network_; QSharedPointer<Authenticator> authenticator;
QPointer<Connection> connection_;
QSharedPointer<Authenticator> auth_;
bool isSSl() const; // Network
QByteArray buffer;
bool autoReconnect;
int timeout;
bool connecting;
QTcpSocket *socket;
QHash<quint16, QList<Frame::MethodHandler*> > methodHandlersByChannel;
QHash<quint16, QList<Frame::ContentHandler*> > contentHandlerByChannel;
QHash<quint16, QList<Frame::ContentBodyHandler*> > bodyHandlersByChannel;
// Connection
bool closed;
bool connected;
QPointer<QTimer> heartbeatTimer;
Frame::TableField customProperty;
Client * const q_ptr; Client * const q_ptr;
Q_DECLARE_PUBLIC(Client) Q_DECLARE_PUBLIC(Client)

View File

@ -1,347 +0,0 @@
#include "amqp_connection_p.h"
#include "amqp_client.h"
#include "amqp_client_p.h"
#include "amqp_frame.h"
#include "amqp_global.h"
#include "amqp_network_p.h"
#include "amqp_authenticator.h"
#include <QDebug>
#include <QDataStream>
#include <QTimer>
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP {
class ConnectionPrivate
{
public:
enum MethodId {
METHOD_ID_ENUM(miStart, 10),
METHOD_ID_ENUM(miSecure, 20),
METHOD_ID_ENUM(miTune, 30),
METHOD_ID_ENUM(miOpen, 40),
METHOD_ID_ENUM(miClose, 50)
};
ConnectionPrivate(Connection *q);
// private slots
void _q_heartbeat();
QPointer<Client> client;
QPointer<Network> network;
bool closed;
bool connected;
QPointer<QTimer> heartbeatTimer;
Frame::TableField customProperty;
Q_DECLARE_PUBLIC(Connection)
Connection * const q_ptr;
};
ConnectionPrivate::ConnectionPrivate(Connection *q)
: closed(false),
connected(false),
q_ptr(q)
{
}
void ConnectionPrivate::_q_heartbeat()
{
Frame::Heartbeat frame;
network->sendFrame(frame);
}
} // namespace QAMQP
//////////////////////////////////////////////////////////////////////////
using namespace QAMQP;
Connection::Connection(Network *network, Client *client)
: QObject(client),
d_ptr(new ConnectionPrivate(this))
{
Q_D(Connection);
d->client = client;
d->network = network;
d->heartbeatTimer = new QTimer(this);
connect(d->heartbeatTimer, SIGNAL(timeout()), this, SLOT(_q_heartbeat()));
}
Connection::~Connection()
{
}
void Connection::start(const Frame::Method &frame)
{
qDebug(">> Start");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
quint8 version_major = 0;
quint8 version_minor = 0;
stream >> version_major >> version_minor;
Frame::TableField table;
Frame::deserialize(stream, table);
QString mechanisms = Frame::readField('S', stream).toString();
QString locales = Frame::readField('S', stream).toString();
qDebug(">> version_major: %d", version_major);
qDebug(">> version_minor: %d", version_minor);
Frame::print(table);
qDebug(">> mechanisms: %s", qPrintable(mechanisms));
qDebug(">> locales: %s", qPrintable(locales));
startOk();
}
void Connection::secure(const Frame::Method &frame)
{
Q_UNUSED(frame)
qDebug() << Q_FUNC_INFO << "called!";
}
void Connection::tune(const Frame::Method &frame)
{
Q_D(Connection);
qDebug(">> Tune");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 channel_max = 0,
heartbeat = 0;
qint32 frame_max = 0;
stream >> channel_max;
stream >> frame_max;
stream >> heartbeat;
qDebug(">> channel_max: %d", channel_max);
qDebug(">> frame_max: %d", frame_max);
qDebug(">> heartbeat: %d", heartbeat);
if (d->heartbeatTimer) {
d->heartbeatTimer->setInterval(heartbeat * 1000);
if (d->heartbeatTimer->interval())
d->heartbeatTimer->start();
else
d->heartbeatTimer->stop();
}
tuneOk();
open();
}
void Connection::openOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_D(Connection);
qDebug(">> OpenOK");
d->connected = true;
Q_EMIT connected();
}
void Connection::closeOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_D(Connection);
qDebug() << Q_FUNC_INFO << "received";
d->connected = false;
if (d->heartbeatTimer)
d->heartbeatTimer->stop();
Q_EMIT disconnected();
}
void Connection::close(const Frame::Method &frame)
{
Q_D(Connection);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId;
stream >> code_;
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
qDebug(">> code: %d", code_);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
d->connected = false;
d->network->error(QAbstractSocket::RemoteHostClosedError);
Q_EMIT disconnected();
}
void Connection::startOk()
{
Q_D(Connection);
Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miStartOk);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::TableField clientProperties;
clientProperties["version"] = QString(QAMQP_VERSION);
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = QString("QAMQP");
clientProperties.unite(d->customProperty);
Frame::serialize(stream, clientProperties);
d->client->auth()->write(stream);
Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments);
d->network->sendFrame(frame);
}
void Connection::secureOk()
{
qDebug() << Q_FUNC_INFO;
}
void Connection::tuneOk()
{
Q_D(Connection);
Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miTuneOk);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(0); //channel_max
stream << qint32(FRAME_MAX); //frame_max
stream << qint16(d->heartbeatTimer->interval() / 1000); //heartbeat
frame.setArguments(arguments);
d->network->sendFrame(frame);
}
void Connection::open()
{
Q_D(Connection);
Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miOpen);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, d->client->virtualHost());
stream << qint8(0);
stream << qint8(0);
frame.setArguments(arguments);
d->network->sendFrame(frame);
}
void Connection::close(int code, const QString &text, int classId, int methodId)
{
Q_D(Connection);
Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miClose);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, d->client->virtualHost());
stream << qint16(code);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
frame.setArguments(arguments);
d->network->sendFrame(frame);
}
void Connection::closeOk()
{
Q_D(Connection);
Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miCloseOk);
d->connected = false;
d->network->sendFrame(frame);
}
void Connection::_q_method(const Frame::Method &frame)
{
Q_D(Connection);
Q_ASSERT(frame.methodClass() == Frame::fcConnection);
if (frame.methodClass() != Frame::fcConnection)
return;
qDebug() << "Connection:";
if (d->closed) {
if (frame.id() == ConnectionPrivate::miCloseOk)
closeOk(frame);
return;
}
switch (ConnectionPrivate::MethodId(frame.id())) {
case ConnectionPrivate::miStart:
start(frame);
break;
case ConnectionPrivate::miSecure:
secure(frame);
break;
case ConnectionPrivate::miTune:
tune(frame);
break;
case ConnectionPrivate::miOpenOk:
openOk(frame);
break;
case ConnectionPrivate::miClose:
close(frame);
break;
case ConnectionPrivate::miCloseOk:
closeOk(frame);
break;
default:
qWarning("Unknown method-id %d", frame.id());
}
}
bool Connection::isConnected() const
{
Q_D(const Connection);
return d->connected;
}
void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_D(Connection);
// NOTE: these were hardcoded values, could be bad
int channel = 0;
bool global = true;
Frame::Method frame(Frame::fcBasic, 10);
frame.setChannel(channel);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << prefetchSize;
out << prefetchCount;
out << qint8(global ? 1 : 0);
frame.setArguments(arguments);
d->network->sendFrame(frame);
}
void Connection::addCustomProperty(const QString &name, const QString &value)
{
Q_D(Connection);
d->customProperty[name] = value;
}
QString Connection::customProperty(const QString &name) const
{
Q_D(const Connection);
if (d->customProperty.contains(name))
return d->customProperty.value(name).toString();
return QString();
}
#include "moc_amqp_connection_p.cpp"

View File

@ -1,67 +0,0 @@
#ifndef amqp_connection_p_h__
#define amqp_connection_p_h__
#include <QObject>
#include "amqp_frame.h"
namespace QAMQP
{
class Client;
class Network;
class ClientPrivate;
class ConnectionPrivate;
class QAMQP_EXPORT Connection : public QObject, public Frame::MethodHandler
{
Q_OBJECT
Q_PROPERTY(bool connected READ isConnected CONSTANT)
public:
virtual ~Connection();
void addCustomProperty(const QString &name, const QString &value);
QString customProperty(const QString &name) const;
bool isConnected() const;
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
// method handlers, FROM server
void start(const Frame::Method &frame);
void secure(const Frame::Method &frame);
void tune(const Frame::Method &frame);
void openOk(const Frame::Method &frame);
void closeOk(const Frame::Method &frame);
// method handlers, TO server
void startOk();
void secureOk();
void tuneOk();
void open();
// method handlers, BOTH ways
void close(int code, const QString &text, int classId = 0, int methodId = 0);
void close(const Frame::Method &frame);
void closeOk();
Q_SIGNALS:
void disconnected();
void connected();
private:
explicit Connection(Network *network, Client *parent);
Q_DISABLE_COPY(Connection)
Q_DECLARE_PRIVATE(Connection)
QScopedPointer<ConnectionPrivate> d_ptr;
Q_PRIVATE_SLOT(d_func(), void _q_heartbeat())
friend class ClientPrivate;
// should be moved to private
void openOk();
void _q_method(const Frame::Method &frame);
};
} // namespace QAMQP
#endif // amqp_connection_h__

View File

@ -1,301 +0,0 @@
#include "amqp_network_p.h"
#include <QDebug>
#include <QTimer>
#include <QtEndian>
namespace QAMQP {
class NetworkPrivate {
public:
NetworkPrivate(Network *qq);
void initSocket(bool ssl = false);
static int s_frameMethodMetaType;
QPointer<QTcpSocket> socket;
QByteArray buffer;
QString lastHost;
int lastPort;
bool autoReconnect;
int timeOut;
bool connect;
Frame::MethodHandler *connectionMethodHandler;
QHash<Network::Channel, QList<Frame::MethodHandler*> > methodHandlersByChannel;
QHash<Network::Channel, QList<Frame::ContentHandler*> > contentHandlerByChannel;
QHash<Network::Channel, QList<Frame::ContentBodyHandler*> > bodyHandlersByChannel;
Q_DECLARE_PUBLIC(Network)
Network * const q_ptr;
};
int NetworkPrivate::s_frameMethodMetaType = qRegisterMetaType<Frame::Method>("QAMQP::Frame::Method");
NetworkPrivate::NetworkPrivate(Network *qq)
: lastPort(0),
autoReconnect(false),
timeOut(1000),
connect(false),
q_ptr(qq)
{
buffer.reserve(Frame::HEADER_SIZE);
}
void NetworkPrivate::initSocket(bool ssl)
{
Q_Q(Network);
if (socket) {
socket->deleteLater();
socket = 0;
}
if (ssl) {
#ifndef QT_NO_SSL
socket = new QSslSocket(q);
QSslSocket *sslSocket = static_cast<QSslSocket*>(socket.data());
sslSocket->setProtocol(QSsl::AnyProtocol);
QObject::connect(socket, SIGNAL(sslErrors(const QList<QSslError> &)), q, SLOT(sslErrors()));
QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady()));
#else
qWarning("AMQP: You library has builded with QT_NO_SSL option.");
#endif
} else {
socket = new QTcpSocket(q);
QObject::connect(socket, SIGNAL(connected()), q, SLOT(conectionReady()));
}
if (socket) {
QObject::connect(socket, SIGNAL(disconnected()), q, SIGNAL(disconnected()));
QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(readyRead()));
QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
q, SLOT(error(QAbstractSocket::SocketError)));
}
}
} // namespace QAMQP
//////////////////////////////////////////////////////////////////////////
using namespace QAMQP;
Network::Network(QObject *parent)
: QObject(parent),
d_ptr(new NetworkPrivate(this))
{
Q_D(Network);
d->initSocket(false);
}
Network::~Network()
{
disconnect();
}
void Network::connectTo(const QString &host, quint16 port)
{
Q_D(Network);
if (!d->socket) {
qWarning("AMQP: Socket didn't create.");
return;
}
QString h(host);
int p(port);
d->connect = true;
if (host.isEmpty())
h = d->lastHost;
if (port == 0)
p = d->lastPort;
if (isSsl()) {
#ifndef QT_NO_SSL
static_cast<QSslSocket*>(d->socket.data())->connectToHostEncrypted(h, p);
#else
qWarning("AMQP: You library has builded with QT_NO_SSL option.");
#endif
} else {
d->socket->connectToHost(h, p);
}
d->lastHost = h;
d->lastPort = p;
}
void Network::disconnect()
{
Q_D(Network);
d->connect = false;
if (d->socket)
d->socket->close();
}
void Network::error(QAbstractSocket::SocketError socketError)
{
Q_D(Network);
if (d->timeOut == 0) {
d->timeOut = 1000;
} else {
if (d->timeOut < 120000)
d->timeOut *= 5;
}
switch (socketError) {
case QAbstractSocket::ConnectionRefusedError:
case QAbstractSocket::RemoteHostClosedError:
case QAbstractSocket::SocketTimeoutError:
case QAbstractSocket::NetworkError:
case QAbstractSocket::ProxyConnectionClosedError:
case QAbstractSocket::ProxyConnectionRefusedError:
case QAbstractSocket::ProxyConnectionTimeoutError:
default:
qWarning() << "AMQP: Socket Error: " << d->socket->errorString();
break;
}
if (d->autoReconnect && d->connect)
QTimer::singleShot(d->timeOut, this, SLOT(connectTo()));
}
void Network::readyRead()
{
Q_D(Network);
while (d->socket->bytesAvailable() >= Frame::HEADER_SIZE) {
char *headerData = d->buffer.data();
d->socket->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE;
if (d->socket->bytesAvailable() >= readSize) {
d->buffer.resize(readSize);
d->socket->read(d->buffer.data(), readSize);
const char *bufferData = d->buffer.constData();
const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize];
if (magic != Frame::FRAME_END)
qWarning() << "Wrong end frame";
QDataStream streamB(&d->buffer, QIODevice::ReadOnly);
switch(Frame::Type(type)) {
case Frame::ftMethod:
{
Frame::Method frame(streamB);
if (frame.methodClass() == Frame::fcConnection) {
d->connectionMethodHandler->_q_method(frame);
} else {
foreach (Frame::MethodHandler *methodHandler, d->methodHandlersByChannel[frame.channel()])
methodHandler->_q_method(frame);
}
}
break;
case Frame::ftHeader:
{
Frame::Content frame(streamB);
foreach (Frame::ContentHandler *methodHandler, d->contentHandlerByChannel[frame.channel()])
methodHandler->_q_content(frame);
}
break;
case Frame::ftBody:
{
Frame::ContentBody frame(streamB);
foreach (Frame::ContentBodyHandler *methodHandler, d->bodyHandlersByChannel[frame.channel()])
methodHandler->_q_body(frame);
}
break;
case Frame::ftHeartbeat:
qDebug("AMQP: Heartbeat");
break;
default:
qWarning() << "AMQP: Unknown frame type: " << type;
}
} else {
break;
}
}
}
void Network::sendFrame(const Frame::Base &frame)
{
Q_D(Network);
if (d->socket->state() != QAbstractSocket::ConnectedState) {
qDebug() << Q_FUNC_INFO << "socket not connected: " << d->socket->state();
return;
}
QDataStream stream(d->socket);
frame.toStream(stream);
}
bool Network::isSsl() const
{
Q_D(const Network);
if (d->socket)
return QString(d->socket->metaObject()->className()).compare("QSslSocket", Qt::CaseInsensitive) == 0;
return false;
}
void Network::setSsl(bool value)
{
Q_D(Network);
d->initSocket(value);
}
void Network::sslErrors()
{
#ifndef QT_NO_SSL
Q_D(Network);
static_cast<QSslSocket*>(d->socket.data())->ignoreSslErrors();
#endif
}
void Network::conectionReady()
{
Q_D(Network);
d->timeOut = 0;
char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
d->socket->write(header, 8);
Q_EMIT connected();
}
bool Network::autoReconnect() const
{
Q_D(const Network);
return d->autoReconnect;
}
void Network::setAutoReconnect(bool value)
{
Q_D(Network);
d->autoReconnect = value;
}
QAbstractSocket::SocketState Network::state() const
{
Q_D(const Network);
if (d->socket)
return d->socket->state();
return QAbstractSocket::UnconnectedState;
}
void Network::setMethodHandlerConnection(Frame::MethodHandler *connectionMethodHandler)
{
Q_D(Network);
d->connectionMethodHandler = connectionMethodHandler;
}
void Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *methodHandler)
{
Q_D(Network);
d->methodHandlersByChannel[channel].append(methodHandler);
}
void Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler *methodHandler)
{
Q_D(Network);
d->contentHandlerByChannel[channel].append(methodHandler);
}
void Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *methodHandler)
{
Q_D(Network);
d->bodyHandlersByChannel[channel].append(methodHandler);
}

View File

@ -1,65 +0,0 @@
#ifndef amqp_network_h__
#define amqp_network_h__
#include <QObject>
#include <QTcpSocket>
#ifndef QT_NO_SSL
# include <QSslSocket>
#endif
#include <QPointer>
#include <QBuffer>
#include "amqp_frame.h"
#include "amqp_global.h"
namespace QAMQP
{
class NetworkPrivate;
class QAMQP_EXPORT Network : public QObject
{
Q_OBJECT
public:
Network(QObject *parent = 0);
~Network();
void disconnect();
void sendFrame(const Frame::Base &frame);
bool isSsl() const;
void setSsl(bool value);
bool autoReconnect() const;
void setAutoReconnect(bool value);
QAbstractSocket::SocketState state() const;
typedef qint16 Channel;
void setMethodHandlerConnection(Frame::MethodHandler *pMethodHandlerConnection);
void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler *pHandler);
void addContentHandlerForChannel(Channel channel, Frame::ContentHandler *pHandler);
void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler *pHandler);
Q_SIGNALS:
void connected();
void disconnected();
public Q_SLOTS:
void connectTo(const QString &host = QString(), quint16 port = 0);
void error(QAbstractSocket::SocketError socketError);
private Q_SLOTS:
void readyRead();
void sslErrors();
void conectionReady();
private:
Q_DISABLE_COPY(Network)
Q_DECLARE_PRIVATE(Network)
QScopedPointer<NetworkPrivate> d_ptr;
};
} // namespace QAMQP
#endif // amqp_network_h__

View File

@ -13,9 +13,7 @@ win32:DESTDIR = $$OUT_PWD
PRIVATE_HEADERS += \ PRIVATE_HEADERS += \
amqp_channel_p.h \ amqp_channel_p.h \
amqp_client_p.h \ amqp_client_p.h \
amqp_connection_p.h \
amqp_exchange_p.h \ amqp_exchange_p.h \
amqp_network_p.h \
amqp_queue_p.h amqp_queue_p.h
INSTALL_HEADERS += \ INSTALL_HEADERS += \
@ -36,11 +34,9 @@ SOURCES += \
amqp_authenticator.cpp \ amqp_authenticator.cpp \
amqp_channel.cpp \ amqp_channel.cpp \
amqp_client.cpp \ amqp_client.cpp \
amqp_connection.cpp \
amqp_exchange.cpp \ amqp_exchange.cpp \
amqp_frame.cpp \ amqp_frame.cpp \
amqp_message.cpp \ amqp_message.cpp \
amqp_network.cpp \
amqp_queue.cpp amqp_queue.cpp
# install # install