add preliminary support for publisher acknowledgements

RabbitMQ supports publish confirms (Publisher Acknowledgements) on
a given channel. This enables the user to toggle this functionality
and ensure that published messages are in fact published.
This commit is contained in:
Matt Broadstone 2015-01-19 13:51:20 -05:00
parent 8be77044ef
commit f2ac01de34
8 changed files with 146 additions and 24 deletions

View File

@ -12,9 +12,9 @@ class QAmqpChannelPrivate : public QAmqpMethodFrameHandler
{
public:
enum MethodId {
METHOD_ID_ENUM(miOpen, 10),
METHOD_ID_ENUM(miFlow, 20),
METHOD_ID_ENUM(miClose, 40)
METHOD_ID_ENUM(miOpen, 10),
METHOD_ID_ENUM(miFlow, 20),
METHOD_ID_ENUM(miClose, 40)
};
enum BasicMethod {
@ -29,7 +29,8 @@ public:
bmAck = 80,
bmReject = 90,
bmRecoverAsync = 100,
METHOD_ID_ENUM(bmRecover, 110)
METHOD_ID_ENUM(bmRecover, 110),
bmNack = 120
};
QAmqpChannelPrivate(QAmqpChannel *q);

View File

@ -1,4 +1,6 @@
#include <QEventLoop>
#include <QDataStream>
#include <QTimer>
#include <QDebug>
#include "qamqpexchange.h"
@ -22,7 +24,8 @@ QString QAmqpExchangePrivate::typeToString(QAmqpExchange::ExchangeType type)
QAmqpExchangePrivate::QAmqpExchangePrivate(QAmqpExchange *q)
: QAmqpChannelPrivate(q),
delayedDeclare(false),
declared(false)
declared(false),
nextDeliveryTag(0)
{
}
@ -58,29 +61,36 @@ void QAmqpExchangePrivate::declare()
bool QAmqpExchangePrivate::_q_method(const QAmqpMethodFrame &frame)
{
Q_Q(QAmqpExchange);
if (QAmqpChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() == QAmqpFrame::Exchange) {
if (frame.methodClass() == QAmqpFrame::Basic) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDeleteOk:
deleteOk(frame);
case bmAck:
case bmNack:
handleAckOrNack(frame);
break;
case bmReturn: basicReturn(frame); break;
default:
break;
}
return true;
} else if (frame.methodClass() == QAmqpFrame::Basic) {
}
if (frame.methodClass() == QAmqpFrame::Confirm) {
if (frame.id() == cmConfirmOk) {
Q_EMIT q->confirmsEnabled();
return true;
}
}
if (frame.methodClass() == QAmqpFrame::Exchange) {
switch (frame.id()) {
case bmReturn:
basicReturn(frame);
break;
case miDeclareOk: declareOk(frame); break;
case miDeleteOk: deleteOk(frame); break;
default:
break;
@ -145,6 +155,39 @@ void QAmqpExchangePrivate::basicReturn(const QAmqpMethodFrame &frame)
qAmqpDebug(">> routingKey: %s", qPrintable(routingKey));
}
void QAmqpExchangePrivate::handleAckOrNack(const QAmqpMethodFrame &frame)
{
Q_Q(QAmqpExchange);
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qlonglong deliveryTag =
QAmqpFrame::readAmqpField(stream, QAmqpMetaType::LongLongUint).toLongLong();
bool multiple = QAmqpFrame::readAmqpField(stream, QAmqpMetaType::Boolean).toBool();
if (frame.id() == QAmqpExchangePrivate::bmAck) {
if (deliveryTag == 0) {
unconfirmedDeliveryTags.clear();
} else {
int idx = unconfirmedDeliveryTags.indexOf(deliveryTag);
if (idx == -1) {
return;
}
if (multiple) {
unconfirmedDeliveryTags.remove(0, idx + 1);
} else {
unconfirmedDeliveryTags.remove(idx);
}
}
if (unconfirmedDeliveryTags.isEmpty())
Q_EMIT q->allMessagesDelivered();
} else {
qAmqpDebug() << "nacked(" << deliveryTag << "), multiple=" << multiple;
}
}
//////////////////////////////////////////////////////////////////////////
QAmqpExchange::QAmqpExchange(int channelNumber, QAmqpClient *parent)
@ -231,6 +274,11 @@ void QAmqpExchange::publish(const QByteArray &message, const QString &routingKey
const QAmqpMessage::PropertyHash &properties, int publishOptions)
{
Q_D(QAmqpExchange);
if (d->nextDeliveryTag > 0) {
d->unconfirmedDeliveryTags.append(d->nextDeliveryTag);
d->nextDeliveryTag++;
}
QAmqpMethodFrame frame(QAmqpFrame::Basic, QAmqpExchangePrivate::bmPublish);
frame.setChannel(d->channelNumber);
@ -268,3 +316,32 @@ void QAmqpExchange::publish(const QByteArray &message, const QString &routingKey
d->sendFrame(body);
}
}
void QAmqpExchange::enableConfirms(bool noWait)
{
Q_D(QAmqpExchange);
QAmqpMethodFrame frame(QAmqpFrame::Confirm, QAmqpExchangePrivate::cmConfirm);
frame.setChannel(d->channelNumber);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint8(noWait ? 1 : 0);
frame.setArguments(arguments);
d->sendFrame(frame);
// for tracking acks and nacks
if (d->nextDeliveryTag == 0) d->nextDeliveryTag = 1;
}
bool QAmqpExchange::waitForConfirms(int msecs)
{
Q_D(QAmqpExchange);
QEventLoop loop;
connect(this, SIGNAL(allMessagesDelivered()), &loop, SLOT(quit()));
QTimer::singleShot(msecs, &loop, SLOT(quit()));
loop.exec();
return (d->unconfirmedDeliveryTags.isEmpty());
}

View File

@ -17,6 +17,8 @@ class QAMQP_EXPORT QAmqpExchange : public QAmqpChannel
Q_ENUMS(ExchangeOptions)
public:
virtual ~QAmqpExchange();
enum ExchangeType {
Direct,
FanOut,
@ -50,13 +52,11 @@ public:
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
ExchangeOptions options() const;
virtual ~QAmqpExchange();
// AMQP Exchange
void declare(ExchangeType type = Direct,
ExchangeOptions options = NoOptions,
const QAmqpTable &args = QAmqpTable());
void declare(const QString &type = QLatin1String("direct"),
void declare(const QString &type,
ExchangeOptions options = NoOptions,
const QAmqpTable &args = QAmqpTable());
void remove(int options = roIfUnused|roNoWait);
@ -73,10 +73,16 @@ public:
const QAmqpMessage::PropertyHash &properties = QAmqpMessage::PropertyHash(),
int publishOptions = poNoOptions);
void enableConfirms(bool noWait = false);
bool waitForConfirms(int msecs = 30000);
Q_SIGNALS:
void declared();
void removed();
void confirmsEnabled();
void allMessagesDelivered();
protected:
virtual void channelOpened();
virtual void channelClosed();
@ -86,7 +92,6 @@ private:
Q_DISABLE_COPY(QAmqpExchange)
Q_DECLARE_PRIVATE(QAmqpExchange)
friend class QAmqpClient;
};

View File

@ -13,6 +13,10 @@ public:
METHOD_ID_ENUM(miDelete, 20)
};
enum ConfirmMethod {
METHOD_ID_ENUM(cmConfirm, 10)
};
QAmqpExchangePrivate(QAmqpExchange *q);
static QString typeToString(QAmqpExchange::ExchangeType type);
@ -24,12 +28,15 @@ public:
void declareOk(const QAmqpMethodFrame &frame);
void deleteOk(const QAmqpMethodFrame &frame);
void basicReturn(const QAmqpMethodFrame &frame);
void handleAckOrNack(const QAmqpMethodFrame &frame);
QString type;
QAmqpExchange::ExchangeOptions options;
QAmqpTable arguments;
bool delayedDeclare;
bool declared;
qlonglong nextDeliveryTag;
QVector<qlonglong> unconfirmedDeliveryTags;
Q_DECLARE_PUBLIC(QAmqpExchange)
};

