add publish options

- add publish options to Exchange (these need to move to a Basic abstraction)
- listen for bmReturn in Exchange to catch errors
- added auto tests to check behavior of an invalid mandatory publish
- cleaned up bit fields in exchange and queue
This commit is contained in:
Matt Broadstone 2014-06-23 15:36:03 -04:00
parent cb52911bed
commit 4f808bef92
6 changed files with 93 additions and 27 deletions

View File

@ -64,19 +64,37 @@ bool ExchangePrivate::_q_method(const Frame::Method &frame)
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() != Frame::fcExchange)
return false;
switch(frame.id()) {
if (frame.methodClass() == Frame::fcExchange) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDeleteOk:
deleteOk(frame);
break;
default:
qDebug() << Q_FUNC_INFO << "unhandled exchange method: " << frame.id();
break;
}
return true;
} else if (frame.methodClass() == Frame::fcBasic) {
switch (frame.id()) {
case bmReturn:
basicReturn(frame);
break;
default:
qDebug() << Q_FUNC_INFO << "unhandled basic method: " << frame.id();
break;
}
return true;
}
return false;
}
void ExchangePrivate::declareOk(const Frame::Method &frame)
@ -107,6 +125,31 @@ void ExchangePrivate::_q_disconnected()
declared = false;
}
void ExchangePrivate::basicReturn(const Frame::Method &frame)
{
Q_Q(Exchange);
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
quint16 replyCode;
stream >> replyCode;
QString replyText = Frame::readField('s', stream).toString();
QString exchangeName = Frame::readField('s', stream).toString();
QString routingKey = Frame::readField('s', stream).toString();
Error checkError = static_cast<Error>(replyCode);
if (checkError != QAMQP::NoError) {
error = checkError;
errorString = qPrintable(replyText);
Q_EMIT q->error(error);
}
qAmqpDebug(">> replyCode: %d", replyCode);
qAmqpDebug(">> replyText: %s", qPrintable(replyText));
qAmqpDebug(">> exchangeName: %s", qPrintable(exchangeName));
qAmqpDebug(">> routingKey: %s", qPrintable(routingKey));
}
//////////////////////////////////////////////////////////////////////////
Exchange::Exchange(int channelNumber, Client *parent)
@ -175,20 +218,22 @@ void Exchange::remove(int options)
}
void Exchange::publish(const QString &message, const QString &routingKey,
const MessageProperties &properties)
const MessageProperties &properties, int publishOptions)
{
publish(message.toUtf8(), routingKey, QLatin1String("text.plain"), QVariantHash(), properties);
publish(message.toUtf8(), routingKey, QLatin1String("text.plain"),
QVariantHash(), properties, publishOptions);
}
void Exchange::publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const MessageProperties &properties)
const QString &mimeType, const MessageProperties &properties,
int publishOptions)
{
publish(message, routingKey, mimeType, QVariantHash(), properties);
publish(message, routingKey, mimeType, QVariantHash(), properties, publishOptions);
}
void Exchange::publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const QVariantHash &headers,
const MessageProperties &properties)
const MessageProperties &properties, int publishOptions)
{
Q_D(Exchange);
Frame::Method frame(Frame::fcBasic, ExchangePrivate::bmPublish);
@ -200,7 +245,7 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey,
out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name);
Frame::writeField('s', out, routingKey);
out << qint8(0);
out << qint8(publishOptions);
frame.setArguments(arguments);
d->sendFrame(frame);

View File

@ -26,9 +26,16 @@ public:
};
QString type() const;
enum PublishOption {
poNoOptions = 0x0,
poMandatory = 0x01,
poImmediate = 0x02
};
Q_DECLARE_FLAGS(PublishOptions, PublishOption)
enum RemoveOption {
roForce = 0x0,
roIfUnused = 0x1,
roIfUnused = 0x01,
roNoWait = 0x04
};
Q_DECLARE_FLAGS(RemoveOptions, RemoveOption)
@ -37,8 +44,8 @@ public:
NoOptions = 0x0,
Passive = 0x01,
Durable = 0x02,
AutoDelete = 0x4,
Internal = 0x8,
AutoDelete = 0x04,
Internal = 0x08,
NoWait = 0x10
};
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
@ -57,12 +64,15 @@ public:
// AMQP Basic
void publish(const QString &message, const QString &routingKey,
const MessageProperties &properties = MessageProperties());
const MessageProperties &properties = MessageProperties(),
int publishOptions = poNoOptions);
void publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const MessageProperties &properties = MessageProperties());
const QString &mimeType, const MessageProperties &properties = MessageProperties(),
int publishOptions = poNoOptions);
void publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const QVariantHash &headers,
const MessageProperties &properties = MessageProperties());
const MessageProperties &properties = MessageProperties(),
int publishOptions = poNoOptions);
Q_SIGNALS:
void declared();

View File

@ -25,6 +25,7 @@ public:
virtual bool _q_method(const Frame::Method &frame);
void declareOk(const Frame::Method &frame);
void deleteOk(const Frame::Method &frame);
void basicReturn(const Frame::Method &frame);
QString type;
Exchange::ExchangeOptions options;

View File

@ -35,6 +35,7 @@ namespace QAMQP {
enum Error {
NoError = 0,
ContentTooLargeError = 311,
UnroutableKey = 312,
NoConsumersError = 313,
ConnectionForcedError = 320,
InvalidPathError = 402,

View File

@ -30,24 +30,24 @@ public:
NoOptions = 0x0,
Passive = 0x01,
Durable = 0x02,
Exclusive = 0x4,
AutoDelete = 0x8,
Exclusive = 0x04,
AutoDelete = 0x08,
NoWait = 0x10
};
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
int options() const;
enum ConsumeOption {
coNoLocal = 0x1,
coNoLocal = 0x01,
coNoAck = 0x02,
coExclusive = 0x04,
coNoWait = 0x8
coNoWait = 0x08
};
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
enum RemoveOption {
roForce = 0x0,
roIfUnused = 0x1,
roIfUnused = 0x01,
roIfEmpty = 0x02,
roNoWait = 0x04
};

View File

@ -22,6 +22,7 @@ private Q_SLOTS:
void invalidDeclaration();
void invalidRedeclaration();
void removeIfUnused();
void invalidMandatoryRouting();
private:
QScopedPointer<Client> client;
@ -140,5 +141,13 @@ void tst_QAMQPExchange::removeIfUnused()
QVERIFY(waitForSignal(queue, SIGNAL(removed())));
}
void tst_QAMQPExchange::invalidMandatoryRouting()
{
Exchange *defaultExchange = client->createExchange();
defaultExchange->publish("some message", "unroutable-key", MessageProperties(), Exchange::poMandatory);
QVERIFY(waitForSignal(defaultExchange, SIGNAL(error(QAMQP::Error))));
QCOMPARE(defaultExchange->error(), QAMQP::UnroutableKey);
}
QTEST_MAIN(tst_QAMQPExchange)
#include "tst_qamqpexchange.moc"