provide access to tuning variables, add checks for frameMax per spec in a number of areas,

add an incomplete autotest for using connection tuning parameters
This commit is contained in:
Matt Broadstone 2014-06-09 19:27:24 -04:00
parent 223af56683
commit 1510e1f6c6
4 changed files with 130 additions and 18 deletions

View File

@ -16,15 +16,15 @@
using namespace QAMQP;
ClientPrivate::ClientPrivate(Client *q)
: port(AMQPPORT),
host(QString::fromLatin1(AMQPHOST)),
virtualHost(QString::fromLatin1(AMQPVHOST)),
: port(AMQP_PORT),
host(QString::fromLatin1(AMQP_HOST)),
virtualHost(QString::fromLatin1(AMQP_VHOST)),
socket(0),
closed(false),
connected(false),
channelMax(0),
heartbeatDelay(0),
frameMax(0),
frameMax(AMQP_FRAME_MAX),
error(Client::NoError),
q_ptr(q)
{
@ -42,7 +42,7 @@ void ClientPrivate::init(const QUrl &connectionString)
QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat()));
authenticator = QSharedPointer<Authenticator>(
new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
new AMQPlainAuthenticator(QString::fromLatin1(AMQP_LOGIN), QString::fromLatin1(AMQP_PSWD)));
if (connectionString.isValid()) {
parseConnectionString(connectionString);
@ -63,15 +63,15 @@ void ClientPrivate::initSocket()
void ClientPrivate::parseConnectionString(const QUrl &connectionString)
{
Q_Q(Client);
if (connectionString.scheme() != AMQPSCHEME &&
connectionString.scheme() != AMQPSSCHEME) {
if (connectionString.scheme() != AMQP_SCHEME &&
connectionString.scheme() != AMQP_SSCHEME) {
qAmqpDebug() << Q_FUNC_INFO << "invalid scheme: " << connectionString.scheme();
return;
}
q->setPassword(connectionString.password());
q->setUsername(connectionString.userName());
q->setPort(connectionString.port(AMQPPORT));
q->setPort(connectionString.port(AMQP_PORT));
q->setHost(connectionString.host());
q->setVirtualHost(connectionString.path());
}
@ -156,7 +156,8 @@ void ClientPrivate::_q_readyRead()
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize];
if (magic != Frame::FRAME_END) {
qAmqpDebug() << Q_FUNC_INFO << "FATAL: wrong end of frame";
_q_disconnect();
buffer.clear();
socket->close();
return;
}
@ -165,6 +166,11 @@ void ClientPrivate::_q_readyRead()
case Frame::ftMethod:
{
Frame::Method frame(streamB);
if (frame.size() > frameMax) {
close(Client::FrameError, "frame size too large");
return;
}
if (frame.methodClass() == Frame::fcConnection) {
_q_method(frame);
} else {
@ -176,6 +182,11 @@ void ClientPrivate::_q_readyRead()
case Frame::ftHeader:
{
Frame::Content frame(streamB);
if (frame.size() > frameMax) {
close(Client::FrameError, "frame size too large");
return;
}
foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()])
methodHandler->_q_content(frame);
}
@ -183,6 +194,11 @@ void ClientPrivate::_q_readyRead()
case Frame::ftBody:
{
Frame::ContentBody frame(streamB);
if (frame.size() > frameMax) {
close(Client::FrameError, "frame size too large");
return;
}
foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()])
methodHandler->_q_body(frame);
}
@ -295,9 +311,17 @@ void ClientPrivate::tune(const Frame::Method &frame)
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
stream >> channelMax;
stream >> frameMax;
stream >> heartbeatDelay;
qint16 channel_max = 0,
heartbeat_delay = 0;
qint32 frame_max = 0;
stream >> channel_max;
stream >> frame_max;
stream >> heartbeat_delay;
channelMax = qMax(channel_max, channelMax);
heartbeatDelay = qMax(heartbeat_delay, heartbeatDelay);
frameMax = qMax(frame_max, frameMax);
qAmqpDebug(">> channel_max: %d", channelMax);
qAmqpDebug(">> frame_max: %d", frameMax);
@ -395,7 +419,7 @@ void ClientPrivate::tuneOk()
stream << qint16(channelMax);
stream << qint32(frameMax);
stream << qint16(heartbeatTimer->interval() / 1000);
stream << qint16(heartbeatDelay / 1000);
frame.setArguments(arguments);
sendFrame(frame);
@ -619,6 +643,58 @@ void Client::setAutoReconnect(bool value)
d->autoReconnect = value;
}
qint16 Client::channelMax() const
{
Q_D(const Client);
return d->channelMax;
}
void Client::setChannelMax(qint16 channelMax)
{
Q_D(Client);
if (d->connected) {
qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected";
return;
}
d->channelMax = channelMax;
}
qint32 Client::frameMax() const
{
Q_D(const Client);
return d->frameMax;
}
void Client::setFrameMax(qint32 frameMax)
{
Q_D(Client);
if (d->connected) {
qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected";
return;
}
d->frameMax = qMax(frameMax, AMQP_FRAME_MIN_SIZE);
}
qint16 Client::heartbeatDelay() const
{
Q_D(const Client);
return d->heartbeatDelay;
}
void Client::setHeartbeatDelay(qint16 delay)
{
Q_D(Client);
if (d->connected) {
qAmqpDebug() << Q_FUNC_INFO << "can't modify value while connected";
return;
}
d->heartbeatDelay = delay;
}
void Client::addCustomProperty(const QString &name, const QString &value)
{
Q_D(Client);

View File

@ -28,6 +28,9 @@ class QAMQP_EXPORT Client : public QObject
Q_PROPERTY(QString user READ username WRITE setUsername)
Q_PROPERTY(QString password READ password WRITE setPassword)
Q_PROPERTY(bool autoReconnect READ autoReconnect WRITE setAutoReconnect)
Q_PROPERTY(qint16 channelMax READ channelMax WRITE setChannelMax)
Q_PROPERTY(qint32 frameMax READ frameMax WRITE setFrameMax)
Q_PROPERTY(qint16 heartbeatDelay READ heartbeatDelay() WRITE setHeartbeatDelay)
public:
Client(QObject *parent = 0);
@ -58,6 +61,15 @@ public:
bool isConnected() const;
qint16 channelMax() const;
void setChannelMax(qint16 channelMax);
qint32 frameMax() const;
void setFrameMax(qint32 frameMax);
qint16 heartbeatDelay() const;
void setHeartbeatDelay(qint16 delay);
void addCustomProperty(const QString &name, const QString &value);
QString customProperty(const QString &name) const;
@ -87,7 +99,7 @@ public:
// methods
void connectToHost(const QString &connectionString = QString());
void connectToHost(const QHostAddress &address, quint16 port = AMQPPORT);
void connectToHost(const QHostAddress &address, quint16 port = AMQP_PORT);
void disconnectFromHost();
Q_SIGNALS:

View File

@ -2,6 +2,7 @@
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_global.h"
#include "amqp_client.h"
#include <QDataStream>
#include <QDebug>
@ -169,13 +170,13 @@ void Exchange::remove(bool ifUnused, bool noWait)
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(0); //reserver 1
stream << qint16(0); //reserved 1
Frame::writeField('s', stream, d->name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (noWait ? 0x2 : 0);
stream << flag; //reserver 1
stream << flag; //reserved 1
frame.setArguments(arguments);
d->sendFrame(frame);
@ -227,9 +228,9 @@ void Exchange::publish(const QString &key, const QByteArray &message,
d->sendFrame(content);
int fullSize = message.size();
for (int sent = 0; sent < fullSize; sent += (FRAME_MAX - 7)) {
for (int sent = 0; sent < fullSize; sent += (d->client->frameMax() - 7)) {
Frame::ContentBody body;
QByteArray partition = message.mid(sent, (FRAME_MAX - 7));
QByteArray partition = message.mid(sent, (d->client->frameMax() - 7));
body.setChannel(d->channelNumber);
body.setBody(partition);
d->sendFrame(body);

View File

@ -14,6 +14,8 @@ private Q_SLOTS:
void connectDisconnect();
void invalidAuthenticationMechanism();
void tune();
private:
void autoReconnect();
@ -67,5 +69,26 @@ void tst_QAMQPClient::autoReconnect()
QVERIFY(waitForSignal(&client, SIGNAL(connected()), 2));
}
void tst_QAMQPClient::tune()
{
// NOTE: this is totally incomplete, but the framework is here to
// test it. currently, only channel_max matters since the default
// from rabbit is 0.
Client client;
client.setChannelMax(15);
client.setFrameMax(1000);
client.setHeartbeatDelay(600);
client.connectToHost();
QVERIFY(waitForSignal(&client, SIGNAL(connected())));
QCOMPARE((int)client.channelMax(), 15);
QCOMPARE((int)client.heartbeatDelay(), 600);
QCOMPARE((int)client.frameMax(), 131072);
client.disconnectFromHost();
QVERIFY(waitForSignal(&client, SIGNAL(disconnected())));
}
QTEST_MAIN(tst_QAMQPClient)
#include "tst_qamqpclient.moc"