diff --git a/src/qamqpchannel_p.h b/src/qamqpchannel_p.h index c2acb7f..7a79945 100644 --- a/src/qamqpchannel_p.h +++ b/src/qamqpchannel_p.h @@ -12,9 +12,9 @@ class QAmqpChannelPrivate : public QAmqpMethodFrameHandler { public: enum MethodId { - METHOD_ID_ENUM(miOpen, 10), - METHOD_ID_ENUM(miFlow, 20), - METHOD_ID_ENUM(miClose, 40) + METHOD_ID_ENUM(miOpen, 10), + METHOD_ID_ENUM(miFlow, 20), + METHOD_ID_ENUM(miClose, 40) }; enum BasicMethod { @@ -29,7 +29,8 @@ public: bmAck = 80, bmReject = 90, bmRecoverAsync = 100, - METHOD_ID_ENUM(bmRecover, 110) + METHOD_ID_ENUM(bmRecover, 110), + bmNack = 120 }; QAmqpChannelPrivate(QAmqpChannel *q); diff --git a/src/qamqpexchange.cpp b/src/qamqpexchange.cpp index 308b136..ff38b55 100644 --- a/src/qamqpexchange.cpp +++ b/src/qamqpexchange.cpp @@ -1,4 +1,6 @@ +#include #include +#include #include #include "qamqpexchange.h" @@ -22,7 +24,8 @@ QString QAmqpExchangePrivate::typeToString(QAmqpExchange::ExchangeType type) QAmqpExchangePrivate::QAmqpExchangePrivate(QAmqpExchange *q) : QAmqpChannelPrivate(q), delayedDeclare(false), - declared(false) + declared(false), + nextDeliveryTag(0) { } @@ -58,29 +61,36 @@ void QAmqpExchangePrivate::declare() bool QAmqpExchangePrivate::_q_method(const QAmqpMethodFrame &frame) { + Q_Q(QAmqpExchange); if (QAmqpChannelPrivate::_q_method(frame)) return true; - if (frame.methodClass() == QAmqpFrame::Exchange) { + if (frame.methodClass() == QAmqpFrame::Basic) { switch (frame.id()) { - case miDeclareOk: - declareOk(frame); - break; - - case miDeleteOk: - deleteOk(frame); + case bmAck: + case bmNack: + handleAckOrNack(frame); break; + case bmReturn: basicReturn(frame); break; default: break; } return true; - } else if (frame.methodClass() == QAmqpFrame::Basic) { + } + + if (frame.methodClass() == QAmqpFrame::Confirm) { + if (frame.id() == cmConfirmOk) { + Q_EMIT q->confirmsEnabled(); + return true; + } + } + + if (frame.methodClass() == QAmqpFrame::Exchange) { switch (frame.id()) { - case bmReturn: - basicReturn(frame); - break; + case miDeclareOk: declareOk(frame); break; + case miDeleteOk: deleteOk(frame); break; default: break; @@ -145,6 +155,39 @@ void QAmqpExchangePrivate::basicReturn(const QAmqpMethodFrame &frame) qAmqpDebug(">> routingKey: %s", qPrintable(routingKey)); } +void QAmqpExchangePrivate::handleAckOrNack(const QAmqpMethodFrame &frame) +{ + Q_Q(QAmqpExchange); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + qlonglong deliveryTag = + QAmqpFrame::readAmqpField(stream, QAmqpMetaType::LongLongUint).toLongLong(); + bool multiple = QAmqpFrame::readAmqpField(stream, QAmqpMetaType::Boolean).toBool(); + if (frame.id() == QAmqpExchangePrivate::bmAck) { + if (deliveryTag == 0) { + unconfirmedDeliveryTags.clear(); + } else { + int idx = unconfirmedDeliveryTags.indexOf(deliveryTag); + if (idx == -1) { + return; + } + + if (multiple) { + unconfirmedDeliveryTags.remove(0, idx + 1); + } else { + unconfirmedDeliveryTags.remove(idx); + } + } + + if (unconfirmedDeliveryTags.isEmpty()) + Q_EMIT q->allMessagesDelivered(); + + } else { + qAmqpDebug() << "nacked(" << deliveryTag << "), multiple=" << multiple; + } +} + ////////////////////////////////////////////////////////////////////////// QAmqpExchange::QAmqpExchange(int channelNumber, QAmqpClient *parent) @@ -231,6 +274,11 @@ void QAmqpExchange::publish(const QByteArray &message, const QString &routingKey const QAmqpMessage::PropertyHash &properties, int publishOptions) { Q_D(QAmqpExchange); + if (d->nextDeliveryTag > 0) { + d->unconfirmedDeliveryTags.append(d->nextDeliveryTag); + d->nextDeliveryTag++; + } + QAmqpMethodFrame frame(QAmqpFrame::Basic, QAmqpExchangePrivate::bmPublish); frame.setChannel(d->channelNumber); @@ -268,3 +316,32 @@ void QAmqpExchange::publish(const QByteArray &message, const QString &routingKey d->sendFrame(body); } } + +void QAmqpExchange::enableConfirms(bool noWait) +{ + Q_D(QAmqpExchange); + QAmqpMethodFrame frame(QAmqpFrame::Confirm, QAmqpExchangePrivate::cmConfirm); + frame.setChannel(d->channelNumber); + + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + stream << qint8(noWait ? 1 : 0); + + frame.setArguments(arguments); + d->sendFrame(frame); + + // for tracking acks and nacks + if (d->nextDeliveryTag == 0) d->nextDeliveryTag = 1; +} + +bool QAmqpExchange::waitForConfirms(int msecs) +{ + Q_D(QAmqpExchange); + + QEventLoop loop; + connect(this, SIGNAL(allMessagesDelivered()), &loop, SLOT(quit())); + QTimer::singleShot(msecs, &loop, SLOT(quit())); + loop.exec(); + + return (d->unconfirmedDeliveryTags.isEmpty()); +} diff --git a/src/qamqpexchange.h b/src/qamqpexchange.h index ef0a40b..122155b 100644 --- a/src/qamqpexchange.h +++ b/src/qamqpexchange.h @@ -17,6 +17,8 @@ class QAMQP_EXPORT QAmqpExchange : public QAmqpChannel Q_ENUMS(ExchangeOptions) public: + virtual ~QAmqpExchange(); + enum ExchangeType { Direct, FanOut, @@ -50,13 +52,11 @@ public: Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption) ExchangeOptions options() const; - virtual ~QAmqpExchange(); - // AMQP Exchange void declare(ExchangeType type = Direct, ExchangeOptions options = NoOptions, const QAmqpTable &args = QAmqpTable()); - void declare(const QString &type = QLatin1String("direct"), + void declare(const QString &type, ExchangeOptions options = NoOptions, const QAmqpTable &args = QAmqpTable()); void remove(int options = roIfUnused|roNoWait); @@ -73,10 +73,16 @@ public: const QAmqpMessage::PropertyHash &properties = QAmqpMessage::PropertyHash(), int publishOptions = poNoOptions); + void enableConfirms(bool noWait = false); + bool waitForConfirms(int msecs = 30000); + Q_SIGNALS: void declared(); void removed(); + void confirmsEnabled(); + void allMessagesDelivered(); + protected: virtual void channelOpened(); virtual void channelClosed(); @@ -86,7 +92,6 @@ private: Q_DISABLE_COPY(QAmqpExchange) Q_DECLARE_PRIVATE(QAmqpExchange) - friend class QAmqpClient; }; diff --git a/src/qamqpexchange_p.h b/src/qamqpexchange_p.h index 4a4054e..3083344 100644 --- a/src/qamqpexchange_p.h +++ b/src/qamqpexchange_p.h @@ -13,6 +13,10 @@ public: METHOD_ID_ENUM(miDelete, 20) }; + enum ConfirmMethod { + METHOD_ID_ENUM(cmConfirm, 10) + }; + QAmqpExchangePrivate(QAmqpExchange *q); static QString typeToString(QAmqpExchange::ExchangeType type); @@ -24,12 +28,15 @@ public: void declareOk(const QAmqpMethodFrame &frame); void deleteOk(const QAmqpMethodFrame &frame); void basicReturn(const QAmqpMethodFrame &frame); + void handleAckOrNack(const QAmqpMethodFrame &frame); QString type; QAmqpExchange::ExchangeOptions options; QAmqpTable arguments; bool delayedDeclare; bool declared; + qlonglong nextDeliveryTag; + QVector unconfirmedDeliveryTags; Q_DECLARE_PUBLIC(QAmqpExchange) }; diff --git a/src/qamqpframe_p.h b/src/qamqpframe_p.h index 059b60f..a07e499 100644 --- a/src/qamqpframe_p.h +++ b/src/qamqpframe_p.h @@ -31,7 +31,8 @@ public: Exchange = 40, Queue = 50, Basic = 60, - Tx = 90, + Confirm = 85, + Tx = 90 }; virtual ~QAmqpFrame(); diff --git a/src/qamqpqueue.cpp b/src/qamqpqueue.cpp index 3d3aa6c..c5ea577 100644 --- a/src/qamqpqueue.cpp +++ b/src/qamqpqueue.cpp @@ -502,6 +502,11 @@ void QAmqpQueue::get(bool noAck) } void QAmqpQueue::ack(const QAmqpMessage &message) +{ + ack(message.deliveryTag(), false); +} + +void QAmqpQueue::ack(qlonglong deliveryTag, bool multiple) { Q_D(QAmqpQueue); if (!d->opened) { @@ -515,8 +520,8 @@ void QAmqpQueue::ack(const QAmqpMessage &message) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); - out << message.deliveryTag(); - out << qint8(0); // multiple + out << deliveryTag; + out << qint8(multiple ? 1 : 0); // multiple frame.setArguments(arguments); d->sendFrame(frame); diff --git a/src/qamqpqueue.h b/src/qamqpqueue.h index ada3d19..8a33b06 100644 --- a/src/qamqpqueue.h +++ b/src/qamqpqueue.h @@ -52,6 +52,7 @@ public: ~QAmqpQueue(); bool isConsuming() const; + void setConsumerTag(const QString &consumerTag); QString consumerTag() const; @@ -67,9 +68,11 @@ public: // AMQP Basic bool consume(int options = NoOptions); void get(bool noAck = true); - void ack(const QAmqpMessage &message); bool cancel(bool noWait = false); + void ack(const QAmqpMessage &message); + void ack(qlonglong deliveryTag, bool multiple); + Q_SIGNALS: void declared(); void bound(); diff --git a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp index 4ef3965..92bdf72 100644 --- a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp +++ b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp @@ -23,6 +23,8 @@ private Q_SLOTS: void removeIfUnused(); void invalidMandatoryRouting(); void invalidImmediateRouting(); + void confirmsSupport(); + void confirmDontLoseMessages(); private: QScopedPointer client; @@ -167,5 +169,26 @@ void tst_QAMQPExchange::invalidImmediateRouting() QCOMPARE(client->error(), QAMQP::NotImplementedError); } +void tst_QAMQPExchange::confirmsSupport() +{ + QAmqpExchange *exchange = client->createExchange("confirm-test"); + exchange->enableConfirms(); + QVERIFY(waitForSignal(exchange, SIGNAL(confirmsEnabled()))); +} + +void tst_QAMQPExchange::confirmDontLoseMessages() +{ + QAmqpExchange *defaultExchange = client->createExchange(); + defaultExchange->enableConfirms(); + QVERIFY(waitForSignal(defaultExchange, SIGNAL(confirmsEnabled()))); + + QAmqpMessage::PropertyHash properties; + properties[QAmqpMessage::DeliveryMode] = "2"; // make message persistent + + for (int i = 0; i < 10000; ++i) + defaultExchange->publish("noop", "confirms-test", properties); + QVERIFY(defaultExchange->waitForConfirms()); +} + QTEST_MAIN(tst_QAMQPExchange) #include "tst_qamqpexchange.moc"