- added auto tests for binding to standard AMQP-defined exchanges
- removed name parameter from Queue::declare, reducing confusion. Updated manual test to reflect this change
This commit is contained in:
parent
9b45f2ac58
commit
8599cec147
|
|
@ -151,7 +151,7 @@ void QueuePrivate::bindOk(const Frame::Method &frame)
|
||||||
Q_UNUSED(frame)
|
Q_UNUSED(frame)
|
||||||
|
|
||||||
Q_Q(Queue);
|
Q_Q(Queue);
|
||||||
qDebug() << Q_FUNC_INFO << "bound to queue: " << name;
|
qDebug() << Q_FUNC_INFO << "bound to exchange";
|
||||||
Q_EMIT q->bound();
|
Q_EMIT q->bound();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -160,7 +160,7 @@ void QueuePrivate::unbindOk(const Frame::Method &frame)
|
||||||
Q_UNUSED(frame)
|
Q_UNUSED(frame)
|
||||||
|
|
||||||
Q_Q(Queue);
|
Q_Q(Queue);
|
||||||
qDebug() << Q_FUNC_INFO << "unbound queue: " << name;
|
qDebug() << Q_FUNC_INFO << "unbound exchange";
|
||||||
Q_EMIT q->unbound();
|
Q_EMIT q->unbound();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -220,15 +220,6 @@ void QueuePrivate::declare()
|
||||||
|
|
||||||
out << qint16(0); //reserved 1
|
out << qint16(0); //reserved 1
|
||||||
Frame::writeField('s', out, name);
|
Frame::writeField('s', out, name);
|
||||||
|
|
||||||
qDebug() << "DECLARE OPTIONS: ";
|
|
||||||
if (options & Queue::NoOptions) qDebug() << "NoOptions";
|
|
||||||
if (options & Queue::Passive) qDebug() << "Passive";
|
|
||||||
if (options & Queue::Durable) qDebug() << "Durable";
|
|
||||||
if (options & Queue::Exclusive) qDebug() << "Exclusive";
|
|
||||||
if (options & Queue::AutoDelete) qDebug() << "AutoDelete";
|
|
||||||
if (options & Queue::NoWait) qDebug() << "NoWait";
|
|
||||||
|
|
||||||
out << qint8(options);
|
out << qint8(options);
|
||||||
Frame::writeField('F', out, Frame::TableField());
|
Frame::writeField('F', out, Frame::TableField());
|
||||||
|
|
||||||
|
|
@ -289,11 +280,9 @@ bool Queue::noAck() const
|
||||||
return d->noAck;
|
return d->noAck;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::declare(const QString &name, int options)
|
void Queue::declare(int options)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
if (!name.isEmpty())
|
|
||||||
d->name = name;
|
|
||||||
d->options = options;
|
d->options = options;
|
||||||
|
|
||||||
if (!d->opened) {
|
if (!d->opened) {
|
||||||
|
|
@ -338,13 +327,11 @@ void Queue::purge()
|
||||||
|
|
||||||
QByteArray arguments;
|
QByteArray arguments;
|
||||||
QDataStream out(&arguments, QIODevice::WriteOnly);
|
QDataStream out(&arguments, QIODevice::WriteOnly);
|
||||||
|
|
||||||
out << qint16(0); //reserved 1
|
out << qint16(0); //reserved 1
|
||||||
Frame::writeField('s', out, d->name);
|
Frame::writeField('s', out, d->name);
|
||||||
|
|
||||||
out << qint8(0); // no-wait
|
out << qint8(0); // no-wait
|
||||||
frame.setArguments(arguments);
|
|
||||||
|
|
||||||
|
frame.setArguments(arguments);
|
||||||
d->sendFrame(frame);
|
d->sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -372,12 +359,12 @@ void Queue::bind(const QString &exchangeName, const QString &key)
|
||||||
QByteArray arguments;
|
QByteArray arguments;
|
||||||
QDataStream out(&arguments, QIODevice::WriteOnly);
|
QDataStream out(&arguments, QIODevice::WriteOnly);
|
||||||
|
|
||||||
out << qint16(0); //reserved 1
|
out << qint16(0); // reserved 1
|
||||||
Frame::writeField('s', out, d->name);
|
Frame::writeField('s', out, d->name);
|
||||||
Frame::writeField('s', out, exchangeName);
|
Frame::writeField('s', out, exchangeName);
|
||||||
Frame::writeField('s', out, key);
|
Frame::writeField('s', out, key);
|
||||||
|
|
||||||
out << qint8(0); // no-wait
|
out << qint8(0); // no-wait
|
||||||
Frame::writeField('F', out, Frame::TableField());
|
Frame::writeField('F', out, Frame::TableField());
|
||||||
|
|
||||||
frame.setArguments(arguments);
|
frame.setArguments(arguments);
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ public:
|
||||||
bool noAck() const;
|
bool noAck() const;
|
||||||
|
|
||||||
// AMQP Queue
|
// AMQP Queue
|
||||||
void declare(const QString &name = QString(), int options = Durable|AutoDelete);
|
void declare(int options = Durable|AutoDelete);
|
||||||
void bind(const QString &exchangeName, const QString &key);
|
void bind(const QString &exchangeName, const QString &key);
|
||||||
void bind(Exchange *exchange, const QString &key);
|
void bind(Exchange *exchange, const QString &key);
|
||||||
void unbind(const QString &exchangeName, const QString &key);
|
void unbind(const QString &exchangeName, const QString &key);
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@ private Q_SLOTS:
|
||||||
void cleanup();
|
void cleanup();
|
||||||
|
|
||||||
void defaultExchange();
|
void defaultExchange();
|
||||||
|
void standardExchanges_data();
|
||||||
|
void standardExchanges();
|
||||||
|
|
||||||
void remove();
|
void remove();
|
||||||
void removeIfUnused();
|
void removeIfUnused();
|
||||||
|
|
@ -55,6 +57,38 @@ void tst_QAMQPQueue::defaultExchange()
|
||||||
QCOMPARE(message.payload(), QByteArray("first message"));
|
QCOMPARE(message.payload(), QByteArray("first message"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tst_QAMQPQueue::standardExchanges_data()
|
||||||
|
{
|
||||||
|
QTest::addColumn<QString>("exchange");
|
||||||
|
QTest::newRow("amq.direct") << "amq.direct";
|
||||||
|
QTest::newRow("amq.fanout") << "amq.fanout";
|
||||||
|
QTest::newRow("amq.headers") << "amq.headers";
|
||||||
|
QTest::newRow("amq.match") << "amq.match";
|
||||||
|
QTest::newRow("amq.topic") << "amq.topic";
|
||||||
|
}
|
||||||
|
|
||||||
|
void tst_QAMQPQueue::standardExchanges()
|
||||||
|
{
|
||||||
|
QFETCH(QString, exchange);
|
||||||
|
|
||||||
|
QString queueName = QString("test-%1").arg(exchange);
|
||||||
|
QString routingKey = QString("testRoutingKey-%1").arg(exchange);
|
||||||
|
|
||||||
|
Queue *queue = client->createQueue(queueName);
|
||||||
|
queue->declare();
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
|
queue->consume(); // required because AutoDelete will not delete if
|
||||||
|
// there was never a consumer
|
||||||
|
|
||||||
|
queue->bind(exchange, routingKey);
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(bound())));
|
||||||
|
|
||||||
|
Exchange *defaultExchange = client->createExchange(exchange);
|
||||||
|
defaultExchange->publish(routingKey, "test message");
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
|
||||||
|
QCOMPARE(queue->getMessage().payload(), QByteArray("test message"));
|
||||||
|
}
|
||||||
|
|
||||||
void tst_QAMQPQueue::remove()
|
void tst_QAMQPQueue::remove()
|
||||||
{
|
{
|
||||||
Queue *queue = client->createQueue("test-remove");
|
Queue *queue = client->createQueue("test-remove");
|
||||||
|
|
@ -74,6 +108,7 @@ void tst_QAMQPQueue::removeIfUnused()
|
||||||
queue->remove(Queue::roIfUnused);
|
queue->remove(Queue::roIfUnused);
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
|
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
|
||||||
QCOMPARE(queue->error(), Channel::PreconditionFailed);
|
QCOMPARE(queue->error(), Channel::PreconditionFailed);
|
||||||
|
QVERIFY(!queue->errorString().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
void tst_QAMQPQueue::removeIfEmpty()
|
void tst_QAMQPQueue::removeIfEmpty()
|
||||||
|
|
@ -81,8 +116,8 @@ void tst_QAMQPQueue::removeIfEmpty()
|
||||||
// NOTE: this will work once I refactor messages to easily
|
// NOTE: this will work once I refactor messages to easily
|
||||||
// add propertis for e.g. persistence
|
// add propertis for e.g. persistence
|
||||||
|
|
||||||
Queue *queue = client->createQueue();
|
Queue *queue = client->createQueue("test-remove-if-empty");
|
||||||
queue->declare("test-remove-if-empty", Queue::Durable);
|
queue->declare(Queue::Durable);
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume();
|
queue->consume();
|
||||||
|
|
||||||
|
|
@ -93,6 +128,7 @@ void tst_QAMQPQueue::removeIfEmpty()
|
||||||
queue->remove(Queue::roIfEmpty);
|
queue->remove(Queue::roIfEmpty);
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
|
QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError))));
|
||||||
QCOMPARE(queue->error(), Channel::PreconditionFailed);
|
QCOMPARE(queue->error(), Channel::PreconditionFailed);
|
||||||
|
QVERIFY(!queue->errorString().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
QTEST_MAIN(tst_QAMQPQueue)
|
QTEST_MAIN(tst_QAMQPQueue)
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,8 @@ public:
|
||||||
client->connectToHost(address);
|
client->connectToHost(address);
|
||||||
|
|
||||||
// Create an exclusive queue
|
// Create an exclusive queue
|
||||||
queue_ = client->createQueue();
|
queue_ = client->createQueue("");
|
||||||
queue_->declare("", Queue::Exclusive);
|
queue_->declare(Queue::Exclusive);
|
||||||
|
|
||||||
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,8 @@ public:
|
||||||
client->connectToHost(address);
|
client->connectToHost(address);
|
||||||
|
|
||||||
// Create an exclusive queue
|
// Create an exclusive queue
|
||||||
queue_ = client->createQueue();
|
queue_ = client->createQueue("");
|
||||||
queue_->declare("", Queue::Exclusive);
|
queue_->declare(Queue::Exclusive);
|
||||||
|
|
||||||
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
||||||
|
|
|
||||||
|
|
@ -35,8 +35,8 @@ public:
|
||||||
exchange_ = client->createExchange();
|
exchange_ = client->createExchange();
|
||||||
|
|
||||||
// Create the "task_queue" queue, with the "durable" option set
|
// Create the "task_queue" queue, with the "durable" option set
|
||||||
queue_ = client->createQueue(exchange_->channelNumber());
|
queue_ = client->createQueue("task_queue", exchange_->channelNumber());
|
||||||
queue_->declare("task_queue", Queue::Durable);
|
queue_->declare(Queue::Durable);
|
||||||
}
|
}
|
||||||
|
|
||||||
void run()
|
void run()
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,8 @@ public:
|
||||||
QAMQP::Client* client = new QAMQP::Client(this);
|
QAMQP::Client* client = new QAMQP::Client(this);
|
||||||
client->connectToHost(address);
|
client->connectToHost(address);
|
||||||
|
|
||||||
queue_ = client->createQueue();
|
queue_ = client->createQueue("task_queue");
|
||||||
queue_->declare("task_queue", Queue::Durable);
|
queue_->declare(Queue::Durable);
|
||||||
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
|
||||||
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage()));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue