From d5cc6258c6393751466a39e15009ae099f9f03a3 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Sun, 3 Aug 2014 16:39:31 -0400 Subject: [PATCH] refactor type handling in QAMQP There are two distinctive type sets in AMQP, the basic AMQP types and then the superset of Table value types. This commit attempts to make that distinction more clear by the addition of a Table class. Basically, AMQP value type read/write support is left in Frame for the time being, while table value field read/write support has been moved to the Table class. Also, a number of type differences exist between the spec and rabbitmq's binary parser (noted in the errata page) which were not previously honored. --- src/amqp_authenticator.cpp | 10 +- src/amqp_channel.cpp | 7 +- src/amqp_client.cpp | 44 +-- src/amqp_client_p.h | 9 +- src/amqp_exchange.cpp | 24 +- src/amqp_exchange.h | 7 +- src/amqp_exchange_p.h | 3 +- src/amqp_frame.cpp | 408 ++++++----------------- src/amqp_frame.h | 18 +- src/amqp_global.h | 30 +- src/amqp_queue.cpp | 68 ++-- src/amqp_table.cpp | 371 +++++++++++++++++++++ src/amqp_table.h | 29 ++ src/src.pro | 6 +- tests/auto/qamqpqueue/tst_qamqpqueue.cpp | 82 +++++ 15 files changed, 705 insertions(+), 411 deletions(-) create mode 100644 src/amqp_table.cpp create mode 100644 src/amqp_table.h diff --git a/src/amqp_authenticator.cpp b/src/amqp_authenticator.cpp index 98047c7..c6de043 100644 --- a/src/amqp_authenticator.cpp +++ b/src/amqp_authenticator.cpp @@ -1,6 +1,6 @@ -#include "amqp_authenticator.h" +#include "amqp_table.h" #include "amqp_frame.h" - +#include "amqp_authenticator.h" using namespace QAMQP; AMQPlainAuthenticator::AMQPlainAuthenticator(const QString &l, const QString &p) @@ -40,9 +40,9 @@ void AMQPlainAuthenticator::setPassword(const QString &p) void AMQPlainAuthenticator::write(QDataStream &out) { - Frame::writeField('s', out, type()); - Frame::TableField response; + Frame::writeAmqpField(out, ShortString, type()); + Table response; response["LOGIN"] = login_; response["PASSWORD"] = password_; - Frame::serialize(out, response); + out << response; } diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 675a7aa..2a53db1 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -156,10 +156,10 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::writeField('s',stream, client->virtualHost()); + Frame::writeAmqpField(stream, ShortString, client->virtualHost()); stream << qint16(code); - Frame::writeField('s', stream, text); + Frame::writeAmqpField(stream, ShortString, text); stream << qint16(classId); stream << qint16(methodId); @@ -176,7 +176,8 @@ void ChannelPrivate::close(const Frame::Method &frame) QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; stream >> code; - QString text(Frame::readField('s', stream).toString()); + QString text = Frame::readAmqpField(stream, ShortString).toString(); + stream >> classId; stream >> methodId; diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 610de62..fb7abcc 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -1,18 +1,18 @@ -#include "amqp_client.h" -#include "amqp_client_p.h" -#include "amqp_global.h" -#include "amqp_exchange.h" -#include "amqp_exchange_p.h" -#include "amqp_queue.h" -#include "amqp_queue_p.h" -#include "amqp_authenticator.h" - #include #include #include #include #include +#include "amqp_global.h" +#include "amqp_exchange.h" +#include "amqp_exchange_p.h" +#include "amqp_queue.h" +#include "amqp_queue_p.h" +#include "amqp_authenticator.h" +#include "amqp_table.h" +#include "amqp_client_p.h" +#include "amqp_client.h" using namespace QAMQP; ClientPrivate::ClientPrivate(Client *q) @@ -88,7 +88,6 @@ void ClientPrivate::parseConnectionString(const QString &uri) return; } - port = connectionString.port(AMQP_PORT); host = connectionString.host(); @@ -329,21 +328,22 @@ void ClientPrivate::start(const Frame::Method &frame) qAmqpDebug(">> Start"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); + quint8 version_major = 0; quint8 version_minor = 0; - stream >> version_major >> version_minor; - Frame::TableField table; - Frame::deserialize(stream, table); + Table table; + stream >> table; - QStringList mechanisms = Frame::readField('S', stream).toString().split(' '); - QString locales = Frame::readField('S', stream).toString(); + QStringList mechanisms = Frame::readAmqpField(stream, LongString).toString().split(' '); + QString locales = Frame::readAmqpField(stream, LongString).toString(); qAmqpDebug(">> version_major: %d", version_major); qAmqpDebug(">> version_minor: %d", version_minor); - Frame::print(table); + // NOTE: replace with qDebug overload + // Frame::print(table); qAmqpDebug() << ">> mechanisms: " << mechanisms; qAmqpDebug(">> locales: %s", qPrintable(locales)); @@ -427,7 +427,7 @@ void ClientPrivate::close(const Frame::Method &frame) QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; stream >> code; - QString text(Frame::readField('s', stream).toString()); + QString text = Frame::readAmqpField(stream, ShortString).toString(); stream >> classId; stream >> methodId; @@ -452,15 +452,15 @@ void ClientPrivate::startOk() QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::TableField clientProperties; + Table clientProperties; clientProperties["version"] = QString(QAMQP_VERSION); clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["product"] = QString("QAMQP"); clientProperties.unite(customProperties); - Frame::serialize(stream, clientProperties); + stream << clientProperties; authenticator->write(stream); - Frame::writeField('s', stream, "en_US"); + Frame::writeAmqpField(stream, ShortString, QLatin1String("en_US")); frame.setArguments(arguments); sendFrame(frame); @@ -491,7 +491,7 @@ void ClientPrivate::open() QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::writeField('s',stream, virtualHost); + Frame::writeAmqpField(stream, ShortString, virtualHost); stream << qint8(0); stream << qint8(0); @@ -507,7 +507,7 @@ void ClientPrivate::close(int code, const QString &text, int classId, int method QDataStream stream(&arguments, QIODevice::WriteOnly); stream << qint16(code); - Frame::writeField('s', stream, text); + Frame::writeAmqpField(stream, ShortString, text); stream << qint16(classId); stream << qint16(methodId); diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 916070e..c289d08 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -6,6 +6,12 @@ #include #include +#ifndef QT_NO_SSL +# include +# include +#endif + +#include "amqp_table.h" #include "amqp_frame.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok @@ -92,7 +98,7 @@ public: bool closed; bool connected; QPointer heartbeatTimer; - Frame::TableField customProperties; + Table customProperties; qint16 channelMax; qint16 heartbeatDelay; qint32 frameMax; @@ -106,6 +112,7 @@ public: }; #ifndef QT_NO_SSL +class SslClient; class SslClientPrivate : public ClientPrivate { public: diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index a514eb5..4f43240 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -48,11 +48,11 @@ void ExchangePrivate::declare() QDataStream stream(&args, QIODevice::WriteOnly); stream << qint16(0); //reserved 1 - Frame::writeField('s', stream, name); - Frame::writeField('s', stream, type); + Frame::writeAmqpField(stream, ShortString, name); + Frame::writeAmqpField(stream, ShortString, type); stream << qint8(options); - Frame::writeField('F', stream, arguments); + Frame::writeAmqpField(stream, Hash, arguments); frame.setArguments(args); sendFrame(frame); @@ -133,9 +133,9 @@ void ExchangePrivate::basicReturn(const Frame::Method &frame) quint16 replyCode; stream >> replyCode; - QString replyText = Frame::readField('s', stream).toString(); - QString exchangeName = Frame::readField('s', stream).toString(); - QString routingKey = Frame::readField('s', stream).toString(); + QString replyText = Frame::readAmqpField(stream, ShortString).toString(); + QString exchangeName = Frame::readAmqpField(stream, ShortString).toString(); + QString routingKey = Frame::readAmqpField(stream, ShortString).toString(); Error checkError = static_cast(replyCode); if (checkError != QAMQP::NoError) { @@ -186,12 +186,12 @@ QString Exchange::type() const return d->type; } -void Exchange::declare(ExchangeType type, ExchangeOptions options , const Frame::TableField &args) +void Exchange::declare(ExchangeType type, ExchangeOptions options, const Table &args) { declare(ExchangePrivate::typeToString(type), options, args); } -void Exchange::declare(const QString &type, ExchangeOptions options , const Frame::TableField &args) +void Exchange::declare(const QString &type, ExchangeOptions options, const Table &args) { Q_D(Exchange); d->type = type; @@ -210,7 +210,7 @@ void Exchange::remove(int options) QDataStream stream(&arguments, QIODevice::WriteOnly); stream << qint16(0); //reserved 1 - Frame::writeField('s', stream, d->name); + Frame::writeAmqpField(stream, ShortString, d->name); stream << qint8(options); frame.setArguments(arguments); @@ -232,7 +232,7 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey, } void Exchange::publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const QVariantHash &headers, + const QString &mimeType, const Table &headers, const MessageProperties &properties, int publishOptions) { Q_D(Exchange); @@ -243,8 +243,8 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey, QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, routingKey); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, routingKey); out << qint8(publishOptions); frame.setArguments(arguments); diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index ccf6c6a..654baac 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -1,6 +1,7 @@ #ifndef amqp_exchange_h__ #define amqp_exchange_h__ +#include "amqp_table.h" #include "amqp_channel.h" namespace QAMQP @@ -56,10 +57,10 @@ public: // AMQP Exchange void declare(ExchangeType type = Direct, ExchangeOptions options = NoOptions, - const Frame::TableField &args = Frame::TableField()); + const Table &args = Table()); void declare(const QString &type = QLatin1String("direct"), ExchangeOptions options = NoOptions, - const Frame::TableField &args = Frame::TableField()); + const Table &args = Table()); void remove(int options = roIfUnused|roNoWait); // AMQP Basic @@ -70,7 +71,7 @@ public: const QString &mimeType, const MessageProperties &properties = MessageProperties(), int publishOptions = poNoOptions); void publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const QVariantHash &headers, + const QString &mimeType, const Table &headers, const MessageProperties &properties = MessageProperties(), int publishOptions = poNoOptions); diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index 04a1e1c..460404b 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -1,6 +1,7 @@ #ifndef amqp_exchange_p_h__ #define amqp_exchange_p_h__ +#include "amqp_table.h" #include "amqp_exchange.h" #include "amqp_channel_p.h" @@ -29,7 +30,7 @@ public: QString type; Exchange::ExchangeOptions options; - Frame::TableField arguments; + Table arguments; bool delayedDeclare; bool declared; diff --git a/src/amqp_frame.cpp b/src/amqp_frame.cpp index 8058e6b..322e2ac 100644 --- a/src/amqp_frame.cpp +++ b/src/amqp_frame.cpp @@ -1,11 +1,10 @@ #include "amqp_frame.h" -#include - #include #include #include +#include "amqp_table.h" #include "amqp_global.h" using namespace QAMQP; @@ -141,244 +140,99 @@ void Method::writePayload(QDataStream &stream) const ////////////////////////////////////////////////////////////////////////// -QVariant Frame::readField(qint8 valueType, QDataStream &s) +QVariant Frame::readAmqpField(QDataStream &s, QAMQP::ValueType type) { - QVariant value; - QByteArray tmp; - qint8 nameSize_ = 0; - char octet = 0; - - switch(valueType) { - case 't': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet > 0); - break; - case 'b': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet); - break; - case 'B': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet); - break; - case 'U': + switch (type) { + case Boolean: { - qint16 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint8 octet = 0; + s >> octet; + return QVariant::fromValue(octet > 0); } - case 'u': + case ShortShortUint: { - quint16 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint8 octet = 0; + s >> octet; + return QVariant::fromValue(octet); } - case 'I': + case ShortUint: { - qint32 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint16 tmp_value = 0; + s >> tmp_value; + return QVariant::fromValue(tmp_value); } - case 'i': + case LongUint: { - quint32 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint32 tmp_value = 0; + s >> tmp_value; + return QVariant::fromValue(tmp_value); } - case 'L': - { - qlonglong v = 0 ; - s >> v; - value = v; - } - - break; - case 'l': + case LongLongUint: { qulonglong v = 0 ; s >> v; - value = v; + return v; } + case ShortString: + { + qint8 size = 0; + QByteArray buffer; - break; - case 'f': - { - float tmp_value_; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + s >> size; + buffer.resize(size); + s.readRawData(buffer.data(), buffer.size()); + return QString::fromUtf8(buffer.data(), size); } - case 'd': + case LongString: { - double tmp_value_; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint32 size = 0; + QByteArray buffer; + + s >> size; + buffer.resize(size); + s.readRawData(buffer.data(), buffer.size()); + return QString::fromUtf8(buffer.data(), buffer.size()); } - case 'D': + case Timestamp: { - Frame::decimal v; - s >> v.scale; - s >> v.value; - value = QVariant::fromValue(v); + qulonglong tmp_value; + s >> tmp_value; + return QDateTime::fromMSecsSinceEpoch(tmp_value); } - break; - case 's': - s >> nameSize_; - tmp.resize(nameSize_); - s.readRawData(tmp.data(), tmp.size()); - value = QString::fromLatin1(tmp.data(), nameSize_); - break; - case 'S': + case Hash: { - quint32 length_ = 0; - s >> length_; - nameSize_ = length_; - tmp.resize(length_); + Table table; + s >> table; + return table; } - s.readRawData(tmp.data(), tmp.size()); - value = QString::fromLatin1(tmp.data(), tmp.size()); - break; - case 'A': - { - qint32 length_ = 0; - qint8 type = 0; - s >> length_; - QList array_; - for (int i =0; i < length_; ++i) { - s >> type; - array_ << readField(type, s); - } - value = array_; - } - break; - case 'T': - { - qulonglong tmp_value_; - s >> tmp_value_; - value = QDateTime::fromMSecsSinceEpoch(tmp_value_); - break; - } - case 'F': - { - TableField table_; - deserialize(s, table_); - value = table_; - } - break; - case 'V': - break; + case Void: + return QVariant(); default: - qWarning("Unknown field type"); + qWarning() << Q_FUNC_INFO << "unsupported value type: " << type; } - return value; + + return QVariant(); } -QDataStream &Frame::deserialize(QDataStream &stream, Frame::TableField &f) +void Frame::writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value) { - QByteArray data; - stream >> data; - QDataStream s(&data, QIODevice::ReadOnly); - - while (!s.atEnd()) { - qint8 valueType = 0; - QString name = readField('s', s).toString(); - s >> valueType; - f[name] = readField(valueType, s); - } - - return stream; -} - -QDataStream &Frame::serialize(QDataStream &stream, const TableField &f) -{ - QByteArray data; - QDataStream s(&data, QIODevice::WriteOnly); - TableField::ConstIterator it; - TableField::ConstIterator itEnd = f.constEnd(); - for (it = f.constBegin(); it != itEnd; ++it) { - writeField('s', s, it.key()); - writeField(s, it.value()); - } - - if (data.isEmpty()) { - stream << qint32(0); - } else { - stream << data; - } - - return stream; -} - -void Frame::print(const TableField &f) -{ - TableField::ConstIterator it; - TableField::ConstIterator itEnd = f.constEnd(); - for (it = f.constBegin(); it != itEnd; ++it) { - switch(it.value().type()) { - case QVariant::Hash: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": FIELD_TABLE"; - break; - case QVariant::List: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": ARRAY"; - break; - default: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": " << it.value(); - } - } -} - -void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType) -{ - if (withType) - s << valueType; - - switch (valueType) { - case 't': + switch (type) { + case Boolean: s << (value.toBool() ? qint8(1) : qint8(0)); break; - case 'b': - s << qint8(value.toInt()); + case ShortShortUint: + s << qint8(value.toUInt()); break; - case 'B': - s << quint8(value.toUInt()); - break; - case 'U': - s << qint16(value.toInt()); - break; - case 'u': + case ShortUint: s << quint16(value.toUInt()); break; - case 'I': - s << qint32(value.toInt()); - break; - case 'i': + case LongUint: s << quint32(value.toUInt()); break; - case 'L': - s << qlonglong(value.toLongLong()); - break; - case 'l': + case LongLongUint: s << qulonglong(value.toULongLong()); break; - case 'f': - s << value.toFloat(); - break; - case 'd': - s << value.toDouble(); - break; - case 'D': - { - Frame::decimal v(value.value()); - s << v.scale; - s << v.value; - } - break; - case 's': + case ShortString: { QString str = value.toString(); if (str.length() >= 256) { @@ -386,104 +240,30 @@ void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, b } s << quint8(str.length()); - s.writeRawData(str.toLatin1().data(), str.length()); + s.writeRawData(str.toUtf8().data(), str.length()); } break; - case 'S': + case LongString: { QString str = value.toString(); s << quint32(str.length()); s.writeRawData(str.toLatin1().data(), str.length()); } break; - case 'A': - { - QList array_(value.toList()); - s << quint32(array_.count()); - for (int i =0; i < array_.count(); ++i) - writeField(s, array_.at(i)); - } - break; - case 'T': + case Timestamp: s << qulonglong(value.toDateTime().toMSecsSinceEpoch()); break; - case 'F': + case Hash: { - TableField table_(value.toHash()); - serialize(s, table_); + Table table(value.toHash()); + s << table; } break; - case 'V': - break; default: - qWarning("Unknown field type"); + qWarning() << Q_FUNC_INFO << "unsupported value type: " << type; } } -void Frame::writeField(QDataStream &s, const QVariant &value) -{ - char type = 0; - switch (value.type()) { - case QVariant::Bool: - type = 't'; - break; - case QVariant::ByteArray: - type = 'S'; - break; - case QVariant::Int: - { - int i = qAbs(value.toInt()); - if (i <= qint8(0xFF)) { - type = 'b'; - } else if (i <= qint16(0xFFFF)) { - type = 'U'; - } else if (i <= qint16(0xFFFFFFFF)) { - type = 'I'; - } - } - break; - case QVariant::UInt: - { - int i = value.toInt(); - if (i <= qint8(0xFF)) { - type = 'B'; - } else if (i <= qint16(0xFFFF)) { - type = 'u'; - } else if (i <= qint16(0xFFFFFFFF)) { - type = 'i'; - } - } - break; - case QVariant::LongLong: - type = 'L'; - break; - case QVariant::ULongLong: - type = 'l'; - break; - case QVariant::String: - type = 'S'; - break; - case QVariant::DateTime: - type = 'T'; - break; - case QVariant::Double: - type = value.toDouble() > FLT_MAX ? 'd' : 'f'; - break; - case QVariant::Hash: - type = 'F'; - break; - case QVariant::List: - type = 'A'; - break; - - default: - qAmqpDebug() << Q_FUNC_INFO << "unhandled variant type: " << value.type(); - } - - if (type) - writeField(type, s, value, true); -} - ////////////////////////////////////////////////////////////////////////// Content::Content() @@ -522,46 +302,46 @@ qint32 Content::size() const out << prop_; if (prop_ & cpContentType) - writeField('s', out, properties_[cpContentType]); + writeAmqpField(out, ShortString, properties_[cpContentType]); if (prop_ & cpContentEncoding) - writeField('s', out, properties_[cpContentEncoding]); + writeAmqpField(out, ShortString, properties_[cpContentEncoding]); if (prop_ & cpHeaders) - writeField('F', out, properties_[cpHeaders]); + writeAmqpField(out, Hash, properties_[cpHeaders]); if (prop_ & cpDeliveryMode) - writeField('b', out, properties_[cpDeliveryMode]); + writeAmqpField(out, ShortShortUint, properties_[cpDeliveryMode]); if (prop_ & cpPriority) - writeField('b', out, properties_[cpPriority]); + writeAmqpField(out, ShortShortUint, properties_[cpPriority]); if (prop_ & cpCorrelationId) - writeField('s', out, properties_[cpCorrelationId]); + writeAmqpField(out, ShortString, properties_[cpCorrelationId]); if (prop_ & cpReplyTo) - writeField('s', out, properties_[cpReplyTo]); + writeAmqpField(out, ShortString, properties_[cpReplyTo]); if (prop_ & cpExpiration) - writeField('s', out, properties_[cpExpiration]); + writeAmqpField(out, ShortString, properties_[cpExpiration]); if (prop_ & cpMessageId) - writeField('s', out, properties_[cpMessageId]); + writeAmqpField(out, ShortString, properties_[cpMessageId]); if (prop_ & cpTimestamp) - writeField('T', out, properties_[cpTimestamp]); + writeAmqpField(out, Timestamp, properties_[cpTimestamp]); if (prop_ & cpType) - writeField('s', out, properties_[cpType]); + writeAmqpField(out, ShortString, properties_[cpType]); if (prop_ & cpUserId) - writeField('s', out, properties_[cpUserId]); + writeAmqpField(out, ShortString, properties_[cpUserId]); if (prop_ & cpAppId) - writeField('s', out, properties_[cpAppId]); + writeAmqpField(out, ShortString, properties_[cpAppId]); if (prop_ & cpClusterID) - writeField('s', out, properties_[cpClusterID]); + writeAmqpField(out, ShortString, properties_[cpClusterID]); return buffer_.size(); } @@ -599,46 +379,46 @@ void Content::readPayload(QDataStream &in) qint16 flags_ = 0; in >> flags_; if (flags_ & cpContentType) - properties_[cpContentType] = readField('s', in); + properties_[cpContentType] = readAmqpField(in, ShortString); if (flags_ & cpContentEncoding) - properties_[cpContentEncoding] = readField('s', in); + properties_[cpContentEncoding] = readAmqpField(in, ShortString); if (flags_ & cpHeaders) - properties_[cpHeaders] = readField('F', in); + properties_[cpHeaders] = readAmqpField(in, Hash); if (flags_ & cpDeliveryMode) - properties_[cpDeliveryMode] = readField('b', in); + properties_[cpDeliveryMode] = readAmqpField(in, ShortShortUint); if (flags_ & cpPriority) - properties_[cpPriority] = readField('b', in); + properties_[cpPriority] = readAmqpField(in, ShortShortUint); if (flags_ & cpCorrelationId) - properties_[cpCorrelationId] = readField('s', in); + properties_[cpCorrelationId] = readAmqpField(in, ShortString); if (flags_ & cpReplyTo) - properties_[cpReplyTo] = readField('s', in); + properties_[cpReplyTo] = readAmqpField(in, ShortString); if (flags_ & cpExpiration) - properties_[cpExpiration] = readField('s', in); + properties_[cpExpiration] = readAmqpField(in, ShortString); if (flags_ & cpMessageId) - properties_[cpMessageId] = readField('s', in); + properties_[cpMessageId] = readAmqpField(in, ShortString); if (flags_ & cpTimestamp) - properties_[cpTimestamp] = readField('T', in); + properties_[cpTimestamp] = readAmqpField(in, Timestamp); if (flags_ & cpType) - properties_[cpType] = readField('s', in); + properties_[cpType] = readAmqpField(in, ShortString); if (flags_ & cpUserId) - properties_[cpUserId] = readField('s', in); + properties_[cpUserId] = readAmqpField(in, ShortString); if (flags_ & cpAppId) - properties_[cpAppId] = readField('s', in); + properties_[cpAppId] = readAmqpField(in, ShortString); if (flags_ & cpClusterID) - properties_[cpClusterID] = readField('s', in); + properties_[cpClusterID] = readAmqpField(in, ShortString); } ////////////////////////////////////////////////////////////////////////// diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 658e290..c4541e2 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -65,20 +65,8 @@ namespace Frame quint32 value; }; - /* - * @brief Definition implementation of TableField type - * @detailed Define implementation TableField type in builtin Qt types. - * Key contains field name, value contains field data. - * It can by any type witch support serialization in AMQP types. - */ - typedef QHash TableField; - - QDataStream &serialize(QDataStream &stream, const Frame::TableField &f); - QDataStream &deserialize(QDataStream &stream, Frame::TableField &f); - QVariant readField(qint8 valueType, QDataStream &s); - void writeField(QDataStream &s, const QVariant &value); - void writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType = false); - void print(const Frame::TableField &f); + QVariant readAmqpField(QDataStream &s, QAMQP::ValueType type); + void writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value); /* * @brief Base class for any frames. @@ -401,6 +389,6 @@ typedef Frame::Content::Property MessageProperty; } // namespace QAMQP Q_DECLARE_METATYPE(QAMQP::Frame::decimal) -Q_DECLARE_METATYPE(QAMQP::Frame::TableField) +//Q_DECLARE_METATYPE(QAMQP::Frame::TableField) #endif // amqp_frame_h__ diff --git a/src/amqp_global.h b/src/amqp_global.h index 13eb40b..364e397 100644 --- a/src/amqp_global.h +++ b/src/amqp_global.h @@ -47,8 +47,36 @@ namespace QAMQP { +enum ValueType +{ + Invalid = -1, -enum Error { + // basic AMQP types + Boolean, + ShortUint, + LongUint, + LongLongUint, + ShortString, + LongString, + + // field-value types + ShortShortInt, + ShortShortUint, + ShortInt, + LongInt, + LongLongInt, + Float, + Double, + Decimal, + Array, + Timestamp, + Hash, + Bytes, + Void +}; + +enum Error +{ NoError = 0, ContentTooLargeError = 311, UnroutableKey = 312, diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index f365959..b44c7d3 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -2,7 +2,7 @@ #include "amqp_queue_p.h" #include "amqp_exchange.h" #include "amqp_message_p.h" - +#include "amqp_table.h" using namespace QAMQP; #include @@ -90,8 +90,12 @@ void QueuePrivate::_q_content(const Frame::Content &frame) currentMessage.d->leftSize = frame.bodySize(); QHash::ConstIterator it; QHash::ConstIterator itEnd = frame.properties_.constEnd(); - for (it = frame.properties_.constBegin(); it != itEnd; ++it) - currentMessage.d->properties[static_cast(it.key())] = it.value(); + for (it = frame.properties_.constBegin(); it != itEnd; ++it) { + Message::Property property = static_cast(it.key()); + if (property == Message::Headers) + currentMessage.d->headers = (it.value()).toHash(); + currentMessage.d->properties[property] = it.value(); + } } void QueuePrivate::_q_body(const Frame::ContentBody &frame) @@ -123,7 +127,7 @@ void QueuePrivate::declareOk(const Frame::Method &frame) QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - name = Frame::readField('s', stream).toString(); + name = Frame::readAmqpField(stream, ShortString).toString(); qint32 messageCount = 0, consumerCount = 0; stream >> messageCount >> consumerCount; qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount); @@ -184,10 +188,10 @@ void QueuePrivate::getOk(const Frame::Method &frame) QDataStream in(&data, QIODevice::ReadOnly); Message message; - message.d->deliveryTag = Frame::readField('L',in).toLongLong(); - message.d->redelivered = Frame::readField('t',in).toBool(); - message.d->exchangeName = Frame::readField('s',in).toString(); - message.d->routingKey = Frame::readField('s',in).toString(); + message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong(); + message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool(); + message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString(); + message.d->routingKey = Frame::readAmqpField(in, ShortString).toString(); currentMessage = message; } @@ -197,7 +201,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame) qAmqpDebug() << "consume ok: " << name; QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - consumerTag = Frame::readField('s',stream).toString(); + consumerTag = Frame::readAmqpField(stream, ShortString).toString(); qAmqpDebug("consumer tag = %s", qPrintable(consumerTag)); consuming = true; Q_EMIT q->consuming(consumerTag); @@ -208,17 +212,17 @@ void QueuePrivate::deliver(const Frame::Method &frame) qAmqpDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer = Frame::readField('s',in).toString(); + QString consumer = Frame::readAmqpField(in, ShortString).toString(); if (consumerTag != consumer) { qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; return; } Message message; - message.d->deliveryTag = Frame::readField('L',in).toLongLong(); - message.d->redelivered = Frame::readField('t',in).toBool(); - message.d->exchangeName = Frame::readField('s',in).toString(); - message.d->routingKey = Frame::readField('s',in).toString(); + message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong(); + message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool(); + message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString(); + message.d->routingKey = Frame::readAmqpField(in, ShortString).toString(); currentMessage = message; } @@ -231,9 +235,9 @@ void QueuePrivate::declare() QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, name); + Frame::writeAmqpField(out, ShortString, name); out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); sendFrame(frame); @@ -248,7 +252,7 @@ void QueuePrivate::cancelOk(const Frame::Method &frame) qAmqpDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer = Frame::readField('s',in).toString(); + QString consumer = Frame::readAmqpField(in, ShortString).toString(); if (consumerTag != consumer) { qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; return; @@ -323,7 +327,7 @@ void Queue::remove(int options) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(options); frame.setArguments(arguments); @@ -343,7 +347,7 @@ void Queue::purge() QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(0); // no-wait frame.setArguments(arguments); @@ -375,12 +379,12 @@ void Queue::bind(const QString &exchangeName, const QString &key) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); // reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, exchangeName); - Frame::writeField('s', out, key); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, exchangeName); + Frame::writeAmqpField(out, ShortString, key); out << qint8(0); // no-wait - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -410,10 +414,10 @@ void Queue::unbind(const QString &exchangeName, const QString &key) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, exchangeName); - Frame::writeField('s', out, key); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, exchangeName); + Frame::writeAmqpField(out, ShortString, key); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -439,11 +443,11 @@ bool Queue::consume(int options) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, d->consumerTag); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, d->consumerTag); out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -483,7 +487,7 @@ void Queue::get(bool noAck) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(noAck ? 1 : 0); // no-ack frame.setArguments(arguments); @@ -530,7 +534,7 @@ bool Queue::cancel(bool noWait) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); - Frame::writeField('s', out, d->consumerTag); + Frame::writeAmqpField(out, ShortString, d->consumerTag); out << (noWait ? qint8(0x01) : qint8(0x0)); frame.setArguments(arguments); diff --git a/src/amqp_table.cpp b/src/amqp_table.cpp new file mode 100644 index 0000000..3bcd654 --- /dev/null +++ b/src/amqp_table.cpp @@ -0,0 +1,371 @@ +#include + +#include +#include + +#include "amqp_frame.h" +#include "amqp_table.h" +using namespace QAMQP; + +/* + * field value types according to: https://www.rabbitmq.com/amqp-0-9-1-errata.html +t - Boolean +b - Signed 8-bit + Unsigned 8-bit +s - Signed 16-bit + Unsigned 16-bit +I - Signed 32-bit + Unsigned 32-bit +l - Signed 64-bit + Unsigned 64-bit +f - 32-bit float +d - 64-bit float +D - Decimal +S - Long string +A - Array +T - Timestamp (u64) +F - Nested Table +V - Void +x - Byte array +*/ + +ValueType valueTypeForOctet(qint8 octet) +{ + switch (octet) { + case 't': return Boolean; + case 'b': return ShortShortInt; + case 's': return ShortInt; + case 'I': return LongInt; + case 'l': return LongLongInt; + case 'f': return Float; + case 'd': return Double; + case 'D': return Decimal; + case 'S': return LongString; + case 'A': return Array; + case 'T': return Timestamp; + case 'F': return Hash; + case 'V': return Void; + case 'x': return Bytes; + default: + qAmqpDebug() << Q_FUNC_INFO << "invalid octet received: " << char(octet); + } + + return Invalid; +} + +qint8 valueTypeToOctet(ValueType type) +{ + switch (type) { + case Boolean: return 't'; + case ShortShortInt: return 'b'; + case ShortInt: return 's'; + case LongInt: return 'I'; + case LongLongInt: return 'l'; + case Float: return 'f'; + case Double: return 'd'; + case Decimal: return 'D'; + case LongString: return 'S'; + case Array: return 'A'; + case Timestamp: return 'T'; + case Hash: return 'F'; + case Void: return 'V'; + case Bytes: return 'x'; + default: + qAmqpDebug() << Q_FUNC_INFO << "invalid type received: " << char(type); + } + + return 'V'; +} + +void Table::writeFieldValue(QDataStream &stream, const QVariant &value) +{ + ValueType type; + switch (value.userType()) { + case QMetaType::Bool: + type = Boolean; + break; + case QMetaType::QByteArray: + type = Bytes; + break; + case QMetaType::Int: + { + int i = qAbs(value.toInt()); + if (i <= qint8(UINT8_MAX)) { + type = ShortShortInt; + } else if (i <= qint16(UINT16_MAX)) { + type = ShortInt; + } else { + type = LongInt; + } + } + break; + case QMetaType::UShort: + type = ShortInt; + break; + case QMetaType::UInt: + { + int i = value.toInt(); + if (i <= qint8(UINT8_MAX)) { + type = ShortShortInt; + } else if (i <= qint16(UINT16_MAX)) { + type = ShortInt; + } else { + type = LongInt; + } + } + break; + case QMetaType::LongLong: + case QMetaType::ULongLong: + type = LongLongInt; + break; + case QMetaType::QString: + type = LongString; + break; + case QMetaType::QDateTime: + type = Timestamp; + break; + case QMetaType::Double: + type = value.toDouble() > FLT_MAX ? Double : Float; + break; + case QMetaType::QVariantHash: + type = Hash; + break; + case QMetaType::QVariantList: + type = Array; + break; + case QMetaType::Void: + type = Void; + break; + default: + if (value.userType() == qMetaTypeId()) { + type = Decimal; + break; + } else if (!value.isValid()) { + type = Void; + break; + } + + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << value.userType(); + return; + } + + // write the field value type, a requirement for field tables only + stream << valueTypeToOctet(type); + writeFieldValue(stream, type, value); +} + +void Table::writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value) +{ + switch (type) { + case Boolean: + case ShortShortUint: + case ShortUint: + case LongUint: + case LongLongUint: + case ShortString: + case LongString: + case Timestamp: + case Hash: + return Frame::writeAmqpField(stream, type, value); + + case ShortShortInt: + stream << qint8(value.toInt()); + break; + case ShortInt: + stream << qint16(value.toInt()); + break; + case LongInt: + stream << qint32(value.toInt()); + break; + case LongLongInt: + stream << qlonglong(value.toLongLong()); + break; + case Float: + { + float g = value.toFloat(); + QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::SinglePrecision); + stream << g; + stream.setFloatingPointPrecision(oldPrecision); + } + break; + case Double: + { + double g = value.toDouble(); + QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::DoublePrecision); + stream << g; + stream.setFloatingPointPrecision(oldPrecision); + } + break; + case Decimal: + { + Frame::decimal v(value.value()); + stream << v.scale; + stream << v.value; + } + break; + case Array: + { + QByteArray buffer; + QDataStream arrayStream(&buffer, QIODevice::WriteOnly); + QVariantList array(value.toList()); + for (int i = 0; i < array.size(); ++i) + writeFieldValue(arrayStream, array.at(i)); + + if (buffer.isEmpty()) { + stream << qint32(0); + } else { + stream << buffer; + } + } + break; + case Bytes: + { + QByteArray ba = value.toByteArray(); + stream << quint32(ba.length()); + stream.writeRawData(ba.data(), ba.length()); + } + break; + case Void: + stream << qint32(0); + break; + + default: + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type; + } +} + +QVariant Table::readFieldValue(QDataStream &stream, ValueType type) +{ + switch (type) { + case Boolean: + case ShortShortUint: + case ShortUint: + case LongUint: + case LongLongUint: + case ShortString: + case LongString: + case Timestamp: + case Hash: + return Frame::readAmqpField(stream, type); + + case ShortShortInt: + { + char octet; + stream.readRawData(&octet, sizeof(octet)); + return QVariant::fromValue(octet); + } + case ShortInt: + { + qint16 tmp_value = 0; + stream >> tmp_value; + return QVariant::fromValue(tmp_value); + } + case LongInt: + { + qint32 tmp_value = 0; + stream >> tmp_value; + return QVariant::fromValue(tmp_value); + } + case LongLongInt: + { + qlonglong v = 0 ; + stream >> v; + return v; + } + case Float: + { + float tmp_value; + QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::SinglePrecision); + stream >> tmp_value; + stream.setFloatingPointPrecision(precision); + return QVariant::fromValue(tmp_value); + } + case Double: + { + double tmp_value; + QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::DoublePrecision); + stream >> tmp_value; + stream.setFloatingPointPrecision(precision); + return QVariant::fromValue(tmp_value); + } + case Decimal: + { + Frame::decimal v; + stream >> v.scale; + stream >> v.value; + return QVariant::fromValue(v); + } + case Array: + { + QByteArray data; + quint32 size = 0; + stream >> size; + data.resize(size); + stream.readRawData(data.data(), data.size()); + + qint8 type = 0; + QVariantList result; + QDataStream arrayStream(&data, QIODevice::ReadOnly); + while (!arrayStream.atEnd()) { + arrayStream >> type; + result.append(readFieldValue(arrayStream, valueTypeForOctet(type))); + } + + return result; + } + case Bytes: + { + QByteArray bytes; + quint32 length = 0; + stream >> length; + bytes.resize(length); + stream.readRawData(bytes.data(), bytes.size()); + return bytes; + } + case Void: + break; + default: + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type; + } + + return QVariant(); +} + +QDataStream &operator<<(QDataStream &stream, const Table &table) +{ + QByteArray data; + QDataStream s(&data, QIODevice::WriteOnly); + Table::ConstIterator it; + Table::ConstIterator itEnd = table.constEnd(); + for (it = table.constBegin(); it != itEnd; ++it) { + Table::writeFieldValue(s, ShortString, it.key()); + Table::writeFieldValue(s, it.value()); + } + + if (data.isEmpty()) { + stream << qint32(0); + } else { + stream << data; + } + + return stream; +} + +QDataStream &operator>>(QDataStream &stream, Table &table) +{ + QByteArray data; + stream >> data; + QDataStream tableStream(&data, QIODevice::ReadOnly); + while (!tableStream.atEnd()) { + qint8 octet = 0; + QString field = Frame::readAmqpField(tableStream, ShortString).toString(); + tableStream >> octet; + table[field] = Table::readFieldValue(tableStream, valueTypeForOctet(octet)); + } + + return stream; +} diff --git a/src/amqp_table.h b/src/amqp_table.h new file mode 100644 index 0000000..f6f448a --- /dev/null +++ b/src/amqp_table.h @@ -0,0 +1,29 @@ +#ifndef amqp_table_h__ +#define amqp_table_h__ + +#include "amqp_global.h" +#include + +namespace QAMQP { + +class QAMQP_EXPORT Table : public QVariantHash +{ +public: + Table() {} + inline Table(const QVariantHash &variantHash) + : QVariantHash(variantHash) + { + } + + static void writeFieldValue(QDataStream &stream, const QVariant &value); + static void writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value); + static QVariant readFieldValue(QDataStream &stream, ValueType type); +}; + +} // namespace QAMQP + +QAMQP_EXPORT QDataStream &operator<<(QDataStream &, const QAMQP::Table &table); +QAMQP_EXPORT QDataStream &operator>>(QDataStream &, QAMQP::Table &table); +Q_DECLARE_METATYPE(QAMQP::Table) + +#endif // amqp_table_h__ diff --git a/src/src.pro b/src/src.pro index 2f4b128..30e450d 100644 --- a/src/src.pro +++ b/src/src.pro @@ -25,7 +25,8 @@ INSTALL_HEADERS += \ amqp_frame.h \ amqp_global.h \ amqp_message.h \ - amqp_queue.h + amqp_queue.h \ + amqp_table.h HEADERS += \ $${INSTALL_HEADERS} \ @@ -38,7 +39,8 @@ SOURCES += \ amqp_exchange.cpp \ amqp_frame.cpp \ amqp_message.cpp \ - amqp_queue.cpp + amqp_queue.cpp \ + amqp_table.cpp # install headers.files = $${INSTALL_HEADERS} diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 48ede33..5542240 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -42,6 +44,7 @@ private Q_SLOTS: void invalidQos(); void qos(); void invalidRoutingKey(); + void tableFieldDataTypes(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -531,5 +534,84 @@ void tst_QAMQPQueue::invalidRoutingKey() QCOMPARE(client->error(), QAMQP::FrameError); } +void tst_QAMQPQueue::tableFieldDataTypes() +{ + Queue *queue = client->createQueue("test-table-field-data-types"); + declareQueueAndVerifyConsuming(queue); + + Frame::decimal decimal; + decimal.scale = 2; + decimal.value = 12345; + QVariant decimalVariant = QVariant::fromValue(decimal); + + Table nestedTable; + nestedTable.insert("boolean", true); + nestedTable.insert("long-int", qint32(-65536)); + + QVariantList array; + array.append(true); + array.append(qint32(-65536)); + + QDateTime timestamp = QDateTime::currentDateTime(); + + Table headers; + headers.insert("boolean", true); + headers.insert("short-short-int", qint8(-15)); + headers.insert("short-short-uint", quint8(15)); + headers.insert("short-int", qint16(-256)); + headers.insert("short-uint", QVariant::fromValue(quint16(256))); + headers.insert("long-int", qint32(-65536)); + headers.insert("long-uint", quint32(65536)); + headers.insert("long-long-int", qint64(-2147483648)); + headers.insert("long-long-uint", quint64(2147483648)); + headers.insert("float", 230.7); + headers.insert("double", double(FLT_MAX)); + headers.insert("decimal-value", decimalVariant); + headers.insert("short-string", QLatin1String("test")); + headers.insert("long-string", QLatin1String("test")); + headers.insert("timestamp", timestamp); + headers.insert("nested-table", nestedTable); + headers.insert("array", array); + headers.insert("bytes", QByteArray("abcdefg1234567")); + + Exchange *defaultExchange = client->createExchange(); + defaultExchange->publish("dummy", "test-table-field-data-types", "text.plain", headers); + + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + Message message = queue->dequeue(); + + QCOMPARE(message.header("boolean").toBool(), true); + QCOMPARE(qint8(message.header("short-short-int").toInt()), qint8(-15)); + QCOMPARE(quint8(message.header("short-short-uint").toUInt()), quint8(15)); + QCOMPARE(qint16(message.header("short-int").toInt()), qint16(-256)); + QCOMPARE(quint16(message.header("short-uint").toUInt()), quint16(256)); + QCOMPARE(qint32(message.header("long-int").toInt()), qint32(-65536)); + QCOMPARE(quint32(message.header("long-uint").toUInt()), quint32(65536)); + QCOMPARE(qint64(message.header("long-long-int").toLongLong()), qint64(-2147483648)); + QCOMPARE(quint64(message.header("long-long-uint").toLongLong()), quint64(2147483648)); + QCOMPARE(message.header("float").toFloat(), float(230.7)); + QCOMPARE(message.header("double").toDouble(), double(FLT_MAX)); + QCOMPARE(message.header("short-string").toString(), QLatin1String("test")); + QCOMPARE(message.header("long-string").toString(), QLatin1String("test")); + QCOMPARE(message.header("timestamp").toDateTime(), timestamp); + QCOMPARE(message.header("bytes").toByteArray(), QByteArray("abcdefg1234567")); + + QVERIFY(message.hasHeader("nested-table")); + Table compareTable(message.header("nested-table").toHash()); + foreach (QString key, nestedTable.keys()) { + QVERIFY(compareTable.contains(key)); + QCOMPARE(nestedTable.value(key), compareTable.value(key)); + } + + QVERIFY(message.hasHeader("array")); + QVariantList compareArray = message.header("array").toList(); + QCOMPARE(array, compareArray); + + Frame::decimal receivedDecimal = message.header("decimal-value").value(); + QCOMPARE(receivedDecimal.scale, qint8(2)); + QCOMPARE(receivedDecimal.value, quint32(12345)); +} + + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"