- added auto tests for remove

- remove now takes flags rather than magical booleans
- fixed a bug delete -> deleteOk in QueuePrivate so we can listen for sync queue destruction
- added error signals to Client(Connection), and Channel
- removed automatic calls to remove a Queue when the channel is closed, or Queue is deleted
  this behavior is already handled by the AutoDelete declare option
This commit is contained in:
Matt Broadstone 2014-06-06 12:10:51 -04:00
parent 75ebbec309
commit 9b45f2ac58
10 changed files with 200 additions and 47 deletions

View File

@ -13,6 +13,7 @@ ChannelPrivate::ChannelPrivate(Channel *q)
: channelNumber(0),
opened(false),
needOpen(true),
error(Channel::NoError),
q_ptr(q)
{
}
@ -118,15 +119,18 @@ void ChannelPrivate::flow()
void ChannelPrivate::flow(const Frame::Method &frame)
{
Q_UNUSED(frame);
qDebug() << Q_FUNC_INFO;
}
void ChannelPrivate::flowOk()
{
qDebug() << Q_FUNC_INFO;
}
void ChannelPrivate::flowOk(const Frame::Method &frame)
{
Q_UNUSED(frame);
qDebug() << Q_FUNC_INFO;
}
void ChannelPrivate::close(int code, const QString &text, int classId, int methodId)
@ -148,6 +152,7 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho
void ChannelPrivate::close(const Frame::Method &frame)
{
Q_Q(Channel);
qDebug(">> CLOSE");
stateChanged(csClosed);
QByteArray data = frame.arguments();
@ -158,6 +163,13 @@ void ChannelPrivate::close(const Frame::Method &frame)
stream >> classId;
stream >> methodId;
Channel::ChannelError checkError = static_cast<Channel::ChannelError>(code);
if (checkError != Channel::NoError) {
error = checkError;
errorString = qPrintable(text);
Q_EMIT q->error(error);
}
qDebug(">> code: %d", code);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
@ -270,4 +282,16 @@ void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
d->setQOS(prefetchSize, prefetchCount);
}
Channel::ChannelError Channel::error() const
{
Q_D(const Channel);
return d->error;
}
QString Channel::errorString() const
{
Q_D(const Channel);
return d->errorString;
}
#include "moc_amqp_channel.cpp"

View File

@ -24,6 +24,18 @@ public:
QString name() const;
void setName(const QString &name);
enum ChannelError {
NoError = 0,
ContentTooLarge = 311,
NoConsumers = 313,
AccessRefused = 403,
NotFound = 404,
ResourceLocked = 405,
PreconditionFailed = 406
};
ChannelError error() const;
QString errorString() const;
public Q_SLOTS:
void closeChannel();
void reopen();
@ -33,6 +45,7 @@ Q_SIGNALS:
void opened();
void closed();
void flowChanged(bool enabled);
void error(ChannelError error);
protected:
virtual void channelOpened() = 0;

View File

@ -77,6 +77,9 @@ public:
bool opened;
bool needOpen;
Channel::ChannelError error;
QString errorString;
Q_DECLARE_PUBLIC(Channel)
Channel * const q_ptr;
};

View File

@ -21,6 +21,7 @@ ClientPrivate::ClientPrivate(Client *q)
socket(0),
closed(false),
connected(false),
error(Client::NoError),
q_ptr(q)
{
}
@ -335,6 +336,13 @@ void ClientPrivate::close(const Frame::Method &frame)
stream >> classId;
stream >> methodId;
Client::ConnectionError checkError = static_cast<Client::ConnectionError>(code);
if (checkError != Client::NoError) {
error = checkError;
errorString = qPrintable(text);
Q_EMIT q->error(error);
}
qDebug(">> code: %d", code);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);

View File

@ -61,6 +61,23 @@ public:
void addCustomProperty(const QString &name, const QString &value);
QString customProperty(const QString &name) const;
enum ConnectionError {
NoError = 0,
ConnectionForced = 320,
InvalidPath = 402,
FrameError = 501,
SyntaxError = 502,
CommandInvalid = 503,
ChannelError = 504,
UnexpectedFrame = 505,
ResourceError = 506,
NotAllowed = 530,
NotImplemented = 540,
InternalError = 541
};
ConnectionError error() const;
QString errorString() const;
// channels
Exchange *createExchange(int channelNumber = -1);
Exchange *createExchange(const QString &name, int channelNumber = -1);
@ -76,6 +93,7 @@ public:
Q_SIGNALS:
void connected();
void disconnected();
void error(ConnectionError error);
protected:
Client(ClientPrivate *dd, QObject *parent = 0);

