diff --git a/src/amqp_authenticator.cpp b/src/amqp_authenticator.cpp index 98047c7..c6de043 100644 --- a/src/amqp_authenticator.cpp +++ b/src/amqp_authenticator.cpp @@ -1,6 +1,6 @@ -#include "amqp_authenticator.h" +#include "amqp_table.h" #include "amqp_frame.h" - +#include "amqp_authenticator.h" using namespace QAMQP; AMQPlainAuthenticator::AMQPlainAuthenticator(const QString &l, const QString &p) @@ -40,9 +40,9 @@ void AMQPlainAuthenticator::setPassword(const QString &p) void AMQPlainAuthenticator::write(QDataStream &out) { - Frame::writeField('s', out, type()); - Frame::TableField response; + Frame::writeAmqpField(out, ShortString, type()); + Table response; response["LOGIN"] = login_; response["PASSWORD"] = password_; - Frame::serialize(out, response); + out << response; } diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 675a7aa..2a53db1 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -156,10 +156,10 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::writeField('s',stream, client->virtualHost()); + Frame::writeAmqpField(stream, ShortString, client->virtualHost()); stream << qint16(code); - Frame::writeField('s', stream, text); + Frame::writeAmqpField(stream, ShortString, text); stream << qint16(classId); stream << qint16(methodId); @@ -176,7 +176,8 @@ void ChannelPrivate::close(const Frame::Method &frame) QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; stream >> code; - QString text(Frame::readField('s', stream).toString()); + QString text = Frame::readAmqpField(stream, ShortString).toString(); + stream >> classId; stream >> methodId; diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 610de62..fb7abcc 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -1,18 +1,18 @@ -#include "amqp_client.h" -#include "amqp_client_p.h" -#include "amqp_global.h" -#include "amqp_exchange.h" -#include "amqp_exchange_p.h" -#include "amqp_queue.h" -#include "amqp_queue_p.h" -#include "amqp_authenticator.h" - #include #include #include #include #include +#include "amqp_global.h" +#include "amqp_exchange.h" +#include "amqp_exchange_p.h" +#include "amqp_queue.h" +#include "amqp_queue_p.h" +#include "amqp_authenticator.h" +#include "amqp_table.h" +#include "amqp_client_p.h" +#include "amqp_client.h" using namespace QAMQP; ClientPrivate::ClientPrivate(Client *q) @@ -88,7 +88,6 @@ void ClientPrivate::parseConnectionString(const QString &uri) return; } - port = connectionString.port(AMQP_PORT); host = connectionString.host(); @@ -329,21 +328,22 @@ void ClientPrivate::start(const Frame::Method &frame) qAmqpDebug(">> Start"); QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); + quint8 version_major = 0; quint8 version_minor = 0; - stream >> version_major >> version_minor; - Frame::TableField table; - Frame::deserialize(stream, table); + Table table; + stream >> table; - QStringList mechanisms = Frame::readField('S', stream).toString().split(' '); - QString locales = Frame::readField('S', stream).toString(); + QStringList mechanisms = Frame::readAmqpField(stream, LongString).toString().split(' '); + QString locales = Frame::readAmqpField(stream, LongString).toString(); qAmqpDebug(">> version_major: %d", version_major); qAmqpDebug(">> version_minor: %d", version_minor); - Frame::print(table); + // NOTE: replace with qDebug overload + // Frame::print(table); qAmqpDebug() << ">> mechanisms: " << mechanisms; qAmqpDebug(">> locales: %s", qPrintable(locales)); @@ -427,7 +427,7 @@ void ClientPrivate::close(const Frame::Method &frame) QDataStream stream(&data, QIODevice::ReadOnly); qint16 code = 0, classId, methodId; stream >> code; - QString text(Frame::readField('s', stream).toString()); + QString text = Frame::readAmqpField(stream, ShortString).toString(); stream >> classId; stream >> methodId; @@ -452,15 +452,15 @@ void ClientPrivate::startOk() QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::TableField clientProperties; + Table clientProperties; clientProperties["version"] = QString(QAMQP_VERSION); clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["product"] = QString("QAMQP"); clientProperties.unite(customProperties); - Frame::serialize(stream, clientProperties); + stream << clientProperties; authenticator->write(stream); - Frame::writeField('s', stream, "en_US"); + Frame::writeAmqpField(stream, ShortString, QLatin1String("en_US")); frame.setArguments(arguments); sendFrame(frame); @@ -491,7 +491,7 @@ void ClientPrivate::open() QByteArray arguments; QDataStream stream(&arguments, QIODevice::WriteOnly); - Frame::writeField('s',stream, virtualHost); + Frame::writeAmqpField(stream, ShortString, virtualHost); stream << qint8(0); stream << qint8(0); @@ -507,7 +507,7 @@ void ClientPrivate::close(int code, const QString &text, int classId, int method QDataStream stream(&arguments, QIODevice::WriteOnly); stream << qint16(code); - Frame::writeField('s', stream, text); + Frame::writeAmqpField(stream, ShortString, text); stream << qint16(classId); stream << qint16(methodId); diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 916070e..c289d08 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -6,6 +6,12 @@ #include #include +#ifndef QT_NO_SSL +# include +# include +#endif + +#include "amqp_table.h" #include "amqp_frame.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok @@ -92,7 +98,7 @@ public: bool closed; bool connected; QPointer heartbeatTimer; - Frame::TableField customProperties; + Table customProperties; qint16 channelMax; qint16 heartbeatDelay; qint32 frameMax; @@ -106,6 +112,7 @@ public: }; #ifndef QT_NO_SSL +class SslClient; class SslClientPrivate : public ClientPrivate { public: diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index a514eb5..4f43240 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -48,11 +48,11 @@ void ExchangePrivate::declare() QDataStream stream(&args, QIODevice::WriteOnly); stream << qint16(0); //reserved 1 - Frame::writeField('s', stream, name); - Frame::writeField('s', stream, type); + Frame::writeAmqpField(stream, ShortString, name); + Frame::writeAmqpField(stream, ShortString, type); stream << qint8(options); - Frame::writeField('F', stream, arguments); + Frame::writeAmqpField(stream, Hash, arguments); frame.setArguments(args); sendFrame(frame); @@ -133,9 +133,9 @@ void ExchangePrivate::basicReturn(const Frame::Method &frame) quint16 replyCode; stream >> replyCode; - QString replyText = Frame::readField('s', stream).toString(); - QString exchangeName = Frame::readField('s', stream).toString(); - QString routingKey = Frame::readField('s', stream).toString(); + QString replyText = Frame::readAmqpField(stream, ShortString).toString(); + QString exchangeName = Frame::readAmqpField(stream, ShortString).toString(); + QString routingKey = Frame::readAmqpField(stream, ShortString).toString(); Error checkError = static_cast(replyCode); if (checkError != QAMQP::NoError) { @@ -186,12 +186,12 @@ QString Exchange::type() const return d->type; } -void Exchange::declare(ExchangeType type, ExchangeOptions options , const Frame::TableField &args) +void Exchange::declare(ExchangeType type, ExchangeOptions options, const Table &args) { declare(ExchangePrivate::typeToString(type), options, args); } -void Exchange::declare(const QString &type, ExchangeOptions options , const Frame::TableField &args) +void Exchange::declare(const QString &type, ExchangeOptions options, const Table &args) { Q_D(Exchange); d->type = type; @@ -210,7 +210,7 @@ void Exchange::remove(int options) QDataStream stream(&arguments, QIODevice::WriteOnly); stream << qint16(0); //reserved 1 - Frame::writeField('s', stream, d->name); + Frame::writeAmqpField(stream, ShortString, d->name); stream << qint8(options); frame.setArguments(arguments); @@ -232,7 +232,7 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey, } void Exchange::publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const QVariantHash &headers, + const QString &mimeType, const Table &headers, const MessageProperties &properties, int publishOptions) { Q_D(Exchange); @@ -243,8 +243,8 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey, QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, routingKey); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, routingKey); out << qint8(publishOptions); frame.setArguments(arguments); diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index ccf6c6a..654baac 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -1,6 +1,7 @@ #ifndef amqp_exchange_h__ #define amqp_exchange_h__ +#include "amqp_table.h" #include "amqp_channel.h" namespace QAMQP @@ -56,10 +57,10 @@ public: // AMQP Exchange void declare(ExchangeType type = Direct, ExchangeOptions options = NoOptions, - const Frame::TableField &args = Frame::TableField()); + const Table &args = Table()); void declare(const QString &type = QLatin1String("direct"), ExchangeOptions options = NoOptions, - const Frame::TableField &args = Frame::TableField()); + const Table &args = Table()); void remove(int options = roIfUnused|roNoWait); // AMQP Basic @@ -70,7 +71,7 @@ public: const QString &mimeType, const MessageProperties &properties = MessageProperties(), int publishOptions = poNoOptions); void publish(const QByteArray &message, const QString &routingKey, - const QString &mimeType, const QVariantHash &headers, + const QString &mimeType, const Table &headers, const MessageProperties &properties = MessageProperties(), int publishOptions = poNoOptions); diff --git a/src/amqp_exchange_p.h b/src/amqp_exchange_p.h index 04a1e1c..460404b 100644 --- a/src/amqp_exchange_p.h +++ b/src/amqp_exchange_p.h @@ -1,6 +1,7 @@ #ifndef amqp_exchange_p_h__ #define amqp_exchange_p_h__ +#include "amqp_table.h" #include "amqp_exchange.h" #include "amqp_channel_p.h" @@ -29,7 +30,7 @@ public: QString type; Exchange::ExchangeOptions options; - Frame::TableField arguments; + Table arguments; bool delayedDeclare; bool declared; diff --git a/src/amqp_frame.cpp b/src/amqp_frame.cpp index 8058e6b..322e2ac 100644 --- a/src/amqp_frame.cpp +++ b/src/amqp_frame.cpp @@ -1,11 +1,10 @@ #include "amqp_frame.h" -#include - #include #include #include +#include "amqp_table.h" #include "amqp_global.h" using namespace QAMQP; @@ -141,244 +140,99 @@ void Method::writePayload(QDataStream &stream) const ////////////////////////////////////////////////////////////////////////// -QVariant Frame::readField(qint8 valueType, QDataStream &s) +QVariant Frame::readAmqpField(QDataStream &s, QAMQP::ValueType type) { - QVariant value; - QByteArray tmp; - qint8 nameSize_ = 0; - char octet = 0; - - switch(valueType) { - case 't': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet > 0); - break; - case 'b': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet); - break; - case 'B': - s.readRawData(&octet, sizeof(octet)); - value = QVariant::fromValue(octet); - break; - case 'U': + switch (type) { + case Boolean: { - qint16 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint8 octet = 0; + s >> octet; + return QVariant::fromValue(octet > 0); } - case 'u': + case ShortShortUint: { - quint16 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint8 octet = 0; + s >> octet; + return QVariant::fromValue(octet); } - case 'I': + case ShortUint: { - qint32 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint16 tmp_value = 0; + s >> tmp_value; + return QVariant::fromValue(tmp_value); } - case 'i': + case LongUint: { - quint32 tmp_value_ = 0; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint32 tmp_value = 0; + s >> tmp_value; + return QVariant::fromValue(tmp_value); } - case 'L': - { - qlonglong v = 0 ; - s >> v; - value = v; - } - - break; - case 'l': + case LongLongUint: { qulonglong v = 0 ; s >> v; - value = v; + return v; } + case ShortString: + { + qint8 size = 0; + QByteArray buffer; - break; - case 'f': - { - float tmp_value_; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + s >> size; + buffer.resize(size); + s.readRawData(buffer.data(), buffer.size()); + return QString::fromUtf8(buffer.data(), size); } - case 'd': + case LongString: { - double tmp_value_; - s >> tmp_value_; - value = QVariant::fromValue(tmp_value_); - break; + quint32 size = 0; + QByteArray buffer; + + s >> size; + buffer.resize(size); + s.readRawData(buffer.data(), buffer.size()); + return QString::fromUtf8(buffer.data(), buffer.size()); } - case 'D': + case Timestamp: { - Frame::decimal v; - s >> v.scale; - s >> v.value; - value = QVariant::fromValue(v); + qulonglong tmp_value; + s >> tmp_value; + return QDateTime::fromMSecsSinceEpoch(tmp_value); } - break; - case 's': - s >> nameSize_; - tmp.resize(nameSize_); - s.readRawData(tmp.data(), tmp.size()); - value = QString::fromLatin1(tmp.data(), nameSize_); - break; - case 'S': + case Hash: { - quint32 length_ = 0; - s >> length_; - nameSize_ = length_; - tmp.resize(length_); + Table table; + s >> table; + return table; } - s.readRawData(tmp.data(), tmp.size()); - value = QString::fromLatin1(tmp.data(), tmp.size()); - break; - case 'A': - { - qint32 length_ = 0; - qint8 type = 0; - s >> length_; - QList array_; - for (int i =0; i < length_; ++i) { - s >> type; - array_ << readField(type, s); - } - value = array_; - } - break; - case 'T': - { - qulonglong tmp_value_; - s >> tmp_value_; - value = QDateTime::fromMSecsSinceEpoch(tmp_value_); - break; - } - case 'F': - { - TableField table_; - deserialize(s, table_); - value = table_; - } - break; - case 'V': - break; + case Void: + return QVariant(); default: - qWarning("Unknown field type"); + qWarning() << Q_FUNC_INFO << "unsupported value type: " << type; } - return value; + + return QVariant(); } -QDataStream &Frame::deserialize(QDataStream &stream, Frame::TableField &f) +void Frame::writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value) { - QByteArray data; - stream >> data; - QDataStream s(&data, QIODevice::ReadOnly); - - while (!s.atEnd()) { - qint8 valueType = 0; - QString name = readField('s', s).toString(); - s >> valueType; - f[name] = readField(valueType, s); - } - - return stream; -} - -QDataStream &Frame::serialize(QDataStream &stream, const TableField &f) -{ - QByteArray data; - QDataStream s(&data, QIODevice::WriteOnly); - TableField::ConstIterator it; - TableField::ConstIterator itEnd = f.constEnd(); - for (it = f.constBegin(); it != itEnd; ++it) { - writeField('s', s, it.key()); - writeField(s, it.value()); - } - - if (data.isEmpty()) { - stream << qint32(0); - } else { - stream << data; - } - - return stream; -} - -void Frame::print(const TableField &f) -{ - TableField::ConstIterator it; - TableField::ConstIterator itEnd = f.constEnd(); - for (it = f.constBegin(); it != itEnd; ++it) { - switch(it.value().type()) { - case QVariant::Hash: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": FIELD_TABLE"; - break; - case QVariant::List: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": ARRAY"; - break; - default: - qAmqpDebug() << "\t" << qPrintable(it.key()) << ": " << it.value(); - } - } -} - -void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType) -{ - if (withType) - s << valueType; - - switch (valueType) { - case 't': + switch (type) { + case Boolean: s << (value.toBool() ? qint8(1) : qint8(0)); break; - case 'b': - s << qint8(value.toInt()); + case ShortShortUint: + s << qint8(value.toUInt()); break; - case 'B': - s << quint8(value.toUInt()); - break; - case 'U': - s << qint16(value.toInt()); - break; - case 'u': + case ShortUint: s << quint16(value.toUInt()); break; - case 'I': - s << qint32(value.toInt()); - break; - case 'i': + case LongUint: s << quint32(value.toUInt()); break; - case 'L': - s << qlonglong(value.toLongLong()); - break; - case 'l': + case LongLongUint: s << qulonglong(value.toULongLong()); break; - case 'f': - s << value.toFloat(); - break; - case 'd': - s << value.toDouble(); - break; - case 'D': - { - Frame::decimal v(value.value()); - s << v.scale; - s << v.value; - } - break; - case 's': + case ShortString: { QString str = value.toString(); if (str.length() >= 256) { @@ -386,104 +240,30 @@ void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, b } s << quint8(str.length()); - s.writeRawData(str.toLatin1().data(), str.length()); + s.writeRawData(str.toUtf8().data(), str.length()); } break; - case 'S': + case LongString: { QString str = value.toString(); s << quint32(str.length()); s.writeRawData(str.toLatin1().data(), str.length()); } break; - case 'A': - { - QList array_(value.toList()); - s << quint32(array_.count()); - for (int i =0; i < array_.count(); ++i) - writeField(s, array_.at(i)); - } - break; - case 'T': + case Timestamp: s << qulonglong(value.toDateTime().toMSecsSinceEpoch()); break; - case 'F': + case Hash: { - TableField table_(value.toHash()); - serialize(s, table_); + Table table(value.toHash()); + s << table; } break; - case 'V': - break; default: - qWarning("Unknown field type"); + qWarning() << Q_FUNC_INFO << "unsupported value type: " << type; } } -void Frame::writeField(QDataStream &s, const QVariant &value) -{ - char type = 0; - switch (value.type()) { - case QVariant::Bool: - type = 't'; - break; - case QVariant::ByteArray: - type = 'S'; - break; - case QVariant::Int: - { - int i = qAbs(value.toInt()); - if (i <= qint8(0xFF)) { - type = 'b'; - } else if (i <= qint16(0xFFFF)) { - type = 'U'; - } else if (i <= qint16(0xFFFFFFFF)) { - type = 'I'; - } - } - break; - case QVariant::UInt: - { - int i = value.toInt(); - if (i <= qint8(0xFF)) { - type = 'B'; - } else if (i <= qint16(0xFFFF)) { - type = 'u'; - } else if (i <= qint16(0xFFFFFFFF)) { - type = 'i'; - } - } - break; - case QVariant::LongLong: - type = 'L'; - break; - case QVariant::ULongLong: - type = 'l'; - break; - case QVariant::String: - type = 'S'; - break; - case QVariant::DateTime: - type = 'T'; - break; - case QVariant::Double: - type = value.toDouble() > FLT_MAX ? 'd' : 'f'; - break; - case QVariant::Hash: - type = 'F'; - break; - case QVariant::List: - type = 'A'; - break; - - default: - qAmqpDebug() << Q_FUNC_INFO << "unhandled variant type: " << value.type(); - } - - if (type) - writeField(type, s, value, true); -} - ////////////////////////////////////////////////////////////////////////// Content::Content() @@ -522,46 +302,46 @@ qint32 Content::size() const out << prop_; if (prop_ & cpContentType) - writeField('s', out, properties_[cpContentType]); + writeAmqpField(out, ShortString, properties_[cpContentType]); if (prop_ & cpContentEncoding) - writeField('s', out, properties_[cpContentEncoding]); + writeAmqpField(out, ShortString, properties_[cpContentEncoding]); if (prop_ & cpHeaders) - writeField('F', out, properties_[cpHeaders]); + writeAmqpField(out, Hash, properties_[cpHeaders]); if (prop_ & cpDeliveryMode) - writeField('b', out, properties_[cpDeliveryMode]); + writeAmqpField(out, ShortShortUint, properties_[cpDeliveryMode]); if (prop_ & cpPriority) - writeField('b', out, properties_[cpPriority]); + writeAmqpField(out, ShortShortUint, properties_[cpPriority]); if (prop_ & cpCorrelationId) - writeField('s', out, properties_[cpCorrelationId]); + writeAmqpField(out, ShortString, properties_[cpCorrelationId]); if (prop_ & cpReplyTo) - writeField('s', out, properties_[cpReplyTo]); + writeAmqpField(out, ShortString, properties_[cpReplyTo]); if (prop_ & cpExpiration) - writeField('s', out, properties_[cpExpiration]); + writeAmqpField(out, ShortString, properties_[cpExpiration]); if (prop_ & cpMessageId) - writeField('s', out, properties_[cpMessageId]); + writeAmqpField(out, ShortString, properties_[cpMessageId]); if (prop_ & cpTimestamp) - writeField('T', out, properties_[cpTimestamp]); + writeAmqpField(out, Timestamp, properties_[cpTimestamp]); if (prop_ & cpType) - writeField('s', out, properties_[cpType]); + writeAmqpField(out, ShortString, properties_[cpType]); if (prop_ & cpUserId) - writeField('s', out, properties_[cpUserId]); + writeAmqpField(out, ShortString, properties_[cpUserId]); if (prop_ & cpAppId) - writeField('s', out, properties_[cpAppId]); + writeAmqpField(out, ShortString, properties_[cpAppId]); if (prop_ & cpClusterID) - writeField('s', out, properties_[cpClusterID]); + writeAmqpField(out, ShortString, properties_[cpClusterID]); return buffer_.size(); } @@ -599,46 +379,46 @@ void Content::readPayload(QDataStream &in) qint16 flags_ = 0; in >> flags_; if (flags_ & cpContentType) - properties_[cpContentType] = readField('s', in); + properties_[cpContentType] = readAmqpField(in, ShortString); if (flags_ & cpContentEncoding) - properties_[cpContentEncoding] = readField('s', in); + properties_[cpContentEncoding] = readAmqpField(in, ShortString); if (flags_ & cpHeaders) - properties_[cpHeaders] = readField('F', in); + properties_[cpHeaders] = readAmqpField(in, Hash); if (flags_ & cpDeliveryMode) - properties_[cpDeliveryMode] = readField('b', in); + properties_[cpDeliveryMode] = readAmqpField(in, ShortShortUint); if (flags_ & cpPriority) - properties_[cpPriority] = readField('b', in); + properties_[cpPriority] = readAmqpField(in, ShortShortUint); if (flags_ & cpCorrelationId) - properties_[cpCorrelationId] = readField('s', in); + properties_[cpCorrelationId] = readAmqpField(in, ShortString); if (flags_ & cpReplyTo) - properties_[cpReplyTo] = readField('s', in); + properties_[cpReplyTo] = readAmqpField(in, ShortString); if (flags_ & cpExpiration) - properties_[cpExpiration] = readField('s', in); + properties_[cpExpiration] = readAmqpField(in, ShortString); if (flags_ & cpMessageId) - properties_[cpMessageId] = readField('s', in); + properties_[cpMessageId] = readAmqpField(in, ShortString); if (flags_ & cpTimestamp) - properties_[cpTimestamp] = readField('T', in); + properties_[cpTimestamp] = readAmqpField(in, Timestamp); if (flags_ & cpType) - properties_[cpType] = readField('s', in); + properties_[cpType] = readAmqpField(in, ShortString); if (flags_ & cpUserId) - properties_[cpUserId] = readField('s', in); + properties_[cpUserId] = readAmqpField(in, ShortString); if (flags_ & cpAppId) - properties_[cpAppId] = readField('s', in); + properties_[cpAppId] = readAmqpField(in, ShortString); if (flags_ & cpClusterID) - properties_[cpClusterID] = readField('s', in); + properties_[cpClusterID] = readAmqpField(in, ShortString); } ////////////////////////////////////////////////////////////////////////// diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 658e290..c4541e2 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -65,20 +65,8 @@ namespace Frame quint32 value; }; - /* - * @brief Definition implementation of TableField type - * @detailed Define implementation TableField type in builtin Qt types. - * Key contains field name, value contains field data. - * It can by any type witch support serialization in AMQP types. - */ - typedef QHash TableField; - - QDataStream &serialize(QDataStream &stream, const Frame::TableField &f); - QDataStream &deserialize(QDataStream &stream, Frame::TableField &f); - QVariant readField(qint8 valueType, QDataStream &s); - void writeField(QDataStream &s, const QVariant &value); - void writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType = false); - void print(const Frame::TableField &f); + QVariant readAmqpField(QDataStream &s, QAMQP::ValueType type); + void writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value); /* * @brief Base class for any frames. @@ -401,6 +389,6 @@ typedef Frame::Content::Property MessageProperty; } // namespace QAMQP Q_DECLARE_METATYPE(QAMQP::Frame::decimal) -Q_DECLARE_METATYPE(QAMQP::Frame::TableField) +//Q_DECLARE_METATYPE(QAMQP::Frame::TableField) #endif // amqp_frame_h__ diff --git a/src/amqp_global.h b/src/amqp_global.h index 13eb40b..364e397 100644 --- a/src/amqp_global.h +++ b/src/amqp_global.h @@ -47,8 +47,36 @@ namespace QAMQP { +enum ValueType +{ + Invalid = -1, -enum Error { + // basic AMQP types + Boolean, + ShortUint, + LongUint, + LongLongUint, + ShortString, + LongString, + + // field-value types + ShortShortInt, + ShortShortUint, + ShortInt, + LongInt, + LongLongInt, + Float, + Double, + Decimal, + Array, + Timestamp, + Hash, + Bytes, + Void +}; + +enum Error +{ NoError = 0, ContentTooLargeError = 311, UnroutableKey = 312, diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index f365959..b44c7d3 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -2,7 +2,7 @@ #include "amqp_queue_p.h" #include "amqp_exchange.h" #include "amqp_message_p.h" - +#include "amqp_table.h" using namespace QAMQP; #include @@ -90,8 +90,12 @@ void QueuePrivate::_q_content(const Frame::Content &frame) currentMessage.d->leftSize = frame.bodySize(); QHash::ConstIterator it; QHash::ConstIterator itEnd = frame.properties_.constEnd(); - for (it = frame.properties_.constBegin(); it != itEnd; ++it) - currentMessage.d->properties[static_cast(it.key())] = it.value(); + for (it = frame.properties_.constBegin(); it != itEnd; ++it) { + Message::Property property = static_cast(it.key()); + if (property == Message::Headers) + currentMessage.d->headers = (it.value()).toHash(); + currentMessage.d->properties[property] = it.value(); + } } void QueuePrivate::_q_body(const Frame::ContentBody &frame) @@ -123,7 +127,7 @@ void QueuePrivate::declareOk(const Frame::Method &frame) QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - name = Frame::readField('s', stream).toString(); + name = Frame::readAmqpField(stream, ShortString).toString(); qint32 messageCount = 0, consumerCount = 0; stream >> messageCount >> consumerCount; qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount); @@ -184,10 +188,10 @@ void QueuePrivate::getOk(const Frame::Method &frame) QDataStream in(&data, QIODevice::ReadOnly); Message message; - message.d->deliveryTag = Frame::readField('L',in).toLongLong(); - message.d->redelivered = Frame::readField('t',in).toBool(); - message.d->exchangeName = Frame::readField('s',in).toString(); - message.d->routingKey = Frame::readField('s',in).toString(); + message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong(); + message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool(); + message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString(); + message.d->routingKey = Frame::readAmqpField(in, ShortString).toString(); currentMessage = message; } @@ -197,7 +201,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame) qAmqpDebug() << "consume ok: " << name; QByteArray data = frame.arguments(); QDataStream stream(&data, QIODevice::ReadOnly); - consumerTag = Frame::readField('s',stream).toString(); + consumerTag = Frame::readAmqpField(stream, ShortString).toString(); qAmqpDebug("consumer tag = %s", qPrintable(consumerTag)); consuming = true; Q_EMIT q->consuming(consumerTag); @@ -208,17 +212,17 @@ void QueuePrivate::deliver(const Frame::Method &frame) qAmqpDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer = Frame::readField('s',in).toString(); + QString consumer = Frame::readAmqpField(in, ShortString).toString(); if (consumerTag != consumer) { qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; return; } Message message; - message.d->deliveryTag = Frame::readField('L',in).toLongLong(); - message.d->redelivered = Frame::readField('t',in).toBool(); - message.d->exchangeName = Frame::readField('s',in).toString(); - message.d->routingKey = Frame::readField('s',in).toString(); + message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong(); + message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool(); + message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString(); + message.d->routingKey = Frame::readAmqpField(in, ShortString).toString(); currentMessage = message; } @@ -231,9 +235,9 @@ void QueuePrivate::declare() QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, name); + Frame::writeAmqpField(out, ShortString, name); out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); sendFrame(frame); @@ -248,7 +252,7 @@ void QueuePrivate::cancelOk(const Frame::Method &frame) qAmqpDebug() << Q_FUNC_INFO; QByteArray data = frame.arguments(); QDataStream in(&data, QIODevice::ReadOnly); - QString consumer = Frame::readField('s',in).toString(); + QString consumer = Frame::readAmqpField(in, ShortString).toString(); if (consumerTag != consumer) { qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; return; @@ -323,7 +327,7 @@ void Queue::remove(int options) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(options); frame.setArguments(arguments); @@ -343,7 +347,7 @@ void Queue::purge() QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(0); // no-wait frame.setArguments(arguments); @@ -375,12 +379,12 @@ void Queue::bind(const QString &exchangeName, const QString &key) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); // reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, exchangeName); - Frame::writeField('s', out, key); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, exchangeName); + Frame::writeAmqpField(out, ShortString, key); out << qint8(0); // no-wait - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -410,10 +414,10 @@ void Queue::unbind(const QString &exchangeName, const QString &key) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, exchangeName); - Frame::writeField('s', out, key); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, exchangeName); + Frame::writeAmqpField(out, ShortString, key); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -439,11 +443,11 @@ bool Queue::consume(int options) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); - Frame::writeField('s', out, d->consumerTag); + Frame::writeAmqpField(out, ShortString, d->name); + Frame::writeAmqpField(out, ShortString, d->consumerTag); out << qint8(options); - Frame::writeField('F', out, Frame::TableField()); + Frame::writeAmqpField(out, Hash, Table()); frame.setArguments(arguments); d->sendFrame(frame); @@ -483,7 +487,7 @@ void Queue::get(bool noAck) QDataStream out(&arguments, QIODevice::WriteOnly); out << qint16(0); //reserved 1 - Frame::writeField('s', out, d->name); + Frame::writeAmqpField(out, ShortString, d->name); out << qint8(noAck ? 1 : 0); // no-ack frame.setArguments(arguments); @@ -530,7 +534,7 @@ bool Queue::cancel(bool noWait) QByteArray arguments; QDataStream out(&arguments, QIODevice::WriteOnly); - Frame::writeField('s', out, d->consumerTag); + Frame::writeAmqpField(out, ShortString, d->consumerTag); out << (noWait ? qint8(0x01) : qint8(0x0)); frame.setArguments(arguments); diff --git a/src/amqp_table.cpp b/src/amqp_table.cpp new file mode 100644 index 0000000..3bcd654 --- /dev/null +++ b/src/amqp_table.cpp @@ -0,0 +1,371 @@ +#include + +#include +#include + +#include "amqp_frame.h" +#include "amqp_table.h" +using namespace QAMQP; + +/* + * field value types according to: https://www.rabbitmq.com/amqp-0-9-1-errata.html +t - Boolean +b - Signed 8-bit + Unsigned 8-bit +s - Signed 16-bit + Unsigned 16-bit +I - Signed 32-bit + Unsigned 32-bit +l - Signed 64-bit + Unsigned 64-bit +f - 32-bit float +d - 64-bit float +D - Decimal +S - Long string +A - Array +T - Timestamp (u64) +F - Nested Table +V - Void +x - Byte array +*/ + +ValueType valueTypeForOctet(qint8 octet) +{ + switch (octet) { + case 't': return Boolean; + case 'b': return ShortShortInt; + case 's': return ShortInt; + case 'I': return LongInt; + case 'l': return LongLongInt; + case 'f': return Float; + case 'd': return Double; + case 'D': return Decimal; + case 'S': return LongString; + case 'A': return Array; + case 'T': return Timestamp; + case 'F': return Hash; + case 'V': return Void; + case 'x': return Bytes; + default: + qAmqpDebug() << Q_FUNC_INFO << "invalid octet received: " << char(octet); + } + + return Invalid; +} + +qint8 valueTypeToOctet(ValueType type) +{ + switch (type) { + case Boolean: return 't'; + case ShortShortInt: return 'b'; + case ShortInt: return 's'; + case LongInt: return 'I'; + case LongLongInt: return 'l'; + case Float: return 'f'; + case Double: return 'd'; + case Decimal: return 'D'; + case LongString: return 'S'; + case Array: return 'A'; + case Timestamp: return 'T'; + case Hash: return 'F'; + case Void: return 'V'; + case Bytes: return 'x'; + default: + qAmqpDebug() << Q_FUNC_INFO << "invalid type received: " << char(type); + } + + return 'V'; +} + +void Table::writeFieldValue(QDataStream &stream, const QVariant &value) +{ + ValueType type; + switch (value.userType()) { + case QMetaType::Bool: + type = Boolean; + break; + case QMetaType::QByteArray: + type = Bytes; + break; + case QMetaType::Int: + { + int i = qAbs(value.toInt()); + if (i <= qint8(UINT8_MAX)) { + type = ShortShortInt; + } else if (i <= qint16(UINT16_MAX)) { + type = ShortInt; + } else { + type = LongInt; + } + } + break; + case QMetaType::UShort: + type = ShortInt; + break; + case QMetaType::UInt: + { + int i = value.toInt(); + if (i <= qint8(UINT8_MAX)) { + type = ShortShortInt; + } else if (i <= qint16(UINT16_MAX)) { + type = ShortInt; + } else { + type = LongInt; + } + } + break; + case QMetaType::LongLong: + case QMetaType::ULongLong: + type = LongLongInt; + break; + case QMetaType::QString: + type = LongString; + break; + case QMetaType::QDateTime: + type = Timestamp; + break; + case QMetaType::Double: + type = value.toDouble() > FLT_MAX ? Double : Float; + break; + case QMetaType::QVariantHash: + type = Hash; + break; + case QMetaType::QVariantList: + type = Array; + break; + case QMetaType::Void: + type = Void; + break; + default: + if (value.userType() == qMetaTypeId()) { + type = Decimal; + break; + } else if (!value.isValid()) { + type = Void; + break; + } + + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << value.userType(); + return; + } + + // write the field value type, a requirement for field tables only + stream << valueTypeToOctet(type); + writeFieldValue(stream, type, value); +} + +void Table::writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value) +{ + switch (type) { + case Boolean: + case ShortShortUint: + case ShortUint: + case LongUint: + case LongLongUint: + case ShortString: + case LongString: + case Timestamp: + case Hash: + return Frame::writeAmqpField(stream, type, value); + + case ShortShortInt: + stream << qint8(value.toInt()); + break; + case ShortInt: + stream << qint16(value.toInt()); + break; + case LongInt: + stream << qint32(value.toInt()); + break; + case LongLongInt: + stream << qlonglong(value.toLongLong()); + break; + case Float: + { + float g = value.toFloat(); + QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::SinglePrecision); + stream << g; + stream.setFloatingPointPrecision(oldPrecision); + } + break; + case Double: + { + double g = value.toDouble(); + QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::DoublePrecision); + stream << g; + stream.setFloatingPointPrecision(oldPrecision); + } + break; + case Decimal: + { + Frame::decimal v(value.value()); + stream << v.scale; + stream << v.value; + } + break; + case Array: + { + QByteArray buffer; + QDataStream arrayStream(&buffer, QIODevice::WriteOnly); + QVariantList array(value.toList()); + for (int i = 0; i < array.size(); ++i) + writeFieldValue(arrayStream, array.at(i)); + + if (buffer.isEmpty()) { + stream << qint32(0); + } else { + stream << buffer; + } + } + break; + case Bytes: + { + QByteArray ba = value.toByteArray(); + stream << quint32(ba.length()); + stream.writeRawData(ba.data(), ba.length()); + } + break; + case Void: + stream << qint32(0); + break; + + default: + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type; + } +} + +QVariant Table::readFieldValue(QDataStream &stream, ValueType type) +{ + switch (type) { + case Boolean: + case ShortShortUint: + case ShortUint: + case LongUint: + case LongLongUint: + case ShortString: + case LongString: + case Timestamp: + case Hash: + return Frame::readAmqpField(stream, type); + + case ShortShortInt: + { + char octet; + stream.readRawData(&octet, sizeof(octet)); + return QVariant::fromValue(octet); + } + case ShortInt: + { + qint16 tmp_value = 0; + stream >> tmp_value; + return QVariant::fromValue(tmp_value); + } + case LongInt: + { + qint32 tmp_value = 0; + stream >> tmp_value; + return QVariant::fromValue(tmp_value); + } + case LongLongInt: + { + qlonglong v = 0 ; + stream >> v; + return v; + } + case Float: + { + float tmp_value; + QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::SinglePrecision); + stream >> tmp_value; + stream.setFloatingPointPrecision(precision); + return QVariant::fromValue(tmp_value); + } + case Double: + { + double tmp_value; + QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision(); + stream.setFloatingPointPrecision(QDataStream::DoublePrecision); + stream >> tmp_value; + stream.setFloatingPointPrecision(precision); + return QVariant::fromValue(tmp_value); + } + case Decimal: + { + Frame::decimal v; + stream >> v.scale; + stream >> v.value; + return QVariant::fromValue(v); + } + case Array: + { + QByteArray data; + quint32 size = 0; + stream >> size; + data.resize(size); + stream.readRawData(data.data(), data.size()); + + qint8 type = 0; + QVariantList result; + QDataStream arrayStream(&data, QIODevice::ReadOnly); + while (!arrayStream.atEnd()) { + arrayStream >> type; + result.append(readFieldValue(arrayStream, valueTypeForOctet(type))); + } + + return result; + } + case Bytes: + { + QByteArray bytes; + quint32 length = 0; + stream >> length; + bytes.resize(length); + stream.readRawData(bytes.data(), bytes.size()); + return bytes; + } + case Void: + break; + default: + qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type; + } + + return QVariant(); +} + +QDataStream &operator<<(QDataStream &stream, const Table &table) +{ + QByteArray data; + QDataStream s(&data, QIODevice::WriteOnly); + Table::ConstIterator it; + Table::ConstIterator itEnd = table.constEnd(); + for (it = table.constBegin(); it != itEnd; ++it) { + Table::writeFieldValue(s, ShortString, it.key()); + Table::writeFieldValue(s, it.value()); + } + + if (data.isEmpty()) { + stream << qint32(0); + } else { + stream << data; + } + + return stream; +} + +QDataStream &operator>>(QDataStream &stream, Table &table) +{ + QByteArray data; + stream >> data; + QDataStream tableStream(&data, QIODevice::ReadOnly); + while (!tableStream.atEnd()) { + qint8 octet = 0; + QString field = Frame::readAmqpField(tableStream, ShortString).toString(); + tableStream >> octet; + table[field] = Table::readFieldValue(tableStream, valueTypeForOctet(octet)); + } + + return stream; +} diff --git a/src/amqp_table.h b/src/amqp_table.h new file mode 100644 index 0000000..f6f448a --- /dev/null +++ b/src/amqp_table.h @@ -0,0 +1,29 @@ +#ifndef amqp_table_h__ +#define amqp_table_h__ + +#include "amqp_global.h" +#include + +namespace QAMQP { + +class QAMQP_EXPORT Table : public QVariantHash +{ +public: + Table() {} + inline Table(const QVariantHash &variantHash) + : QVariantHash(variantHash) + { + } + + static void writeFieldValue(QDataStream &stream, const QVariant &value); + static void writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value); + static QVariant readFieldValue(QDataStream &stream, ValueType type); +}; + +} // namespace QAMQP + +QAMQP_EXPORT QDataStream &operator<<(QDataStream &, const QAMQP::Table &table); +QAMQP_EXPORT QDataStream &operator>>(QDataStream &, QAMQP::Table &table); +Q_DECLARE_METATYPE(QAMQP::Table) + +#endif // amqp_table_h__ diff --git a/src/src.pro b/src/src.pro index 2f4b128..30e450d 100644 --- a/src/src.pro +++ b/src/src.pro @@ -25,7 +25,8 @@ INSTALL_HEADERS += \ amqp_frame.h \ amqp_global.h \ amqp_message.h \ - amqp_queue.h + amqp_queue.h \ + amqp_table.h HEADERS += \ $${INSTALL_HEADERS} \ @@ -38,7 +39,8 @@ SOURCES += \ amqp_exchange.cpp \ amqp_frame.cpp \ amqp_message.cpp \ - amqp_queue.cpp + amqp_queue.cpp \ + amqp_table.cpp # install headers.files = $${INSTALL_HEADERS} diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 48ede33..5542240 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -42,6 +44,7 @@ private Q_SLOTS: void invalidQos(); void qos(); void invalidRoutingKey(); + void tableFieldDataTypes(); private: void declareQueueAndVerifyConsuming(Queue *queue); @@ -531,5 +534,84 @@ void tst_QAMQPQueue::invalidRoutingKey() QCOMPARE(client->error(), QAMQP::FrameError); } +void tst_QAMQPQueue::tableFieldDataTypes() +{ + Queue *queue = client->createQueue("test-table-field-data-types"); + declareQueueAndVerifyConsuming(queue); + + Frame::decimal decimal; + decimal.scale = 2; + decimal.value = 12345; + QVariant decimalVariant = QVariant::fromValue(decimal); + + Table nestedTable; + nestedTable.insert("boolean", true); + nestedTable.insert("long-int", qint32(-65536)); + + QVariantList array; + array.append(true); + array.append(qint32(-65536)); + + QDateTime timestamp = QDateTime::currentDateTime(); + + Table headers; + headers.insert("boolean", true); + headers.insert("short-short-int", qint8(-15)); + headers.insert("short-short-uint", quint8(15)); + headers.insert("short-int", qint16(-256)); + headers.insert("short-uint", QVariant::fromValue(quint16(256))); + headers.insert("long-int", qint32(-65536)); + headers.insert("long-uint", quint32(65536)); + headers.insert("long-long-int", qint64(-2147483648)); + headers.insert("long-long-uint", quint64(2147483648)); + headers.insert("float", 230.7); + headers.insert("double", double(FLT_MAX)); + headers.insert("decimal-value", decimalVariant); + headers.insert("short-string", QLatin1String("test")); + headers.insert("long-string", QLatin1String("test")); + headers.insert("timestamp", timestamp); + headers.insert("nested-table", nestedTable); + headers.insert("array", array); + headers.insert("bytes", QByteArray("abcdefg1234567")); + + Exchange *defaultExchange = client->createExchange(); + defaultExchange->publish("dummy", "test-table-field-data-types", "text.plain", headers); + + QVERIFY(waitForSignal(queue, SIGNAL(messageReceived()))); + Message message = queue->dequeue(); + + QCOMPARE(message.header("boolean").toBool(), true); + QCOMPARE(qint8(message.header("short-short-int").toInt()), qint8(-15)); + QCOMPARE(quint8(message.header("short-short-uint").toUInt()), quint8(15)); + QCOMPARE(qint16(message.header("short-int").toInt()), qint16(-256)); + QCOMPARE(quint16(message.header("short-uint").toUInt()), quint16(256)); + QCOMPARE(qint32(message.header("long-int").toInt()), qint32(-65536)); + QCOMPARE(quint32(message.header("long-uint").toUInt()), quint32(65536)); + QCOMPARE(qint64(message.header("long-long-int").toLongLong()), qint64(-2147483648)); + QCOMPARE(quint64(message.header("long-long-uint").toLongLong()), quint64(2147483648)); + QCOMPARE(message.header("float").toFloat(), float(230.7)); + QCOMPARE(message.header("double").toDouble(), double(FLT_MAX)); + QCOMPARE(message.header("short-string").toString(), QLatin1String("test")); + QCOMPARE(message.header("long-string").toString(), QLatin1String("test")); + QCOMPARE(message.header("timestamp").toDateTime(), timestamp); + QCOMPARE(message.header("bytes").toByteArray(), QByteArray("abcdefg1234567")); + + QVERIFY(message.hasHeader("nested-table")); + Table compareTable(message.header("nested-table").toHash()); + foreach (QString key, nestedTable.keys()) { + QVERIFY(compareTable.contains(key)); + QCOMPARE(nestedTable.value(key), compareTable.value(key)); + } + + QVERIFY(message.hasHeader("array")); + QVariantList compareArray = message.header("array").toList(); + QCOMPARE(array, compareArray); + + Frame::decimal receivedDecimal = message.header("decimal-value").value(); + QCOMPARE(receivedDecimal.scale, qint8(2)); + QCOMPARE(receivedDecimal.value, quint32(12345)); +} + + QTEST_MAIN(tst_QAMQPQueue) #include "tst_qamqpqueue.moc"