ensure consuming only occurs once per queue
a Queue can act as a consumer in QAMQP, but previously we did no checks to ensure that a Queue sent a request to consume only once. This commit adds API for checking and ensuring that this is the case. Also, I reverted the changes to allow multiple consumer tags because this was incorrect. Also included is a new signal "consuming" to notify when consumption occurs, and auto tests have been updated to use this, as well as verify that it can only occur once per-consumer
This commit is contained in:
parent
50e38c7455
commit
c43c2d6788
|
|
@ -15,7 +15,8 @@ QueuePrivate::QueuePrivate(Queue *q)
|
||||||
delayedDeclare(false),
|
delayedDeclare(false),
|
||||||
declared(false),
|
declared(false),
|
||||||
noAck(true),
|
noAck(true),
|
||||||
recievingMessage(false)
|
recievingMessage(false),
|
||||||
|
consuming(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -190,12 +191,14 @@ void QueuePrivate::getOk(const Frame::Method &frame)
|
||||||
|
|
||||||
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
|
Q_Q(Queue);
|
||||||
qAmqpDebug() << "consume ok: " << name;
|
qAmqpDebug() << "consume ok: " << name;
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
QDataStream stream(&data, QIODevice::ReadOnly);
|
QDataStream stream(&data, QIODevice::ReadOnly);
|
||||||
QString consumerTag = Frame::readField('s',stream).toString();
|
consumerTag = Frame::readField('s',stream).toString();
|
||||||
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
|
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
|
||||||
consumerTags.append(consumerTag);
|
consuming = true;
|
||||||
|
Q_EMIT q->consuming(consumerTag);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::deliver(const Frame::Method &frame)
|
void QueuePrivate::deliver(const Frame::Method &frame)
|
||||||
|
|
@ -204,7 +207,7 @@ void QueuePrivate::deliver(const Frame::Method &frame)
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
QDataStream in(&data, QIODevice::ReadOnly);
|
QDataStream in(&data, QIODevice::ReadOnly);
|
||||||
QString consumer = Frame::readField('s',in).toString();
|
QString consumer = Frame::readField('s',in).toString();
|
||||||
if (!consumerTags.contains(consumer)) {
|
if (consumerTag != consumer) {
|
||||||
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
|
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -411,12 +414,17 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
|
||||||
d->sendFrame(frame);
|
d->sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::consume(int options)
|
bool Queue::consume(int options)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
if (!d->opened) {
|
if (!d->opened) {
|
||||||
qAmqpDebug() << Q_FUNC_INFO << "queue is not open";
|
qAmqpDebug() << Q_FUNC_INFO << "queue is not open";
|
||||||
return;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (d->consuming) {
|
||||||
|
qAmqpDebug() << Q_FUNC_INFO << "already consuming with tag: " << d->consumerTag;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmConsume);
|
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmConsume);
|
||||||
|
|
@ -427,25 +435,32 @@ void Queue::consume(int options)
|
||||||
|
|
||||||
out << qint16(0); //reserved 1
|
out << qint16(0); //reserved 1
|
||||||
Frame::writeField('s', out, d->name);
|
Frame::writeField('s', out, d->name);
|
||||||
Frame::writeField('s', out, d->explicitConsumerTag);
|
Frame::writeField('s', out, d->consumerTag);
|
||||||
|
|
||||||
out << qint8(options);
|
out << qint8(options);
|
||||||
Frame::writeField('F', out, Frame::TableField());
|
Frame::writeField('F', out, Frame::TableField());
|
||||||
|
|
||||||
frame.setArguments(arguments);
|
frame.setArguments(arguments);
|
||||||
d->sendFrame(frame);
|
d->sendFrame(frame);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::setConsumerTag(const QString &consumerTag)
|
void Queue::setConsumerTag(const QString &consumerTag)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
d->explicitConsumerTag = consumerTag;
|
d->consumerTag = consumerTag;
|
||||||
}
|
}
|
||||||
|
|
||||||
QString Queue::consumerTag() const
|
QString Queue::consumerTag() const
|
||||||
{
|
{
|
||||||
Q_D(const Queue);
|
Q_D(const Queue);
|
||||||
return d->explicitConsumerTag;
|
return d->consumerTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Queue::isConsuming() const
|
||||||
|
{
|
||||||
|
Q_D(const Queue);
|
||||||
|
return d->consuming;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::get()
|
void Queue::get()
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,8 @@ public:
|
||||||
|
|
||||||
~Queue();
|
~Queue();
|
||||||
|
|
||||||
|
bool isConsuming() const;
|
||||||
|
|
||||||
void setConsumerTag(const QString &consumerTag);
|
void setConsumerTag(const QString &consumerTag);
|
||||||
QString consumerTag() const;
|
QString consumerTag() const;
|
||||||
|
|
||||||
|
|
@ -71,7 +73,7 @@ public:
|
||||||
void remove(int options = roIfUnused|roIfEmpty|roNoWait);
|
void remove(int options = roIfUnused|roIfEmpty|roNoWait);
|
||||||
|
|
||||||
// AMQP Basic
|
// AMQP Basic
|
||||||
void consume(int options = NoOptions);
|
bool consume(int options = NoOptions);
|
||||||
void get();
|
void get();
|
||||||
void ack(const Message &message);
|
void ack(const Message &message);
|
||||||
|
|
||||||
|
|
@ -83,6 +85,7 @@ Q_SIGNALS:
|
||||||
void messageReceived();
|
void messageReceived();
|
||||||
void empty();
|
void empty();
|
||||||
void purged(int messageCount);
|
void purged(int messageCount);
|
||||||
|
void consuming(const QString &consumerTag);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// reimp Channel
|
// reimp Channel
|
||||||
|
|
|
||||||
|
|
@ -45,11 +45,11 @@ public:
|
||||||
bool delayedDeclare;
|
bool delayedDeclare;
|
||||||
bool declared;
|
bool declared;
|
||||||
bool noAck;
|
bool noAck;
|
||||||
QString explicitConsumerTag;
|
QString consumerTag;
|
||||||
QStringList consumerTags;
|
|
||||||
QQueue<QPair<QString, QString> > delayedBindings;
|
QQueue<QPair<QString, QString> > delayedBindings;
|
||||||
bool recievingMessage;
|
bool recievingMessage;
|
||||||
Message currentMessage;
|
Message currentMessage;
|
||||||
|
bool consuming;
|
||||||
|
|
||||||
Q_DECLARE_PUBLIC(Queue)
|
Q_DECLARE_PUBLIC(Queue)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ private Q_SLOTS:
|
||||||
void removeIfEmpty();
|
void removeIfEmpty();
|
||||||
void unbind();
|
void unbind();
|
||||||
void purge();
|
void purge();
|
||||||
|
void canOnlyStartConsumingOnce();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QScopedPointer<Client> client;
|
QScopedPointer<Client> client;
|
||||||
|
|
@ -56,7 +57,8 @@ void tst_QAMQPQueue::defaultExchange()
|
||||||
Queue *queue = client->createQueue("test-default-exchange");
|
Queue *queue = client->createQueue("test-default-exchange");
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume();
|
QVERIFY(queue->consume());
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
Exchange *defaultExchange = client->createExchange();
|
Exchange *defaultExchange = client->createExchange();
|
||||||
defaultExchange->publish("first message", "test-default-exchange");
|
defaultExchange->publish("first message", "test-default-exchange");
|
||||||
|
|
@ -85,8 +87,9 @@ void tst_QAMQPQueue::standardExchanges()
|
||||||
Queue *queue = client->createQueue(queueName);
|
Queue *queue = client->createQueue(queueName);
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume(); // required because AutoDelete will not delete if
|
QVERIFY(queue->consume()); // required because AutoDelete will not delete if
|
||||||
// there was never a consumer
|
// there was never a consumer
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
queue->bind(exchange, routingKey);
|
queue->bind(exchange, routingKey);
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(bound())));
|
QVERIFY(waitForSignal(queue, SIGNAL(bound())));
|
||||||
|
|
@ -126,7 +129,8 @@ void tst_QAMQPQueue::invalidBind()
|
||||||
Queue *queue = client->createQueue("test-invalid-bind");
|
Queue *queue = client->createQueue("test-invalid-bind");
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume(); // for autodelete
|
QVERIFY(queue->consume()); // for autodelete
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
queue->bind("non-existant-exchange", "routingKey");
|
queue->bind("non-existant-exchange", "routingKey");
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(error(QAMQP::Error))));
|
QVERIFY(waitForSignal(queue, SIGNAL(error(QAMQP::Error))));
|
||||||
|
|
@ -138,7 +142,8 @@ void tst_QAMQPQueue::unnamed()
|
||||||
Queue *queue = client->createQueue();
|
Queue *queue = client->createQueue();
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume();
|
QVERIFY(queue->consume());
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
QVERIFY(!queue->name().isEmpty());
|
QVERIFY(!queue->name().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
@ -204,7 +209,8 @@ void tst_QAMQPQueue::removeIfUnused()
|
||||||
Queue *queue = client->createQueue("test-remove-if-unused");
|
Queue *queue = client->createQueue("test-remove-if-unused");
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume();
|
QVERIFY(queue->consume());
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
queue->remove(Queue::roIfUnused);
|
queue->remove(Queue::roIfUnused);
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(error(QAMQP::Error))));
|
QVERIFY(waitForSignal(queue, SIGNAL(error(QAMQP::Error))));
|
||||||
|
|
@ -249,8 +255,9 @@ void tst_QAMQPQueue::unbind()
|
||||||
Queue *queue = client->createQueue("test-unbind");
|
Queue *queue = client->createQueue("test-unbind");
|
||||||
queue->declare();
|
queue->declare();
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
queue->consume(); // required because AutoDelete will not delete if
|
QVERIFY(queue->consume()); // required because AutoDelete will not delete if
|
||||||
// there was never a consumer
|
// there was never a consumer
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
|
||||||
queue->bind("amq.topic", "routingKey");
|
queue->bind("amq.topic", "routingKey");
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(bound())));
|
QVERIFY(waitForSignal(queue, SIGNAL(bound())));
|
||||||
|
|
@ -294,5 +301,19 @@ void tst_QAMQPQueue::purge()
|
||||||
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
|
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tst_QAMQPQueue::canOnlyStartConsumingOnce()
|
||||||
|
{
|
||||||
|
Queue *queue = client->createQueue("test-single-consumer");
|
||||||
|
queue->declare();
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
|
||||||
|
QVERIFY(queue->consume());
|
||||||
|
QSignalSpy spy(queue, SIGNAL(consuming(QString)));
|
||||||
|
QVERIFY(waitForSignal(queue, SIGNAL(consuming(QString))));
|
||||||
|
QVERIFY(queue->isConsuming());
|
||||||
|
QList<QVariant> arguments = spy.takeFirst();
|
||||||
|
QCOMPARE(arguments.at(0).toString(), queue->consumerTag());
|
||||||
|
QCOMPARE(queue->consume(), false);
|
||||||
|
}
|
||||||
|
|
||||||
QTEST_MAIN(tst_QAMQPQueue)
|
QTEST_MAIN(tst_QAMQPQueue)
|
||||||
#include "tst_qamqpqueue.moc"
|
#include "tst_qamqpqueue.moc"
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ private Q_SLOTS:
|
||||||
void exchangeDeclared() {
|
void exchangeDeclared() {
|
||||||
Queue *temporaryQueue = m_client.createQueue();
|
Queue *temporaryQueue = m_client.createQueue();
|
||||||
connect(temporaryQueue, SIGNAL(declared()), this, SLOT(queueDeclared()));
|
connect(temporaryQueue, SIGNAL(declared()), this, SLOT(queueDeclared()));
|
||||||
connect(temporaryQueue, SIGNAL(bound()), this, SLOT(queueBound()));
|
|
||||||
connect(temporaryQueue, SIGNAL(messageReceived()), this, SLOT(messageReceived()));
|
connect(temporaryQueue, SIGNAL(messageReceived()), this, SLOT(messageReceived()));
|
||||||
temporaryQueue->declare(Queue::Exclusive);
|
temporaryQueue->declare(Queue::Exclusive);
|
||||||
}
|
}
|
||||||
|
|
@ -40,18 +39,14 @@ private Q_SLOTS:
|
||||||
if (!temporaryQueue)
|
if (!temporaryQueue)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// start consuming
|
||||||
|
temporaryQueue->consume(Queue::coNoAck);
|
||||||
|
|
||||||
foreach (QString severity, m_severities)
|
foreach (QString severity, m_severities)
|
||||||
temporaryQueue->bind("direct_logs", severity);
|
temporaryQueue->bind("direct_logs", severity);
|
||||||
qDebug() << " [*] Waiting for logs. To exit press CTRL+C";
|
qDebug() << " [*] Waiting for logs. To exit press CTRL+C";
|
||||||
}
|
}
|
||||||
|
|
||||||
void queueBound() {
|
|
||||||
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
|
||||||
if (!temporaryQueue)
|
|
||||||
return;
|
|
||||||
temporaryQueue->consume(Queue::coNoAck);
|
|
||||||
}
|
|
||||||
|
|
||||||
void messageReceived() {
|
void messageReceived() {
|
||||||
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
Queue *temporaryQueue = qobject_cast<Queue*>(sender());
|
||||||
if (!temporaryQueue)
|
if (!temporaryQueue)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue