diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 49efbb8..62e3650 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -16,15 +16,15 @@ using namespace QAMQP; ClientPrivate::ClientPrivate(Client *q) - : port(AMQPPORT), - host(QString::fromLatin1(AMQPHOST)), - virtualHost(QString::fromLatin1(AMQPVHOST)), + : port(AMQP_PORT), + host(QString::fromLatin1(AMQP_HOST)), + virtualHost(QString::fromLatin1(AMQP_VHOST)), socket(0), closed(false), connected(false), channelMax(0), heartbeatDelay(0), - frameMax(0), + frameMax(AMQP_FRAME_MAX), error(Client::NoError), q_ptr(q) { @@ -42,7 +42,7 @@ void ClientPrivate::init(const QUrl &connectionString) QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat())); authenticator = QSharedPointer( - new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); + new AMQPlainAuthenticator(QString::fromLatin1(AMQP_LOGIN), QString::fromLatin1(AMQP_PSWD))); if (connectionString.isValid()) { parseConnectionString(connectionString); @@ -63,15 +63,15 @@ void ClientPrivate::initSocket() void ClientPrivate::parseConnectionString(const QUrl &connectionString) { Q_Q(Client); - if (connectionString.scheme() != AMQPSCHEME && - connectionString.scheme() != AMQPSSCHEME) { + if (connectionString.scheme() != AMQP_SCHEME && + connectionString.scheme() != AMQP_SSCHEME) { qAmqpDebug() << Q_FUNC_INFO << "invalid scheme: " << connectionString.scheme(); return; } q->setPassword(connectionString.password()); q->setUsername(connectionString.userName()); - q->setPort(connectionString.port(AMQPPORT)); + q->setPort(connectionString.port(AMQP_PORT)); q->setHost(connectionString.host()); q->setVirtualHost(connectionString.path()); } @@ -156,7 +156,8 @@ void ClientPrivate::_q_readyRead() const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; if (magic != Frame::FRAME_END) { qAmqpDebug() << Q_FUNC_INFO << "FATAL: wrong end of frame"; - _q_disconnect(); + buffer.clear(); + socket->close(); return; } @@ -165,6 +166,11 @@ void ClientPrivate::_q_readyRead() case Frame::ftMethod: { Frame::Method frame(streamB); + if (frame.size() > frameMax) { + close(Client::FrameError, "frame size too large"); + return; + } + if (frame.methodClass() == Frame::fcConnection) { _q_method(frame); } else { @@ -176,6 +182,11 @@ void ClientPrivate::_q_readyRead() case Frame::ftHeader: { Frame::Content frame(streamB); + if (frame.size() > frameMax) { + close(Client::FrameError, "frame size too large"); + return; + } + foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()]) methodHandler->_q_content(frame); } @@ -183,6 +194,11 @@ void ClientPrivate::_q_readyRead() case Frame::ftBody: { Frame::ContentBody frame(streamB); + if (frame.size() > frameMax) { + close(Client::FrameError, "frame size too large"); + return; + } + foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()]) methodHandler->_q_body(frame); } @@ -295,9 +311,17 @@ void ClientPrivate::tune(const Frame::Method &frame) QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - stream >> channelMax; - stream >> frameMax; - stream >> heartbeatDelay; + qint16 channel_max = 0, + heartbeat_delay = 0; + qint32 frame_max = 0; + + stream >> channel_max; + stream >> frame_max; + stream >> heartbeat_delay; + + channelMax = qMax(channel_max, channelMax); + heartbeatDelay = qMax(heartbeat_delay, heartbeatDelay); + frameMax = qMax(frame_max, frameMax); qAmqpDebug(">> channel_max: %d", channelMax); qAmqpDebug(">> frame_max: %d", frameMax); @@ -395,7 +419,7 @@ void ClientPrivate::tuneOk() stream << qint16(channelMax); stream << qint32(frameMax); - stream << qint16(heartbeatTimer->interval() / 1000); + stream << qint16(heartbeatDelay / 1000); frame.setArguments(arguments); sendFrame(frame); @@ -619,6 +643,58 @@ void Client::setAutoReconnect(bool value) d->autoReconnect = value; } +qint16 Client::channelMax() const +{ + Q_D(const Client); + return d->channelMax; +} + +void Client::setChannelMax(qint16 channelMax) +{ + Q_D(Client); + if (d->connected) { + qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected"; + return; + } + + d->channelMax = channelMax; +} + +qint32 Client::frameMax() const +{ + Q_D(const Client); + return d->frameMax; +} + +void Client::setFrameMax(qint32 frameMax) +{ + Q_D(Client); + if (d->connected) { + qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected"; + return; + } + + d->frameMax = qMax(frameMax, AMQP_FRAME_MIN_SIZE); +} + +qint16 Client::heartbeatDelay() const +{ + Q_D(const Client); + return d->heartbeatDelay; +} + +void Client::setHeartbeatDelay(qint16 delay) +{ + Q_D(Client); + if (d->connected) { + qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected"; + return; + } + + d->heartbeatDelay = delay; +} + + void Client::addCustomProperty(const QString &name, const QString &value) { Q_D(Client); diff --git a/src/amqp_client.h b/src/amqp_client.h index 5df7ae2..4a415f1 100644 --- a/src/amqp_client.h +++ b/src/amqp_client.h @@ -28,6 +28,9 @@ class QAMQP_EXPORT Client : public QObject Q_PROPERTY(QString user READ username WRITE setUsername) Q_PROPERTY(QString password READ password WRITE setPassword) Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect) + Q_PROPERTY(qint16 channelMax READ channelMax WRITE setChannelMax) + Q_PROPERTY(qint32 frameMax READ frameMax WRITE setFrameMax) + Q_PROPERTY(qint16 heartbeatDelay READ heartbeatDelay() WRITE setHeartbeatDelay) public: Client(QObject *parent = 0); @@ -58,6 +61,15 @@ public: bool isConnected() const; + qint16 channelMax() const; + void setChannelMax(qint16 channelMax); + + qint32 frameMax() const; + void setFrameMax(qint32 frameMax); + + qint16 heartbeatDelay() const; + void setHeartbeatDelay(qint16 delay); + void addCustomProperty(const QString &name, const QString &value); QString customProperty(const QString &name) const; @@ -87,7 +99,7 @@ public: // methods void connectToHost(const QString &connectionString = QString()); - void connectToHost(const QHostAddress &address, quint16 port = AMQPPORT); + void connectToHost(const QHostAddress &address, quint16 port = AMQP_PORT); void disconnectFromHost(); Q_SIGNALS: diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index d73bc64..8722b31 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -2,6 +2,7 @@ #include "amqp_exchange_p.h" #include "amqp_queue.h" #include "amqp_global.h" +#include "amqp_client.h" #include #include @@ -169,13 +170,13 @@ void Exchange::remove(bool ifUnused, bool noWait) QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - stream << qint16(0); //reserver 1 + stream << qint16(0); //reserved 1 Frame::writeField('s', stream, d->name); qint8 flag = 0; flag |= (ifUnused ? 0x1 : 0); flag |= (noWait ? 0x2 : 0); - stream << flag; //reserver 1 + stream << flag; //reserved 1 frame.setArguments(arguments); d->sendFrame(frame); @@ -227,9 +228,9 @@ void Exchange::publish(const QString &key, const QByteArray &message, d->sendFrame(content); int fullSize = message.size(); - for (int sent = 0; sent < fullSize; sent += (FRAME_MAX - 7)) { + for (int sent = 0; sent < fullSize; sent += (d->client->frameMax() - 7)) { Frame::ContentBody body; - QByteArray partition = message.mid(sent, (FRAME_MAX - 7)); + QByteArray partition = message.mid(sent, (d->client->frameMax() - 7)); body.setChannel(d->channelNumber); body.setBody(partition); d->sendFrame(body); diff --git a/tests/auto/qamqpclient/tst_qamqpclient.cpp b/tests/auto/qamqpclient/tst_qamqpclient.cpp index c5e6404..2cae962 100644 --- a/tests/auto/qamqpclient/tst_qamqpclient.cpp +++ b/tests/auto/qamqpclient/tst_qamqpclient.cpp @@ -14,6 +14,8 @@ private Q_SLOTS: void connectDisconnect(); void invalidAuthenticationMechanism(); + void tune(); + private: void autoReconnect(); @@ -67,5 +69,26 @@ void tst_QAMQPClient::autoReconnect() QVERIFY(waitForSignal(&client, SIGNAL(connected()), 2)); } +void tst_QAMQPClient::tune() +{ + // NOTE: this is totally incomplete, but the framework is here to + // test it. currently, only channel_max matters since the default + // from rabbit is 0. + + Client client; + client.setChannelMax(15); + client.setFrameMax(1000); + client.setHeartbeatDelay(600); + + client.connectToHost(); + QVERIFY(waitForSignal(&client, SIGNAL(connected()))); + QCOMPARE((int)client.channelMax(), 15); + QCOMPARE((int)client.heartbeatDelay(), 600); + QCOMPARE((int)client.frameMax(), 131072); + + client.disconnectFromHost(); + QVERIFY(waitForSignal(&client, SIGNAL(disconnected()))); +} + QTEST_MAIN(tst_QAMQPClient) #include "tst_qamqpclient.moc"