View File

@ -91,6 +91,9 @@ public:
QPointer<QTimer> heartbeatTimer;
Frame::TableField customProperties;
Client::ConnectionError error;
QString errorString;
Client * const q_ptr;
Q_DECLARE_PUBLIC(Client)

View File

@ -34,7 +34,7 @@ bool QueuePrivate::_q_method(const Frame::Method &frame)
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
case miDeleteOk:
deleteOk(frame);
break;
case miBindOk:
@ -180,8 +180,6 @@ void QueuePrivate::getOk(const Frame::Method &frame)
void QueuePrivate::consumeOk(const Frame::Method &frame)
{
qDebug() << "Consume ok: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = Frame::readField('s',stream).toString();
@ -190,6 +188,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame)
void QueuePrivate::deliver(const Frame::Method &frame)
{
qDebug() << Q_FUNC_INFO;
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = Frame::readField('s',in).toString();
@ -206,6 +205,41 @@ void QueuePrivate::deliver(const Frame::Method &frame)
messages.enqueue(message);
}
void QueuePrivate::declare()
{
if (name.isEmpty()) {
qDebug() << Q_FUNC_INFO << "can't declare queue with no name";
return;
}
Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare);
frame.setChannel(channelNumber);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1
Frame::writeField('s', out, name);
qDebug() << "DECLARE OPTIONS: ";
if (options & Queue::NoOptions) qDebug() << "NoOptions";
if (options & Queue::Passive) qDebug() << "Passive";
if (options & Queue::Durable) qDebug() << "Durable";
if (options & Queue::Exclusive) qDebug() << "Exclusive";
if (options & Queue::AutoDelete) qDebug() << "AutoDelete";
if (options & Queue::NoWait) qDebug() << "NoWait";
out << qint8(options);
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments);
sendFrame(frame);
if (delayedDeclare)
delayedDeclare = false;
}
//////////////////////////////////////////////////////////////////////////
Queue::Queue(int channelNumber, Client *parent)
@ -217,14 +251,13 @@ Queue::Queue(int channelNumber, Client *parent)
Queue::~Queue()
{
remove();
}
void Queue::channelOpened()
{
Q_D(Queue);
if (d->delayedDeclare)
declare();
d->declare();
if (!d->delayedBindings.isEmpty()) {
typedef QPair<QString, QString> BindingPair;
@ -236,10 +269,9 @@ void Queue::channelOpened()
void Queue::channelClosed()
{
remove(true, true);
}
Queue::QueueOptions Queue::option() const
int Queue::options() const
{
Q_D(const Queue);
return d->options;
@ -257,7 +289,7 @@ bool Queue::noAck() const
return d->noAck;
}
void Queue::declare(const QString &name, QueueOptions options)
void Queue::declare(const QString &name, int options)
{
Q_D(Queue);
if (!name.isEmpty())
@ -269,24 +301,10 @@ void Queue::declare(const QString &name, QueueOptions options)
return;
}
Frame::Method frame(Frame::fcQueue, QueuePrivate::miDeclare);
frame.setChannel(d->channelNumber);
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name);
out << qint8(options);
Frame::writeField('F', out, Frame::TableField());
frame.setArguments(arguments);
d->sendFrame(frame);
d->delayedDeclare = false;
d->declare();
}
void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
void Queue::remove(int options)
{
Q_D(Queue);
if (!d->declared) {
@ -302,12 +320,7 @@ void Queue::remove(bool ifUnused, bool ifEmpty, bool noWait)
out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name);
qint8 flag = 0;
flag |= (ifUnused ? 0x1 : 0);
flag |= (ifEmpty ? 0x2 : 0);
flag |= (noWait ? 0x4 : 0);
out << flag;
out << qint8(options);
frame.setArguments(arguments);
d->sendFrame(frame);
@ -420,7 +433,7 @@ bool Queue::hasMessage() const
return message.d->leftSize == 0;
}
void Queue::consume(ConsumeOptions options)
void Queue::consume(int options)
{
Q_D(Queue);
if (!d->opened) {

View File

@ -16,9 +16,12 @@ class QAMQP_EXPORT Queue : public Channel
{
Q_OBJECT
Q_ENUMS(QueueOptions)
Q_PROPERTY(QueueOptions option READ option)
Q_PROPERTY(int options READ options CONSTANT)
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
Q_PROPERTY(bool noAck READ noAck WRITE setNoAck)
Q_ENUMS(QueueOption)
Q_ENUMS(ConsumeOption)
Q_ENUMS(RemoveOption)
public:
enum QueueOption {
@ -30,6 +33,7 @@ public:
NoWait = 0x10
};
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
int options() const;
enum ConsumeOption {
coNoLocal = 0x1,
@ -39,8 +43,14 @@ public:
};
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
enum RemoveOption {
roIfUnused = 0x1,
roIfEmpty = 0x02,
roNoWait = 0x04
};
Q_DECLARE_FLAGS(RemoveOptions, RemoveOption)
~Queue();
QueueOptions option() const;
bool hasMessage() const;
Message getMessage();
@ -52,17 +62,16 @@ public:
bool noAck() const;
// AMQP Queue
void declare(const QString &name = QString(),
QueueOptions options = QueueOptions(Durable | AutoDelete));
void declare(const QString &name = QString(), int options = Durable|AutoDelete);
void bind(const QString &exchangeName, const QString &key);
void bind(Exchange *exchange, const QString &key);
void unbind(const QString &exchangeName, const QString &key);
void unbind(Exchange *exchange, const QString &key);
void purge();
void remove(bool ifUnused = true, bool ifEmpty = true, bool noWait = true);
void remove(int options = roIfUnused|roIfEmpty|roNoWait);
// AMQP Basic
void consume(ConsumeOptions options = ConsumeOptions(NoOptions));
void consume(int options = NoOptions);
void get();
void ack(const Message &message);
@ -80,7 +89,7 @@ protected:
virtual void channelClosed();
private:
Queue(int channelNumber = -1, Client *parent = 0);
explicit Queue(int channelNumber = -1, Client *parent = 0);
Q_DISABLE_COPY(Queue)
Q_DECLARE_PRIVATE(Queue)

View File

@ -23,6 +23,8 @@ public:
QueuePrivate(Queue *q);
~QueuePrivate();
void declare();
// method handler related
virtual bool _q_method(const Frame::Method &frame);
virtual void _q_content(const Frame::Content &frame);
@ -36,7 +38,7 @@ public:
void deliver(const Frame::Method &frame);
QString type;
Queue::QueueOptions options;
int options;
bool delayedDeclare;
bool declared;
bool noAck;

View File

@ -1,3 +1,5 @@
#include <QScopedPointer>
#include <QtTest/QtTest>
#include "amqp_testcase.h"
@ -10,29 +12,87 @@ class tst_QAMQPQueue : public TestCase
{
Q_OBJECT
private Q_SLOTS:
void init();
void cleanup();
void defaultExchange();
void remove();
void removeIfUnused();
private: // disabled
void removeIfEmpty();
private:
QScopedPointer<Client> client;
};
void tst_QAMQPQueue::init()
{
client.reset(new Client);
client->connectToHost();
QVERIFY(waitForSignal(client.data(), SIGNAL(connected())));
}
void tst_QAMQPQueue::cleanup()
{
client->disconnectFromHost();
QVERIFY(waitForSignal(client.data(), SIGNAL(disconnected())));
}
void tst_QAMQPQueue::defaultExchange()
{
Client client;
client.connectToHost();
QVERIFY(waitForSignal(&client, SIGNAL(connected())));
Queue *queue = client.createQueue("test-default-exchange");
Queue *queue = client->createQueue("test-default-exchange");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->consume();
Exchange *defaultExchange = client.createExchange();
Exchange *defaultExchange = client->createExchange();
defaultExchange->publish("test-default-exchange", "first message");
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
Message message = queue->getMessage();
QCOMPARE(message.payload(), QByteArray("first message"));
}
client.disconnectFromHost();
QVERIFY(waitForSignal(&client, SIGNAL(disconnected())));
void tst_QAMQPQueue::remove()
{
Queue *queue = client->createQueue("test-remove");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->remove(Queue::roIfEmpty|Queue::roIfUnused);
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
}
void tst_QAMQPQueue::removeIfUnused()
{
Queue *queue = client->createQueue("test-remove-if-unused");
queue->declare();
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->consume();
queue->remove(Queue::roIfUnused);
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
QCOMPARE(queue->error(), Channel::PreconditionFailed);
}
void tst_QAMQPQueue::removeIfEmpty()
{
// NOTE: this will work once I refactor messages to easily
// add propertis for e.g. persistence
Queue *queue = client->createQueue();
queue->declare("test-remove-if-empty", Queue::Durable);
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->consume();
Exchange *defaultExchange = client->createExchange();
defaultExchange->publish("test-remove-if-empty", "first message");
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
queue->remove(Queue::roIfEmpty);
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
QCOMPARE(queue->error(), Channel::PreconditionFailed);
}
QTEST_MAIN(tst_QAMQPQueue)