diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 0075216..2d530ee 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -275,7 +275,10 @@ void ChannelPrivate::openOk(const Frame::Method &frame) void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount) { - client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false); + Q_UNUSED(prefetchSize) + Q_UNUSED(prefetchCount) + qDebug() << Q_FUNC_INFO << "temporarily disabled"; +// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false); } void ChannelPrivate::_q_disconnected() diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 56bfa33..31d521e 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -81,7 +81,10 @@ void ClientPrivate::disconnect() } network_->disconnect(); - connection_->d_func()->connected = false; + + // NOTE: this should be handled by signals, no need for dptr + // access here. + // connection_->d_func()->connected = false; } ////////////////////////////////////////////////////////////////////////// diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 4866c78..d2fa91b 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -4,7 +4,6 @@ #include #include "amqp_network.h" -#include "amqp_connection.h" #include "amqp_authenticator.h" namespace QAMQP @@ -12,6 +11,7 @@ namespace QAMQP class Queue; class Exchange; +class Connection; class ClientPrivate { public: diff --git a/src/amqp_connection.cpp b/src/amqp_connection.cpp index 2b9b468..6ea645b 100644 --- a/src/amqp_connection.cpp +++ b/src/amqp_connection.cpp @@ -1,4 +1,3 @@ -#include "amqp_connection.h" #include "amqp_connection_p.h" #include "amqp_client.h" #include "amqp_client_p.h" @@ -9,7 +8,36 @@ #include #include -using namespace QAMQP; +#define METHOD_ID_ENUM(name, id) name = id, name ## Ok + +namespace QAMQP { + +class ConnectionPrivate +{ +public: + enum MethodId { + METHOD_ID_ENUM(miStart, 10), + METHOD_ID_ENUM(miSecure, 20), + METHOD_ID_ENUM(miTune, 30), + METHOD_ID_ENUM(miOpen, 40), + METHOD_ID_ENUM(miClose, 50) + }; + + ConnectionPrivate(Connection *q); + + // private slots + void _q_heartbeat(); + + QPointer client; + QPointer network; + bool closed; + bool connected; + QPointer heartbeatTimer; + Frame::TableField customProperty; + + Q_DECLARE_PUBLIC(Connection) + Connection * const q_ptr; +}; ConnectionPrivate::ConnectionPrivate(Connection *q) : closed(false), @@ -18,64 +46,33 @@ ConnectionPrivate::ConnectionPrivate(Connection *q) { } -ConnectionPrivate::~ConnectionPrivate() +void ConnectionPrivate::_q_heartbeat() { -} - -void ConnectionPrivate::startOk() -{ - Frame::Method frame(Frame::fcConnection, miStartOk); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - Frame::TableField clientProperties; - clientProperties["version"] = QString(QAMQP_VERSION); - clientProperties["platform"] = QString("Qt %1").arg(qVersion()); - clientProperties["product"] = QString("QAMQP"); - clientProperties.unite(customProperty); - Frame::serialize(stream, clientProperties); - - client->auth()->write(stream); - Frame::writeField('s', stream, "en_US"); - - frame.setArguments(arguments); + Frame::Heartbeat frame; network->sendFrame(frame); } -void ConnectionPrivate::secureOk() +} // namespace QAMQP + +////////////////////////////////////////////////////////////////////////// + +using namespace QAMQP; +Connection::Connection(Network *network, Client *client) + : QObject(client), + d_ptr(new ConnectionPrivate(this)) +{ + Q_D(Connection); + d->client = client; + d->network = network; + d->heartbeatTimer = new QTimer(this); + connect(d->heartbeatTimer, SIGNAL(timeout()), this, SLOT(_q_heartbeat())); +} + +Connection::~Connection() { } -void ConnectionPrivate::tuneOk() -{ - Frame::Method frame(Frame::fcConnection, miTuneOk); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - stream << qint16(0); //channel_max - stream << qint32(FRAME_MAX); //frame_max - stream << qint16(heartbeatTimer->interval() / 1000); //heartbeat - - frame.setArguments(arguments); - network->sendFrame(frame); -} - -void ConnectionPrivate::open() -{ - Frame::Method frame(Frame::fcConnection, miOpen); - QByteArray arguments; - QDataStream stream(&arguments, QIODevice::WriteOnly); - - Frame::writeField('s',stream, client->virtualHost()); - - stream << qint8(0); - stream << qint8(0); - - frame.setArguments(arguments); - network->sendFrame(frame); -} - -void ConnectionPrivate::start(const Frame::Method &frame) +void Connection::start(const Frame::Method &frame) { qDebug(">> Start"); QByteArray data = frame.arguments(); @@ -102,13 +99,15 @@ void ConnectionPrivate::start(const Frame::Method &frame) startOk(); } -void ConnectionPrivate::secure(const Frame::Method &frame) +void Connection::secure(const Frame::Method &frame) { Q_UNUSED(frame) + qDebug() << Q_FUNC_INFO << "called!"; } -void ConnectionPrivate::tune(const Frame::Method &frame) +void Connection::tune(const Frame::Method &frame) { + Q_D(Connection); qDebug(">> Tune"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); @@ -125,32 +124,42 @@ void ConnectionPrivate::tune(const Frame::Method &frame) qDebug(">> frame_max: %d", frame_max); qDebug(">> heartbeat: %d", heartbeat); - if (heartbeatTimer) { - heartbeatTimer->setInterval(heartbeat * 1000); - if (heartbeatTimer->interval()) - heartbeatTimer->start(); + if (d->heartbeatTimer) { + d->heartbeatTimer->setInterval(heartbeat * 1000); + if (d->heartbeatTimer->interval()) + d->heartbeatTimer->start(); else - heartbeatTimer->stop(); + d->heartbeatTimer->stop(); } tuneOk(); open(); } -void ConnectionPrivate::openOk(const Frame::Method &frame) +void Connection::openOk(const Frame::Method &frame) { Q_UNUSED(frame) - Q_Q(Connection); - + Q_D(Connection); qDebug(">> OpenOK"); - connected = true; - q->openOk(); + d->connected = true; + Q_EMIT connected(); } -void ConnectionPrivate::close(const Frame::Method &frame) +void Connection::closeOk(const Frame::Method &frame) { - Q_Q(Connection); + Q_UNUSED(frame) + Q_D(Connection); + d->connected = false; + Q_EMIT disconnected(); + if (d->heartbeatTimer) + d->heartbeatTimer->stop(); + Q_EMIT disconnected(); +} + +void Connection::close(const Frame::Method &frame) +{ + Q_D(Connection); qDebug(">> CLOSE"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); @@ -164,18 +173,76 @@ void ConnectionPrivate::close(const Frame::Method &frame) qDebug(">> text: %s", qPrintable(text)); qDebug(">> class-id: %d", classId); qDebug(">> method-id: %d", methodId); - connected = false; - network->error(QAbstractSocket::RemoteHostClosedError); - Q_EMIT q->disconnected(); + d->connected = false; + d->network->error(QAbstractSocket::RemoteHostClosedError); + Q_EMIT disconnected(); } -void ConnectionPrivate::close(int code, const QString &text, int classId, int methodId) +void Connection::startOk() { - Frame::Method frame(Frame::fcConnection, miClose); + Q_D(Connection); + Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miStartOk); QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::writeField('s',stream, client->virtualHost()); + Frame::TableField clientProperties; + clientProperties["version"] = QString(QAMQP_VERSION); + clientProperties["platform"] = QString("Qt %1").arg(qVersion()); + clientProperties["product"] = QString("QAMQP"); + clientProperties.unite(d->customProperty); + Frame::serialize(stream, clientProperties); + + d->client->auth()->write(stream); + Frame::writeField('s', stream, "en_US"); + + frame.setArguments(arguments); + d->network->sendFrame(frame); +} + +void Connection::secureOk() +{ + qDebug() << Q_FUNC_INFO; +} + +void Connection::tuneOk() +{ + Q_D(Connection); + Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miTuneOk); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + stream << qint16(0); //channel_max + stream << qint32(FRAME_MAX); //frame_max + stream << qint16(d->heartbeatTimer->interval() / 1000); //heartbeat + + frame.setArguments(arguments); + d->network->sendFrame(frame); +} + +void Connection::open() +{ + Q_D(Connection); + Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miOpen); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + Frame::writeField('s',stream, d->client->virtualHost()); + + stream << qint8(0); + stream << qint8(0); + + frame.setArguments(arguments); + d->network->sendFrame(frame); +} + +void Connection::close(int code, const QString &text, int classId, int methodId) +{ + Q_D(Connection); + Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miClose); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + Frame::writeField('s',stream, d->client->virtualHost()); stream << qint16(code); Frame::writeField('s', stream, text); @@ -183,153 +250,54 @@ void ConnectionPrivate::close(int code, const QString &text, int classId, int me stream << qint16(methodId); frame.setArguments(arguments); - network->sendFrame(frame); -} - -void ConnectionPrivate::closeOk() -{ - Frame::Method frame(Frame::fcConnection, miCloseOk); - connected = false; - network->sendFrame(frame); -} - -void ConnectionPrivate::closeOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - Q_Q(Connection); - - connected = false; - Q_EMIT q->disconnected(); - if (heartbeatTimer) - heartbeatTimer->stop(); -} - -void ConnectionPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global) -{ - Frame::Method frame(Frame::fcBasic, 10); - frame.setChannel(channel); - QByteArray arguments; - QDataStream out(&arguments, QIODevice::WriteOnly); - - out << prefetchSize; - out << prefetchCount; - out << qint8(global ? 1 : 0); - - frame.setArguments(arguments); - network->sendFrame(frame); -} - -bool ConnectionPrivate::_q_method(const Frame::Method &frame) -{ - Q_ASSERT(frame.methodClass() == Frame::fcConnection); - if (frame.methodClass() != Frame::fcConnection) - return true; - - qDebug() << "Connection:"; - - if (closed) { - if (frame.id() == miCloseOk) - closeOk(frame); - - return true; - } - - switch (MethodId(frame.id())) { - case miStart: - start(frame); - break; - case miSecure: - secure(frame); - break; - case miTune: - tune(frame); - break; - case miOpenOk: - openOk(frame); - break; - case miClose: - close(frame); - break; - case miCloseOk: - closeOk(frame); - break; - default: - qWarning("Unknown method-id %d", frame.id()); - return false; - } - - return true; -} - -void ConnectionPrivate::_q_heartbeat() -{ - Frame::Heartbeat frame; - network->sendFrame(frame); -} - -////////////////////////////////////////////////////////////////////////// - -Connection::Connection(Network *network, Client *client) - : QObject(client), - d_ptr(new ConnectionPrivate(this)) -{ - Q_D(Connection); - d->client = client; - d->network = network; - d->heartbeatTimer = new QTimer(this); - connect(d->heartbeatTimer, SIGNAL(timeout()), this, SLOT(_q_heartbeat())); -} - -Connection::~Connection() -{ -} - -void Connection::startOk() -{ - Q_D(Connection); - d->startOk(); -} - -void Connection::secureOk() -{ - Q_D(Connection); - d->secureOk(); -} - -void Connection::tuneOk() -{ - Q_D(Connection); - d->tuneOk(); -} - -void Connection::open() -{ - Q_D(Connection); - d->open(); -} - -void Connection::close(int code, const QString &text, int classId , int methodId) -{ - Q_D(Connection); - d->close(code, text, classId, methodId); + d->network->sendFrame(frame); } void Connection::closeOk() { Q_D(Connection); - d->closeOk(); - Q_EMIT disconnected(); -} - -void Connection::openOk() -{ - Q_EMIT connected(); + Frame::Method frame(Frame::fcConnection, ConnectionPrivate::miCloseOk); + d->connected = false; + d->network->sendFrame(frame); } void Connection::_q_method(const Frame::Method &frame) { Q_D(Connection); - d->_q_method(frame); + Q_ASSERT(frame.methodClass() == Frame::fcConnection); + if (frame.methodClass() != Frame::fcConnection) + return; + + qDebug() << "Connection:"; + + if (d->closed) { + if (frame.id() == ConnectionPrivate::miCloseOk) + closeOk(frame); + return; + } + + switch (ConnectionPrivate::MethodId(frame.id())) { + case ConnectionPrivate::miStart: + start(frame); + break; + case ConnectionPrivate::miSecure: + secure(frame); + break; + case ConnectionPrivate::miTune: + tune(frame); + break; + case ConnectionPrivate::miOpenOk: + openOk(frame); + break; + case ConnectionPrivate::miClose: + close(frame); + break; + case ConnectionPrivate::miCloseOk: + closeOk(frame); + break; + default: + qWarning("Unknown method-id %d", frame.id()); + } } bool Connection::isConnected() const @@ -341,7 +309,22 @@ bool Connection::isConnected() const void Connection::setQOS(qint32 prefetchSize, quint16 prefetchCount) { Q_D(Connection); - d->setQOS(prefetchSize, prefetchCount, 0, true); + + // NOTE: these were hardcoded values, could be bad + int channel = 0; + bool global = true; + + Frame::Method frame(Frame::fcBasic, 10); + frame.setChannel(channel); + QByteArray arguments; + QDataStream out(&arguments, QIODevice::WriteOnly); + + out << prefetchSize; + out << prefetchCount; + out << qint8(global ? 1 : 0); + + frame.setArguments(arguments); + d->network->sendFrame(frame); } void Connection::addCustomProperty(const QString &name, const QString &value) @@ -358,4 +341,4 @@ QString Connection::customProperty(const QString &name) const return QString(); } -#include "moc_amqp_connection.cpp" +#include "moc_amqp_connection_p.cpp" diff --git a/src/amqp_connection.h b/src/amqp_connection.h deleted file mode 100644 index 8508513..0000000 --- a/src/amqp_connection.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef amqp_connection_h__ -#define amqp_connection_h__ - -#include -#include "amqp_frame.h" - -namespace QAMQP -{ - -class Client; -class Network; -class ClientPrivate; -class ChannelPrivate; -class ConnectionPrivate; -class QAMQP_EXPORT Connection : public QObject, public Frame::MethodHandler -{ - Q_OBJECT - Q_PROPERTY(bool connected READ isConnected CONSTANT) - -public: - virtual ~Connection(); - - void addCustomProperty(const QString &name, const QString &value); - QString customProperty(const QString &name) const; - - bool isConnected() const; - -public Q_SLOTS: - void startOk(); - void secureOk(); - void tuneOk(); - void open(); - void close(int code, const QString &text, int classId = 0, int methodId = 0); - void closeOk(); - - void setQOS(qint32 prefetchSize, quint16 prefetchCount); - -Q_SIGNALS: - void disconnected(); - void connected(); - -private: - explicit Connection(Network *network, Client *parent); - - Q_DISABLE_COPY(Connection) - Q_DECLARE_PRIVATE(Connection) - QScopedPointer d_ptr; - - Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) - friend class ClientPrivate; - friend class ChannelPrivate; - - // should be moved to private - void openOk(); - void _q_method(const Frame::Method &frame); -}; - -} // namespace QAMQP - -#endif // amqp_connection_h__ diff --git a/src/amqp_connection_p.h b/src/amqp_connection_p.h index e63cd69..145bbcf 100644 --- a/src/amqp_connection_p.h +++ b/src/amqp_connection_p.h @@ -1,62 +1,67 @@ #ifndef amqp_connection_p_h__ #define amqp_connection_p_h__ -#include +#include +#include "amqp_frame.h" -#define METHOD_ID_ENUM(name, id) name = id, name ## Ok - -class QTimer; namespace QAMQP { class Client; +class Network; class ClientPrivate; -class Connection; -class ConnectionPrivate +class ConnectionPrivate; +class QAMQP_EXPORT Connection : public QObject, public Frame::MethodHandler { + Q_OBJECT + Q_PROPERTY(bool connected READ isConnected CONSTANT) + public: - enum MethodId { - METHOD_ID_ENUM(miStart, 10), - METHOD_ID_ENUM(miSecure, 20), - METHOD_ID_ENUM(miTune, 30), - METHOD_ID_ENUM(miOpen, 40), - METHOD_ID_ENUM(miClose, 50) - }; + virtual ~Connection(); - ConnectionPrivate(Connection *q); - ~ConnectionPrivate(); + void addCustomProperty(const QString &name, const QString &value); + QString customProperty(const QString &name) const; - void init(Client *parent); - void startOk(); - void secureOk(); - void tuneOk(); - void open(); - void close(int code, const QString &text, int classId = 0, int methodId = 0); - void closeOk(); + bool isConnected() const; + void setQOS(qint32 prefetchSize, quint16 prefetchCount); + // method handlers, FROM server void start(const Frame::Method &frame); void secure(const Frame::Method &frame); void tune(const Frame::Method &frame); void openOk(const Frame::Method &frame); - void close(const Frame::Method &frame); void closeOk(const Frame::Method &frame); - bool _q_method(const Frame::Method &frame); - void _q_heartbeat(); + // method handlers, TO server + void startOk(); + void secureOk(); + void tuneOk(); + void open(); - void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global); + // method handlers, BOTH ways + void close(int code, const QString &text, int classId = 0, int methodId = 0); + void close(const Frame::Method &frame); + void closeOk(); - QPointer client; - QPointer network; - bool closed; - bool connected; - QPointer heartbeatTimer; - Frame::TableField customProperty; +Q_SIGNALS: + void disconnected(); + void connected(); - Q_DECLARE_PUBLIC(Connection) - Connection * const q_ptr; +private: + explicit Connection(Network *network, Client *parent); + + Q_DISABLE_COPY(Connection) + Q_DECLARE_PRIVATE(Connection) + QScopedPointer d_ptr; + + Q_PRIVATE_SLOT(d_func(), void _q_heartbeat()) + friend class ClientPrivate; + + // should be moved to private + void openOk(); + void _q_method(const Frame::Method &frame); }; } // namespace QAMQP -#endif // amqp_connection_p_h__ +#endif // amqp_connection_h__ diff --git a/src/amqp_network.cpp b/src/amqp_network.cpp index b10a3b5..7da37f3 100644 --- a/src/amqp_network.cpp +++ b/src/amqp_network.cpp @@ -67,7 +67,7 @@ void Network::error(QAbstractSocket::SocketError socketError) timeOut_ *= 5; } - switch(socketError) { + switch (socketError) { case QAbstractSocket::ConnectionRefusedError: case QAbstractSocket::RemoteHostClosedError: case QAbstractSocket::SocketTimeoutError: @@ -91,14 +91,14 @@ void Network::readyRead() char *headerData = buffer_.data(); socket_->peek(headerData, Frame::HEADER_SIZE); const quint32 payloadSize = qFromBigEndian(*(quint32*)&headerData[3]); - const qint64 readSize = Frame::HEADER_SIZE+payloadSize+Frame::FRAME_END_SIZE; + const qint64 readSize = Frame::HEADER_SIZE+payloadSize + Frame::FRAME_END_SIZE; if (socket_->bytesAvailable() >= readSize) { buffer_.resize(readSize); socket_->read(buffer_.data(), readSize); const char *bufferData = buffer_.constData(); const quint8 type = *(quint8*)&bufferData[0]; - const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE+payloadSize]; + const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; if (magic != Frame::FRAME_END) qWarning() << "Wrong end frame"; @@ -203,11 +203,10 @@ void Network::sslErrors() void Network::conectionReady() { - Q_EMIT connected(); timeOut_ = 0; - - char header_[8] = {'A', 'M', 'Q', 'P', 0,0,9,1}; - socket_->write(header_, 8); + char header[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1}; + socket_->write(header, 8); + Q_EMIT connected(); } bool Network::autoReconnect() const diff --git a/src/src.pro b/src/src.pro index c77b9e8..76cbe29 100644 --- a/src/src.pro +++ b/src/src.pro @@ -21,7 +21,6 @@ INSTALL_HEADERS += \ amqp_authenticator.h \ amqp_channel.h \ amqp_client.h \ - amqp_connection.h \ amqp_exchange.h \ amqp_frame.h \ amqp_global.h \