diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 7172082..ec44a41 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -151,7 +151,7 @@ void QueuePrivate::bindOk(const Frame::Method &frame) Q_UNUSED(frame) Q_Q(Queue); - qDebug() << Q_FUNC_INFO << "bound to queue: " << name; + qDebug() << Q_FUNC_INFO << "bound to exchange"; Q_EMIT q->bound(); } @@ -160,7 +160,7 @@ void QueuePrivate::unbindOk(const Frame::Method &frame) Q_UNUSED(frame) Q_Q(Queue); - qDebug() << Q_FUNC_INFO << "unbound queue: " << name; + qDebug() << Q_FUNC_INFO << "unbound exchange"; Q_EMIT q->unbound(); } @@ -220,15 +220,6 @@ void QueuePrivate::declare() out << qint16(0); //reserved 1 Frame::writeField('s', out, name); - - qDebug() << "DECLARE OPTIONS: "; - if (options & Queue::NoOptions) qDebug() << "NoOptions"; - if (options & Queue::Passive) qDebug() << "Passive"; - if (options & Queue::Durable) qDebug() << "Durable"; - if (options & Queue::Exclusive) qDebug() << "Exclusive"; - if (options & Queue::AutoDelete) qDebug() << "AutoDelete"; - if (options & Queue::NoWait) qDebug() << "NoWait"; - out << qint8(options); Frame::writeField('F', out, Frame::TableField()); @@ -289,11 +280,9 @@ bool Queue::noAck() const return d->noAck; } -void Queue::declare(const QString &name, int options) +void Queue::declare(int options) { Q_D(Queue); - if (!name.isEmpty()) - d->name = name; d->options = options; if (!d->opened) { @@ -338,13 +327,11 @@ void Queue::purge() QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); - out << qint16(0); //reserved 1 Frame::writeField('s', out, d->name); - out << qint8(0); // no-wait - frame.setArguments(arguments); + frame.setArguments(arguments); d->sendFrame(frame); } @@ -372,12 +359,12 @@ void Queue::bind(const QString &exchangeName, const QString &key) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); - out << qint16(0); //reserved 1 + out << qint16(0); // reserved 1 Frame::writeField('s', out, d->name); Frame::writeField('s', out, exchangeName); Frame::writeField('s', out, key); - out << qint8(0); // no-wait + out << qint8(0); // no-wait Frame::writeField('F', out, Frame::TableField()); frame.setArguments(arguments); diff --git a/src/amqp_queue.h b/src/amqp_queue.h index db3957e..3b04459 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -62,7 +62,7 @@ public: bool noAck() const; // AMQP Queue - void declare(const QString &name = QString(), int options = Durable|AutoDelete); + void declare(int options = Durable|AutoDelete); void bind(const QString &exchangeName, const QString &key); void bind(Exchange *exchange, const QString &key); void unbind(const QString &exchangeName, const QString &key); diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 92e80f4..0ab5add 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -16,6 +16,8 @@ private Q_SLOTS: void cleanup(); void defaultExchange(); + void standardExchanges_data(); + void standardExchanges(); void remove(); void removeIfUnused(); @@ -55,6 +57,38 @@ void tst_QAMQPQueue::defaultExchange() QCOMPARE(message.payload(), QByteArray("first message")); } +void tst_QAMQPQueue::standardExchanges_data() +{ + QTest::addColumn("exchange"); + QTest::newRow("amq.direct") << "amq.direct"; + QTest::newRow("amq.fanout") << "amq.fanout"; + QTest::newRow("amq.headers") << "amq.headers"; + QTest::newRow("amq.match") << "amq.match"; + QTest::newRow("amq.topic") << "amq.topic"; +} + +void tst_QAMQPQueue::standardExchanges() +{ + QFETCH(QString, exchange); + + QString queueName = QString("test-%1").arg(exchange); + QString routingKey = QString("testRoutingKey-%1").arg(exchange); + + Queue *queue = client->createQueue(queueName); + queue->declare(); + QVERIFY(waitForSignal(queue, SIGNAL(declared()))); + queue->consume(); // required because AutoDelete will not delete if + // there was never a consumer + + queue->bind(exchange, routingKey); + QVERIFY(waitForSignal(queue, SIGNAL(bound()))); + + Exchange *defaultExchange = client->createExchange(exchange); + defaultExchange->publish(routingKey, "test message"); + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + QCOMPARE(queue->getMessage().payload(), QByteArray("test message")); +} + void tst_QAMQPQueue::remove() { Queue *queue = client->createQueue("test-remove"); @@ -74,6 +108,7 @@ void tst_QAMQPQueue::removeIfUnused() queue->remove(Queue::roIfUnused); QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError)))); QCOMPARE(queue->error(), Channel::PreconditionFailed); + QVERIFY(!queue->errorString().isEmpty()); } void tst_QAMQPQueue::removeIfEmpty() @@ -81,8 +116,8 @@ void tst_QAMQPQueue::removeIfEmpty() // NOTE: this will work once I refactor messages to easily // add propertis for e.g. persistence - Queue *queue = client->createQueue(); - queue->declare("test-remove-if-empty", Queue::Durable); + Queue *queue = client->createQueue("test-remove-if-empty"); + queue->declare(Queue::Durable); QVERIFY(waitForSignal(queue, SIGNAL(declared()))); queue->consume(); @@ -93,6 +128,7 @@ void tst_QAMQPQueue::removeIfEmpty() queue->remove(Queue::roIfEmpty); QVERIFY(waitForSignal(queue, SIGNAL(error(ChannelError)))); QCOMPARE(queue->error(), Channel::PreconditionFailed); + QVERIFY(!queue->errorString().isEmpty()); } QTEST_MAIN(tst_QAMQPQueue) diff --git a/tests/manual/qamqp/pubsub/ReceiveLog.h b/tests/manual/qamqp/pubsub/ReceiveLog.h index 12acbdb..984345f 100644 --- a/tests/manual/qamqp/pubsub/ReceiveLog.h +++ b/tests/manual/qamqp/pubsub/ReceiveLog.h @@ -31,8 +31,8 @@ public: client->connectToHost(address); // Create an exclusive queue - queue_ = client->createQueue(); - queue_->declare("", Queue::Exclusive); + queue_ = client->createQueue(""); + queue_->declare(Queue::Exclusive); connect(queue_, SIGNAL(declared()), this, SLOT(declared())); connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); diff --git a/tests/manual/qamqp/routing/ReceiveLogDirect.h b/tests/manual/qamqp/routing/ReceiveLogDirect.h index 3f58abf..d14f96e 100644 --- a/tests/manual/qamqp/routing/ReceiveLogDirect.h +++ b/tests/manual/qamqp/routing/ReceiveLogDirect.h @@ -34,8 +34,8 @@ public: client->connectToHost(address); // Create an exclusive queue - queue_ = client->createQueue(); - queue_->declare("", Queue::Exclusive); + queue_ = client->createQueue(""); + queue_->declare(Queue::Exclusive); connect(queue_, SIGNAL(declared()), this, SLOT(declared())); connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); diff --git a/tests/manual/qamqp/workqueues/NewTask.h b/tests/manual/qamqp/workqueues/NewTask.h index 18fa1d8..62bda7c 100644 --- a/tests/manual/qamqp/workqueues/NewTask.h +++ b/tests/manual/qamqp/workqueues/NewTask.h @@ -35,8 +35,8 @@ public: exchange_ = client->createExchange(); // Create the "task_queue" queue, with the "durable" option set - queue_ = client->createQueue(exchange_->channelNumber()); - queue_->declare("task_queue", Queue::Durable); + queue_ = client->createQueue("task_queue", exchange_->channelNumber()); + queue_->declare(Queue::Durable); } void run() diff --git a/tests/manual/qamqp/workqueues/Worker.h b/tests/manual/qamqp/workqueues/Worker.h index d40e2c8..74808ef 100644 --- a/tests/manual/qamqp/workqueues/Worker.h +++ b/tests/manual/qamqp/workqueues/Worker.h @@ -29,8 +29,8 @@ public: QAMQP::Client* client = new QAMQP::Client(this); client->connectToHost(address); - queue_ = client->createQueue(); - queue_->declare("task_queue", Queue::Durable); + queue_ = client->createQueue("task_queue"); + queue_->declare(Queue::Durable); connect(queue_, SIGNAL(declared()), this, SLOT(declared())); connect(queue_, SIGNAL(messageReceived()), this, SLOT(newMessage())); }