diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index 8e58d61..7f4db09 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -64,19 +64,37 @@ bool ExchangePrivate::_q_method(const Frame::Method &frame) if (ChannelPrivate::_q_method(frame)) return true; - if (frame.methodClass() != Frame::fcExchange) - return false; + if (frame.methodClass() == Frame::fcExchange) { + switch (frame.id()) { + case miDeclareOk: + declareOk(frame); + break; - switch(frame.id()) { - case miDeclareOk: - declareOk(frame); - break; - case miDeleteOk: - deleteOk(frame); - break; + case miDeleteOk: + deleteOk(frame); + break; + + default: + qDebug() << Q_FUNC_INFO << "unhandled exchange method: " << frame.id(); + break; + } + + return true; + } else if (frame.methodClass() == Frame::fcBasic) { + switch (frame.id()) { + case bmReturn: + basicReturn(frame); + break; + + default: + qDebug() << Q_FUNC_INFO << "unhandled basic method: " << frame.id(); + break; + } + + return true; } - return true; + return false; } void ExchangePrivate::declareOk(const Frame::Method &frame) @@ -107,6 +125,31 @@ void ExchangePrivate::_q_disconnected() declared = false; } +void ExchangePrivate::basicReturn(const Frame::Method &frame) +{ + Q_Q(Exchange); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + quint16 replyCode; + stream >> replyCode; + QString replyText = Frame::readField('s', stream).toString(); + QString exchangeName = Frame::readField('s', stream).toString(); + QString routingKey = Frame::readField('s', stream).toString(); + + Error checkError = static_cast(replyCode); + if (checkError != QAMQP::NoError) { + error = checkError; + errorString = qPrintable(replyText); + Q_EMIT q->error(error); + } + + qAmqpDebug(">> replyCode: %d", replyCode); + qAmqpDebug(">> replyText: %s", qPrintable(replyText)); + qAmqpDebug(">> exchangeName: %s", qPrintable(exchangeName)); + qAmqpDebug(">> routingKey: %s", qPrintable(routingKey)); +} + ////////////////////////////////////////////////////////////////////////// Exchange::Exchange(int channelNumber, Client *parent) @@ -175,20 +218,22 @@ void Exchange::remove(int options) } void Exchange::publish(const QString &message, const QString &routingKey, - const MessageProperties &properties) + const MessageProperties &properties, int publishOptions) { - publish(message.toUtf8(), routingKey, QLatin1String("text.plain"), QVariantHash(), properties); + publish(message.toUtf8(), routingKey, QLatin1String("text.plain"), + QVariantHash(), properties, publishOptions); } void Exchange::publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const MessageProperties &properties) + const QString &mimeType, const MessageProperties &properties, + int publishOptions) { - publish(message, routingKey, mimeType, QVariantHash(), properties); + publish(message, routingKey, mimeType, QVariantHash(), properties, publishOptions); } void Exchange::publish(const QByteArray &message, const QString &routingKey, const QString &mimeType, const QVariantHash &headers, - const MessageProperties &properties) + const MessageProperties &properties, int publishOptions) { Q_D(Exchange); Frame::Method frame(Frame::fcBasic, ExchangePrivate::bmPublish); @@ -200,7 +245,7 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey, out << qint16(0); //reserved 1 Frame::writeField('s', out, d->name); Frame::writeField('s', out, routingKey); - out << qint8(0); + out << qint8(publishOptions); frame.setArguments(arguments); d->sendFrame(frame); diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index d7768f4..ccf6c6a 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -26,9 +26,16 @@ public: }; QString type() const; + enum PublishOption { + poNoOptions = 0x0, + poMandatory = 0x01, + poImmediate = 0x02 + }; + Q_DECLARE_FLAGS(PublishOptions, PublishOption) + enum RemoveOption { roForce = 0x0, - roIfUnused = 0x1, + roIfUnused = 0x01, roNoWait = 0x04 }; Q_DECLARE_FLAGS(RemoveOptions, RemoveOption) @@ -37,8 +44,8 @@ public: NoOptions = 0x0, Passive = 0x01, Durable = 0x02, - AutoDelete = 0x4, - Internal = 0x8, + AutoDelete = 0x04, + Internal = 0x08, NoWait = 0x10 }; Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption) @@ -57,12 +64,15 @@ public: // AMQP Basic void publish(const QString &message, const QString &routingKey, - const MessageProperties &properties = MessageProperties()); + const MessageProperties &properties = MessageProperties(), + int publishOptions = poNoOptions); void publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const MessageProperties &properties = MessageProperties()); + const QString &mimeType, const MessageProperties &properties = MessageProperties(), + int publishOptions = poNoOptions); void publish(const QByteArray &message, const QString &routingKey, const QString &mimeType, const QVariantHash &headers, - const MessageProperties &properties = MessageProperties()); + const MessageProperties &properties = MessageProperties(), + int publishOptions = poNoOptions); Q_SIGNALS: void declared(); diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index 83e00f1..04a1e1c 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -25,6 +25,7 @@ public: virtual bool _q_method(const Frame::Method &frame); void declareOk(const Frame::Method &frame); void deleteOk(const Frame::Method &frame); + void basicReturn(const Frame::Method &frame); QString type; Exchange::ExchangeOptions options; diff --git a/src/amqp_global.h b/src/amqp_global.h index 89be59e..2ca17ed 100644 --- a/src/amqp_global.h +++ b/src/amqp_global.h @@ -35,6 +35,7 @@ namespace QAMQP { enum Error { NoError = 0, ContentTooLargeError = 311, + UnroutableKey = 312, NoConsumersError = 313, ConnectionForcedError = 320, InvalidPathError = 402, diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 206712d..7d8b930 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -30,24 +30,24 @@ public: NoOptions = 0x0, Passive = 0x01, Durable = 0x02, - Exclusive = 0x4, - AutoDelete = 0x8, + Exclusive = 0x04, + AutoDelete = 0x08, NoWait = 0x10 }; Q_DECLARE_FLAGS(QueueOptions, QueueOption) int options() const; enum ConsumeOption { - coNoLocal = 0x1, + coNoLocal = 0x01, coNoAck = 0x02, coExclusive = 0x04, - coNoWait = 0x8 + coNoWait = 0x08 }; Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption) enum RemoveOption { roForce = 0x0, - roIfUnused = 0x1, + roIfUnused = 0x01, roIfEmpty = 0x02, roNoWait = 0x04 }; diff --git a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp index b776e30..a826a7b 100644 --- a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp +++ b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp @@ -22,6 +22,7 @@ private Q_SLOTS: void invalidDeclaration(); void invalidRedeclaration(); void removeIfUnused(); + void invalidMandatoryRouting(); private: QScopedPointer client; @@ -140,5 +141,13 @@ void tst_QAMQPExchange::removeIfUnused() QVERIFY(waitForSignal(queue, SIGNAL(removed()))); } +void tst_QAMQPExchange::invalidMandatoryRouting() +{ + Exchange *defaultExchange = client->createExchange(); + defaultExchange->publish("some message", "unroutable-key", MessageProperties(), Exchange::poMandatory); + QVERIFY(waitForSignal(defaultExchange, SIGNAL(error(QAMQP::Error)))); + QCOMPARE(defaultExchange->error(), QAMQP::UnroutableKey); +} + QTEST_MAIN(tst_QAMQPExchange) #include "tst_qamqpexchange.moc"