View File

@ -31,7 +31,8 @@ public:
Exchange = 40,
Queue = 50,
Basic = 60,
Tx = 90,
Confirm = 85,
Tx = 90
};
virtual ~QAmqpFrame();

View File

@ -502,6 +502,11 @@ void QAmqpQueue::get(bool noAck)
}
void QAmqpQueue::ack(const QAmqpMessage &message)
{
ack(message.deliveryTag(), false);
}
void QAmqpQueue::ack(qlonglong deliveryTag, bool multiple)
{
Q_D(QAmqpQueue);
if (!d->opened) {
@ -515,8 +520,8 @@ void QAmqpQueue::ack(const QAmqpMessage &message)
QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly);
out << message.deliveryTag();
out << qint8(0); // multiple
out << deliveryTag;
out << qint8(multiple ? 1 : 0); // multiple
frame.setArguments(arguments);
d->sendFrame(frame);

View File

@ -52,6 +52,7 @@ public:
~QAmqpQueue();
bool isConsuming() const;
void setConsumerTag(const QString &consumerTag);
QString consumerTag() const;
@ -67,9 +68,11 @@ public:
// AMQP Basic
bool consume(int options = NoOptions);
void get(bool noAck = true);
void ack(const QAmqpMessage &message);
bool cancel(bool noWait = false);
void ack(const QAmqpMessage &message);
void ack(qlonglong deliveryTag, bool multiple);
Q_SIGNALS:
void declared();
void bound();

View File

@ -23,6 +23,8 @@ private Q_SLOTS:
void removeIfUnused();
void invalidMandatoryRouting();
void invalidImmediateRouting();
void confirmsSupport();
void confirmDontLoseMessages();
private:
QScopedPointer<QAmqpClient> client;
@ -167,5 +169,26 @@ void tst_QAMQPExchange::invalidImmediateRouting()
QCOMPARE(client->error(), QAMQP::NotImplementedError);
}
void tst_QAMQPExchange::confirmsSupport()
{
QAmqpExchange *exchange = client->createExchange("confirm-test");
exchange->enableConfirms();
QVERIFY(waitForSignal(exchange, SIGNAL(confirmsEnabled())));
}
void tst_QAMQPExchange::confirmDontLoseMessages()
{
QAmqpExchange *defaultExchange = client->createExchange();
defaultExchange->enableConfirms();
QVERIFY(waitForSignal(defaultExchange, SIGNAL(confirmsEnabled())));
QAmqpMessage::PropertyHash properties;
properties[QAmqpMessage::DeliveryMode] = "2"; // make message persistent
for (int i = 0; i < 10000; ++i)
defaultExchange->publish("noop", "confirms-test", properties);
QVERIFY(defaultExchange->waitForConfirms());
}
QTEST_MAIN(tst_QAMQPExchange)
#include "tst_qamqpexchange.moc"