refactor type handling in QAMQP

There are two distinctive type sets in AMQP, the basic AMQP types
and then the superset of Table value types. This commit attempts to
make that distinction more clear by the addition of a Table class.
Basically, AMQP value type read/write support is left in Frame for the
time being, while table value field read/write support has been moved
to the Table class. Also, a number of type differences exist between
the spec and rabbitmq's binary parser (noted in the errata page) which
were not previously honored.
This commit is contained in:
Matt Broadstone 2014-08-03 16:39:31 -04:00
parent 4ea3bdb851
commit d5cc6258c6
15 changed files with 705 additions and 411 deletions

View File

@ -1,6 +1,6 @@
#include "amqp_authenticator.h" #include "amqp_table.h"
#include "amqp_frame.h" #include "amqp_frame.h"
#include "amqp_authenticator.h"
using namespace QAMQP; using namespace QAMQP;
AMQPlainAuthenticator::AMQPlainAuthenticator(const QString &l, const QString &p) AMQPlainAuthenticator::AMQPlainAuthenticator(const QString &l, const QString &p)
@ -40,9 +40,9 @@ void AMQPlainAuthenticator::setPassword(const QString &p)
void AMQPlainAuthenticator::write(QDataStream &out) void AMQPlainAuthenticator::write(QDataStream &out)
{ {
Frame::writeField('s', out, type()); Frame::writeAmqpField(out, ShortString, type());
Frame::TableField response; Table response;
response["LOGIN"] = login_; response["LOGIN"] = login_;
response["PASSWORD"] = password_; response["PASSWORD"] = password_;
Frame::serialize(out, response); out << response;
} }

View File

@ -156,10 +156,10 @@ void ChannelPrivate::close(int code, const QString &text, int classId, int metho
QByteArray arguments; QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly); QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, client->virtualHost()); Frame::writeAmqpField(stream, ShortString, client->virtualHost());
stream << qint16(code); stream << qint16(code);
Frame::writeField('s', stream, text); Frame::writeAmqpField(stream, ShortString, text);
stream << qint16(classId); stream << qint16(classId);
stream << qint16(methodId); stream << qint16(methodId);
@ -176,7 +176,8 @@ void ChannelPrivate::close(const Frame::Method &frame)
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code = 0, classId, methodId; qint16 code = 0, classId, methodId;
stream >> code; stream >> code;
QString text(Frame::readField('s', stream).toString()); QString text = Frame::readAmqpField(stream, ShortString).toString();
stream >> classId; stream >> classId;
stream >> methodId; stream >> methodId;

View File

@ -1,18 +1,18 @@
#include "amqp_client.h"
#include "amqp_client_p.h"
#include "amqp_global.h"
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_queue_p.h"
#include "amqp_authenticator.h"
#include <QTimer> #include <QTimer>
#include <QTcpSocket> #include <QTcpSocket>
#include <QTextStream> #include <QTextStream>
#include <QStringList> #include <QStringList>
#include <QtEndian> #include <QtEndian>
#include "amqp_global.h"
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_queue_p.h"
#include "amqp_authenticator.h"
#include "amqp_table.h"
#include "amqp_client_p.h"
#include "amqp_client.h"
using namespace QAMQP; using namespace QAMQP;
ClientPrivate::ClientPrivate(Client *q) ClientPrivate::ClientPrivate(Client *q)
@ -88,7 +88,6 @@ void ClientPrivate::parseConnectionString(const QString &uri)
return; return;
} }
port = connectionString.port(AMQP_PORT); port = connectionString.port(AMQP_PORT);
host = connectionString.host(); host = connectionString.host();
@ -329,21 +328,22 @@ void ClientPrivate::start(const Frame::Method &frame)
qAmqpDebug(">> Start"); qAmqpDebug(">> Start");
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
quint8 version_major = 0; quint8 version_major = 0;
quint8 version_minor = 0; quint8 version_minor = 0;
stream >> version_major >> version_minor; stream >> version_major >> version_minor;
Frame::TableField table; Table table;
Frame::deserialize(stream, table); stream >> table;
QStringList mechanisms = Frame::readField('S', stream).toString().split(' '); QStringList mechanisms = Frame::readAmqpField(stream, LongString).toString().split(' ');
QString locales = Frame::readField('S', stream).toString(); QString locales = Frame::readAmqpField(stream, LongString).toString();
qAmqpDebug(">> version_major: %d", version_major); qAmqpDebug(">> version_major: %d", version_major);
qAmqpDebug(">> version_minor: %d", version_minor); qAmqpDebug(">> version_minor: %d", version_minor);
Frame::print(table); // NOTE: replace with qDebug overload
// Frame::print(table);
qAmqpDebug() << ">> mechanisms: " << mechanisms; qAmqpDebug() << ">> mechanisms: " << mechanisms;
qAmqpDebug(">> locales: %s", qPrintable(locales)); qAmqpDebug(">> locales: %s", qPrintable(locales));
@ -427,7 +427,7 @@ void ClientPrivate::close(const Frame::Method &frame)
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code = 0, classId, methodId; qint16 code = 0, classId, methodId;
stream >> code; stream >> code;
QString text(Frame::readField('s', stream).toString()); QString text = Frame::readAmqpField(stream, ShortString).toString();
stream >> classId; stream >> classId;
stream >> methodId; stream >> methodId;
@ -452,15 +452,15 @@ void ClientPrivate::startOk()
QByteArray arguments; QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly); QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::TableField clientProperties; Table clientProperties;
clientProperties["version"] = QString(QAMQP_VERSION); clientProperties["version"] = QString(QAMQP_VERSION);
clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = QString("QAMQP"); clientProperties["product"] = QString("QAMQP");
clientProperties.unite(customProperties); clientProperties.unite(customProperties);
Frame::serialize(stream, clientProperties); stream << clientProperties;
authenticator->write(stream); authenticator->write(stream);
Frame::writeField('s', stream, "en_US"); Frame::writeAmqpField(stream, ShortString, QLatin1String("en_US"));
frame.setArguments(arguments); frame.setArguments(arguments);
sendFrame(frame); sendFrame(frame);
@ -491,7 +491,7 @@ void ClientPrivate::open()
QByteArray arguments; QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly); QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, virtualHost); Frame::writeAmqpField(stream, ShortString, virtualHost);
stream << qint8(0); stream << qint8(0);
stream << qint8(0); stream << qint8(0);
@ -507,7 +507,7 @@ void ClientPrivate::close(int code, const QString &text, int classId, int method
QDataStream stream(&arguments, QIODevice::WriteOnly); QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(code); stream << qint16(code);
Frame::writeField('s', stream, text); Frame::writeAmqpField(stream, ShortString, text);
stream << qint16(classId); stream << qint16(classId);
stream << qint16(methodId); stream << qint16(methodId);

View File

@ -6,6 +6,12 @@
#include <QPointer> #include <QPointer>
#include <QAbstractSocket> #include <QAbstractSocket>
#ifndef QT_NO_SSL
# include <QSslConfiguration>
# include <QSslError>
#endif
#include "amqp_table.h"
#include "amqp_frame.h" #include "amqp_frame.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok #define METHOD_ID_ENUM(name, id) name = id, name ## Ok
@ -92,7 +98,7 @@ public:
bool closed; bool closed;
bool connected; bool connected;
QPointer<QTimer> heartbeatTimer; QPointer<QTimer> heartbeatTimer;
Frame::TableField customProperties; Table customProperties;
qint16 channelMax; qint16 channelMax;
qint16 heartbeatDelay; qint16 heartbeatDelay;
qint32 frameMax; qint32 frameMax;
@ -106,6 +112,7 @@ public:
}; };
#ifndef QT_NO_SSL #ifndef QT_NO_SSL
class SslClient;
class SslClientPrivate : public ClientPrivate class SslClientPrivate : public ClientPrivate
{ {
public: public:

View File

@ -48,11 +48,11 @@ void ExchangePrivate::declare()
QDataStream stream(&args, QIODevice::WriteOnly); QDataStream stream(&args, QIODevice::WriteOnly);
stream << qint16(0); //reserved 1 stream << qint16(0); //reserved 1
Frame::writeField('s', stream, name); Frame::writeAmqpField(stream, ShortString, name);
Frame::writeField('s', stream, type); Frame::writeAmqpField(stream, ShortString, type);
stream << qint8(options); stream << qint8(options);
Frame::writeField('F', stream, arguments); Frame::writeAmqpField(stream, Hash, arguments);
frame.setArguments(args); frame.setArguments(args);
sendFrame(frame); sendFrame(frame);
@ -133,9 +133,9 @@ void ExchangePrivate::basicReturn(const Frame::Method &frame)
quint16 replyCode; quint16 replyCode;
stream >> replyCode; stream >> replyCode;
QString replyText = Frame::readField('s', stream).toString(); QString replyText = Frame::readAmqpField(stream, ShortString).toString();
QString exchangeName = Frame::readField('s', stream).toString(); QString exchangeName = Frame::readAmqpField(stream, ShortString).toString();
QString routingKey = Frame::readField('s', stream).toString(); QString routingKey = Frame::readAmqpField(stream, ShortString).toString();
Error checkError = static_cast<Error>(replyCode); Error checkError = static_cast<Error>(replyCode);
if (checkError != QAMQP::NoError) { if (checkError != QAMQP::NoError) {
@ -186,12 +186,12 @@ QString Exchange::type() const
return d->type; return d->type;
} }
void Exchange::declare(ExchangeType type, ExchangeOptions options , const Frame::TableField &args) void Exchange::declare(ExchangeType type, ExchangeOptions options, const Table &args)
{ {
declare(ExchangePrivate::typeToString(type), options, args); declare(ExchangePrivate::typeToString(type), options, args);
} }
void Exchange::declare(const QString &type, ExchangeOptions options , const Frame::TableField &args) void Exchange::declare(const QString &type, ExchangeOptions options, const Table &args)
{ {
Q_D(Exchange); Q_D(Exchange);
d->type = type; d->type = type;
@ -210,7 +210,7 @@ void Exchange::remove(int options)
QDataStream stream(&arguments, QIODevice::WriteOnly); QDataStream stream(&arguments, QIODevice::WriteOnly);
stream << qint16(0); //reserved 1 stream << qint16(0); //reserved 1
Frame::writeField('s', stream, d->name); Frame::writeAmqpField(stream, ShortString, d->name);
stream << qint8(options); stream << qint8(options);
frame.setArguments(arguments); frame.setArguments(arguments);
@ -232,7 +232,7 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey,
} }
void Exchange::publish(const QByteArray &message, const QString &routingKey, void Exchange::publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const QVariantHash &headers, const QString &mimeType, const Table &headers,
const MessageProperties &properties, int publishOptions) const MessageProperties &properties, int publishOptions)
{ {
Q_D(Exchange); Q_D(Exchange);
@ -243,8 +243,8 @@ void Exchange::publish(const QByteArray &message, const QString &routingKey,
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
Frame::writeField('s', out, routingKey); Frame::writeAmqpField(out, ShortString, routingKey);
out << qint8(publishOptions); out << qint8(publishOptions);
frame.setArguments(arguments); frame.setArguments(arguments);

View File

@ -1,6 +1,7 @@
#ifndef amqp_exchange_h__ #ifndef amqp_exchange_h__
#define amqp_exchange_h__ #define amqp_exchange_h__
#include "amqp_table.h"
#include "amqp_channel.h" #include "amqp_channel.h"
namespace QAMQP namespace QAMQP
@ -56,10 +57,10 @@ public:
// AMQP Exchange // AMQP Exchange
void declare(ExchangeType type = Direct, void declare(ExchangeType type = Direct,
ExchangeOptions options = NoOptions, ExchangeOptions options = NoOptions,
const Frame::TableField &args = Frame::TableField()); const Table &args = Table());
void declare(const QString &type = QLatin1String("direct"), void declare(const QString &type = QLatin1String("direct"),
ExchangeOptions options = NoOptions, ExchangeOptions options = NoOptions,
const Frame::TableField &args = Frame::TableField()); const Table &args = Table());
void remove(int options = roIfUnused|roNoWait); void remove(int options = roIfUnused|roNoWait);
// AMQP Basic // AMQP Basic
@ -70,7 +71,7 @@ public:
const QString &mimeType, const MessageProperties &properties = MessageProperties(), const QString &mimeType, const MessageProperties &properties = MessageProperties(),
int publishOptions = poNoOptions); int publishOptions = poNoOptions);
void publish(const QByteArray &message, const QString &routingKey, void publish(const QByteArray &message, const QString &routingKey,
const QString &mimeType, const QVariantHash &headers, const QString &mimeType, const Table &headers,
const MessageProperties &properties = MessageProperties(), const MessageProperties &properties = MessageProperties(),
int publishOptions = poNoOptions); int publishOptions = poNoOptions);

View File

@ -1,6 +1,7 @@
#ifndef amqp_exchange_p_h__ #ifndef amqp_exchange_p_h__
#define amqp_exchange_p_h__ #define amqp_exchange_p_h__
#include "amqp_table.h"
#include "amqp_exchange.h" #include "amqp_exchange.h"
#include "amqp_channel_p.h" #include "amqp_channel_p.h"
@ -29,7 +30,7 @@ public:
QString type; QString type;
Exchange::ExchangeOptions options; Exchange::ExchangeOptions options;
Frame::TableField arguments; Table arguments;
bool delayedDeclare; bool delayedDeclare;
bool declared; bool declared;

View File

@ -1,11 +1,10 @@
#include "amqp_frame.h" #include "amqp_frame.h"
#include <float.h>
#include <QDateTime> #include <QDateTime>
#include <QList> #include <QList>
#include <QDebug> #include <QDebug>
#include "amqp_table.h"
#include "amqp_global.h" #include "amqp_global.h"
using namespace QAMQP; using namespace QAMQP;
@ -141,244 +140,99 @@ void Method::writePayload(QDataStream &stream) const
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
QVariant Frame::readField(qint8 valueType, QDataStream &s) QVariant Frame::readAmqpField(QDataStream &s, QAMQP::ValueType type)
{ {
QVariant value; switch (type) {
QByteArray tmp; case Boolean:
qint8 nameSize_ = 0;
char octet = 0;
switch(valueType) {
case 't':
s.readRawData(&octet, sizeof(octet));
value = QVariant::fromValue<bool>(octet > 0);
break;
case 'b':
s.readRawData(&octet, sizeof(octet));
value = QVariant::fromValue<int>(octet);
break;
case 'B':
s.readRawData(&octet, sizeof(octet));
value = QVariant::fromValue<uint>(octet);
break;
case 'U':
{ {
qint16 tmp_value_ = 0; quint8 octet = 0;
s >> tmp_value_; s >> octet;
value = QVariant::fromValue<int>(tmp_value_); return QVariant::fromValue<bool>(octet > 0);
break;
} }
case 'u': case ShortShortUint:
{ {
quint16 tmp_value_ = 0; quint8 octet = 0;
s >> tmp_value_; s >> octet;
value = QVariant::fromValue<uint>(tmp_value_); return QVariant::fromValue<int>(octet);
break;
} }
case 'I': case ShortUint:
{ {
qint32 tmp_value_ = 0; quint16 tmp_value = 0;
s >> tmp_value_; s >> tmp_value;
value = QVariant::fromValue<int>(tmp_value_); return QVariant::fromValue<uint>(tmp_value);
break;
} }
case 'i': case LongUint:
{ {
quint32 tmp_value_ = 0; quint32 tmp_value = 0;
s >> tmp_value_; s >> tmp_value;
value = QVariant::fromValue<uint>(tmp_value_); return QVariant::fromValue<uint>(tmp_value);
break;
} }
case 'L': case LongLongUint:
{
qlonglong v = 0 ;
s >> v;
value = v;
}
break;
case 'l':
{ {
qulonglong v = 0 ; qulonglong v = 0 ;
s >> v; s >> v;
value = v; return v;
} }
case ShortString:
{
qint8 size = 0;
QByteArray buffer;
break; s >> size;
case 'f': buffer.resize(size);
{ s.readRawData(buffer.data(), buffer.size());
float tmp_value_; return QString::fromUtf8(buffer.data(), size);
s >> tmp_value_;
value = QVariant::fromValue<float>(tmp_value_);
break;
} }
case 'd': case LongString:
{ {
double tmp_value_; quint32 size = 0;
s >> tmp_value_; QByteArray buffer;
value = QVariant::fromValue<double>(tmp_value_);
break; s >> size;
buffer.resize(size);
s.readRawData(buffer.data(), buffer.size());
return QString::fromUtf8(buffer.data(), buffer.size());
} }
case 'D': case Timestamp:
{ {
Frame::decimal v; qulonglong tmp_value;
s >> v.scale; s >> tmp_value;
s >> v.value; return QDateTime::fromMSecsSinceEpoch(tmp_value);
value = QVariant::fromValue<Frame::decimal>(v);
} }
break; case Hash:
case 's':
s >> nameSize_;
tmp.resize(nameSize_);
s.readRawData(tmp.data(), tmp.size());
value = QString::fromLatin1(tmp.data(), nameSize_);
break;
case 'S':
{ {
quint32 length_ = 0; Table table;
s >> length_; s >> table;
nameSize_ = length_; return table;
tmp.resize(length_);
} }
s.readRawData(tmp.data(), tmp.size()); case Void:
value = QString::fromLatin1(tmp.data(), tmp.size()); return QVariant();
break;
case 'A':
{
qint32 length_ = 0;
qint8 type = 0;
s >> length_;
QList<QVariant> array_;
for (int i =0; i < length_; ++i) {
s >> type;
array_ << readField(type, s);
}
value = array_;
}
break;
case 'T':
{
qulonglong tmp_value_;
s >> tmp_value_;
value = QDateTime::fromMSecsSinceEpoch(tmp_value_);
break;
}
case 'F':
{
TableField table_;
deserialize(s, table_);
value = table_;
}
break;
case 'V':
break;
default: default:
qWarning("Unknown field type"); qWarning() << Q_FUNC_INFO << "unsupported value type: " << type;
} }
return value;
return QVariant();
} }
QDataStream &Frame::deserialize(QDataStream &stream, Frame::TableField &f) void Frame::writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value)
{ {
QByteArray data; switch (type) {
stream >> data; case Boolean:
QDataStream s(&data, QIODevice::ReadOnly);
while (!s.atEnd()) {
qint8 valueType = 0;
QString name = readField('s', s).toString();
s >> valueType;
f[name] = readField(valueType, s);
}
return stream;
}
QDataStream &Frame::serialize(QDataStream &stream, const TableField &f)
{
QByteArray data;
QDataStream s(&data, QIODevice::WriteOnly);
TableField::ConstIterator it;
TableField::ConstIterator itEnd = f.constEnd();
for (it = f.constBegin(); it != itEnd; ++it) {
writeField('s', s, it.key());
writeField(s, it.value());
}
if (data.isEmpty()) {
stream << qint32(0);
} else {
stream << data;
}
return stream;
}
void Frame::print(const TableField &f)
{
TableField::ConstIterator it;
TableField::ConstIterator itEnd = f.constEnd();
for (it = f.constBegin(); it != itEnd; ++it) {
switch(it.value().type()) {
case QVariant::Hash:
qAmqpDebug() << "\t" << qPrintable(it.key()) << ": FIELD_TABLE";
break;
case QVariant::List:
qAmqpDebug() << "\t" << qPrintable(it.key()) << ": ARRAY";
break;
default:
qAmqpDebug() << "\t" << qPrintable(it.key()) << ": " << it.value();
}
}
}
void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType)
{
if (withType)
s << valueType;
switch (valueType) {
case 't':
s << (value.toBool() ? qint8(1) : qint8(0)); s << (value.toBool() ? qint8(1) : qint8(0));
break; break;
case 'b': case ShortShortUint:
s << qint8(value.toInt()); s << qint8(value.toUInt());
break; break;
case 'B': case ShortUint:
s << quint8(value.toUInt());
break;
case 'U':
s << qint16(value.toInt());
break;
case 'u':
s << quint16(value.toUInt()); s << quint16(value.toUInt());
break; break;
case 'I': case LongUint:
s << qint32(value.toInt());
break;
case 'i':
s << quint32(value.toUInt()); s << quint32(value.toUInt());
break; break;
case 'L': case LongLongUint:
s << qlonglong(value.toLongLong());
break;
case 'l':
s << qulonglong(value.toULongLong()); s << qulonglong(value.toULongLong());
break; break;
case 'f': case ShortString:
s << value.toFloat();
break;
case 'd':
s << value.toDouble();
break;
case 'D':
{
Frame::decimal v(value.value<Frame::decimal>());
s << v.scale;
s << v.value;
}
break;
case 's':
{ {
QString str = value.toString(); QString str = value.toString();
if (str.length() >= 256) { if (str.length() >= 256) {
@ -386,104 +240,30 @@ void Frame::writeField(qint8 valueType, QDataStream &s, const QVariant &value, b
} }
s << quint8(str.length()); s << quint8(str.length());
s.writeRawData(str.toLatin1().data(), str.length()); s.writeRawData(str.toUtf8().data(), str.length());
} }
break; break;
case 'S': case LongString:
{ {
QString str = value.toString(); QString str = value.toString();
s << quint32(str.length()); s << quint32(str.length());
s.writeRawData(str.toLatin1().data(), str.length()); s.writeRawData(str.toLatin1().data(), str.length());
} }
break; break;
case 'A': case Timestamp:
{
QList<QVariant> array_(value.toList());
s << quint32(array_.count());
for (int i =0; i < array_.count(); ++i)
writeField(s, array_.at(i));
}
break;
case 'T':
s << qulonglong(value.toDateTime().toMSecsSinceEpoch()); s << qulonglong(value.toDateTime().toMSecsSinceEpoch());
break; break;
case 'F': case Hash:
{ {
TableField table_(value.toHash()); Table table(value.toHash());
serialize(s, table_); s << table;
} }
break; break;
case 'V':
break;
default: default:
qWarning("Unknown field type"); qWarning() << Q_FUNC_INFO << "unsupported value type: " << type;
} }
} }
void Frame::writeField(QDataStream &s, const QVariant &value)
{
char type = 0;
switch (value.type()) {
case QVariant::Bool:
type = 't';
break;
case QVariant::ByteArray:
type = 'S';
break;
case QVariant::Int:
{
int i = qAbs(value.toInt());
if (i <= qint8(0xFF)) {
type = 'b';
} else if (i <= qint16(0xFFFF)) {
type = 'U';
} else if (i <= qint16(0xFFFFFFFF)) {
type = 'I';
}
}
break;
case QVariant::UInt:
{
int i = value.toInt();
if (i <= qint8(0xFF)) {
type = 'B';
} else if (i <= qint16(0xFFFF)) {
type = 'u';
} else if (i <= qint16(0xFFFFFFFF)) {
type = 'i';
}
}
break;
case QVariant::LongLong:
type = 'L';
break;
case QVariant::ULongLong:
type = 'l';
break;
case QVariant::String:
type = 'S';
break;
case QVariant::DateTime:
type = 'T';
break;
case QVariant::Double:
type = value.toDouble() > FLT_MAX ? 'd' : 'f';
break;
case QVariant::Hash:
type = 'F';
break;
case QVariant::List:
type = 'A';
break;
default:
qAmqpDebug() << Q_FUNC_INFO << "unhandled variant type: " << value.type();
}
if (type)
writeField(type, s, value, true);
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
Content::Content() Content::Content()
@ -522,46 +302,46 @@ qint32 Content::size() const
out << prop_; out << prop_;
if (prop_ & cpContentType) if (prop_ & cpContentType)
writeField('s', out, properties_[cpContentType]); writeAmqpField(out, ShortString, properties_[cpContentType]);
if (prop_ & cpContentEncoding) if (prop_ & cpContentEncoding)
writeField('s', out, properties_[cpContentEncoding]); writeAmqpField(out, ShortString, properties_[cpContentEncoding]);
if (prop_ & cpHeaders) if (prop_ & cpHeaders)
writeField('F', out, properties_[cpHeaders]); writeAmqpField(out, Hash, properties_[cpHeaders]);
if (prop_ & cpDeliveryMode) if (prop_ & cpDeliveryMode)
writeField('b', out, properties_[cpDeliveryMode]); writeAmqpField(out, ShortShortUint, properties_[cpDeliveryMode]);
if (prop_ & cpPriority) if (prop_ & cpPriority)
writeField('b', out, properties_[cpPriority]); writeAmqpField(out, ShortShortUint, properties_[cpPriority]);
if (prop_ & cpCorrelationId) if (prop_ & cpCorrelationId)
writeField('s', out, properties_[cpCorrelationId]); writeAmqpField(out, ShortString, properties_[cpCorrelationId]);
if (prop_ & cpReplyTo) if (prop_ & cpReplyTo)
writeField('s', out, properties_[cpReplyTo]); writeAmqpField(out, ShortString, properties_[cpReplyTo]);
if (prop_ & cpExpiration) if (prop_ & cpExpiration)
writeField('s', out, properties_[cpExpiration]); writeAmqpField(out, ShortString, properties_[cpExpiration]);
if (prop_ & cpMessageId) if (prop_ & cpMessageId)
writeField('s', out, properties_[cpMessageId]); writeAmqpField(out, ShortString, properties_[cpMessageId]);
if (prop_ & cpTimestamp) if (prop_ & cpTimestamp)
writeField('T', out, properties_[cpTimestamp]); writeAmqpField(out, Timestamp, properties_[cpTimestamp]);
if (prop_ & cpType) if (prop_ & cpType)
writeField('s', out, properties_[cpType]); writeAmqpField(out, ShortString, properties_[cpType]);
if (prop_ & cpUserId) if (prop_ & cpUserId)
writeField('s', out, properties_[cpUserId]); writeAmqpField(out, ShortString, properties_[cpUserId]);
if (prop_ & cpAppId) if (prop_ & cpAppId)
writeField('s', out, properties_[cpAppId]); writeAmqpField(out, ShortString, properties_[cpAppId]);
if (prop_ & cpClusterID) if (prop_ & cpClusterID)
writeField('s', out, properties_[cpClusterID]); writeAmqpField(out, ShortString, properties_[cpClusterID]);
return buffer_.size(); return buffer_.size();
} }
@ -599,46 +379,46 @@ void Content::readPayload(QDataStream &in)
qint16 flags_ = 0; qint16 flags_ = 0;
in >> flags_; in >> flags_;
if (flags_ & cpContentType) if (flags_ & cpContentType)
properties_[cpContentType] = readField('s', in); properties_[cpContentType] = readAmqpField(in, ShortString);
if (flags_ & cpContentEncoding) if (flags_ & cpContentEncoding)
properties_[cpContentEncoding] = readField('s', in); properties_[cpContentEncoding] = readAmqpField(in, ShortString);
if (flags_ & cpHeaders) if (flags_ & cpHeaders)
properties_[cpHeaders] = readField('F', in); properties_[cpHeaders] = readAmqpField(in, Hash);
if (flags_ & cpDeliveryMode) if (flags_ & cpDeliveryMode)
properties_[cpDeliveryMode] = readField('b', in); properties_[cpDeliveryMode] = readAmqpField(in, ShortShortUint);
if (flags_ & cpPriority) if (flags_ & cpPriority)
properties_[cpPriority] = readField('b', in); properties_[cpPriority] = readAmqpField(in, ShortShortUint);
if (flags_ & cpCorrelationId) if (flags_ & cpCorrelationId)
properties_[cpCorrelationId] = readField('s', in); properties_[cpCorrelationId] = readAmqpField(in, ShortString);
if (flags_ & cpReplyTo) if (flags_ & cpReplyTo)
properties_[cpReplyTo] = readField('s', in); properties_[cpReplyTo] = readAmqpField(in, ShortString);
if (flags_ & cpExpiration) if (flags_ & cpExpiration)
properties_[cpExpiration] = readField('s', in); properties_[cpExpiration] = readAmqpField(in, ShortString);
if (flags_ & cpMessageId) if (flags_ & cpMessageId)
properties_[cpMessageId] = readField('s', in); properties_[cpMessageId] = readAmqpField(in, ShortString);
if (flags_ & cpTimestamp) if (flags_ & cpTimestamp)
properties_[cpTimestamp] = readField('T', in); properties_[cpTimestamp] = readAmqpField(in, Timestamp);
if (flags_ & cpType) if (flags_ & cpType)
properties_[cpType] = readField('s', in); properties_[cpType] = readAmqpField(in, ShortString);
if (flags_ & cpUserId) if (flags_ & cpUserId)
properties_[cpUserId] = readField('s', in); properties_[cpUserId] = readAmqpField(in, ShortString);
if (flags_ & cpAppId) if (flags_ & cpAppId)
properties_[cpAppId] = readField('s', in); properties_[cpAppId] = readAmqpField(in, ShortString);
if (flags_ & cpClusterID) if (flags_ & cpClusterID)
properties_[cpClusterID] = readField('s', in); properties_[cpClusterID] = readAmqpField(in, ShortString);
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////

View File

@ -65,20 +65,8 @@ namespace Frame
quint32 value; quint32 value;
}; };
/* QVariant readAmqpField(QDataStream &s, QAMQP::ValueType type);
* @brief Definition implementation of TableField type void writeAmqpField(QDataStream &s, QAMQP::ValueType type, const QVariant &value);
* @detailed Define implementation TableField type in builtin Qt types.
* Key contains field name, value contains field data.
* It can by any type witch support serialization in AMQP types.
*/
typedef QHash<QString, QVariant> TableField;
QDataStream &serialize(QDataStream &stream, const Frame::TableField &f);
QDataStream &deserialize(QDataStream &stream, Frame::TableField &f);
QVariant readField(qint8 valueType, QDataStream &s);
void writeField(QDataStream &s, const QVariant &value);
void writeField(qint8 valueType, QDataStream &s, const QVariant &value, bool withType = false);
void print(const Frame::TableField &f);
/* /*
* @brief Base class for any frames. * @brief Base class for any frames.
@ -401,6 +389,6 @@ typedef Frame::Content::Property MessageProperty;
} // namespace QAMQP } // namespace QAMQP
Q_DECLARE_METATYPE(QAMQP::Frame::decimal) Q_DECLARE_METATYPE(QAMQP::Frame::decimal)
Q_DECLARE_METATYPE(QAMQP::Frame::TableField) //Q_DECLARE_METATYPE(QAMQP::Frame::TableField)
#endif // amqp_frame_h__ #endif // amqp_frame_h__

View File

@ -47,8 +47,36 @@
namespace QAMQP { namespace QAMQP {
enum ValueType
{
Invalid = -1,
enum Error { // basic AMQP types
Boolean,
ShortUint,
LongUint,
LongLongUint,
ShortString,
LongString,
// field-value types
ShortShortInt,
ShortShortUint,
ShortInt,
LongInt,
LongLongInt,
Float,
Double,
Decimal,
Array,
Timestamp,
Hash,
Bytes,
Void
};
enum Error
{
NoError = 0, NoError = 0,
ContentTooLargeError = 311, ContentTooLargeError = 311,
UnroutableKey = 312, UnroutableKey = 312,

View File

@ -2,7 +2,7 @@
#include "amqp_queue_p.h" #include "amqp_queue_p.h"
#include "amqp_exchange.h" #include "amqp_exchange.h"
#include "amqp_message_p.h" #include "amqp_message_p.h"
#include "amqp_table.h"
using namespace QAMQP; using namespace QAMQP;
#include <QCoreApplication> #include <QCoreApplication>
@ -90,8 +90,12 @@ void QueuePrivate::_q_content(const Frame::Content &frame)
currentMessage.d->leftSize = frame.bodySize(); currentMessage.d->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it; QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd(); QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it) for (it = frame.properties_.constBegin(); it != itEnd; ++it) {
currentMessage.d->properties[static_cast<Message::Property>(it.key())] = it.value(); Message::Property property = static_cast<Message::Property>(it.key());
if (property == Message::Headers)
currentMessage.d->headers = (it.value()).toHash();
currentMessage.d->properties[property] = it.value();
}
} }
void QueuePrivate::_q_body(const Frame::ContentBody &frame) void QueuePrivate::_q_body(const Frame::ContentBody &frame)
@ -123,7 +127,7 @@ void QueuePrivate::declareOk(const Frame::Method &frame)
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
name = Frame::readField('s', stream).toString(); name = Frame::readAmqpField(stream, ShortString).toString();
qint32 messageCount = 0, consumerCount = 0; qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount; stream >> messageCount >> consumerCount;
qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount); qAmqpDebug("message count %d\nConsumer count: %d", messageCount, consumerCount);
@ -184,10 +188,10 @@ void QueuePrivate::getOk(const Frame::Method &frame)
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
Message message; Message message;
message.d->deliveryTag = Frame::readField('L',in).toLongLong(); message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong();
message.d->redelivered = Frame::readField('t',in).toBool(); message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool();
message.d->exchangeName = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString();
message.d->routingKey = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readAmqpField(in, ShortString).toString();
currentMessage = message; currentMessage = message;
} }
@ -197,7 +201,7 @@ void QueuePrivate::consumeOk(const Frame::Method &frame)
qAmqpDebug() << "consume ok: " << name; qAmqpDebug() << "consume ok: " << name;
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = Frame::readField('s',stream).toString(); consumerTag = Frame::readAmqpField(stream, ShortString).toString();
qAmqpDebug("consumer tag = %s", qPrintable(consumerTag)); qAmqpDebug("consumer tag = %s", qPrintable(consumerTag));
consuming = true; consuming = true;
Q_EMIT q->consuming(consumerTag); Q_EMIT q->consuming(consumerTag);
@ -208,17 +212,17 @@ void QueuePrivate::deliver(const Frame::Method &frame)
qAmqpDebug() << Q_FUNC_INFO; qAmqpDebug() << Q_FUNC_INFO;
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
QString consumer = Frame::readField('s',in).toString(); QString consumer = Frame::readAmqpField(in, ShortString).toString();
if (consumerTag != consumer) { if (consumerTag != consumer) {
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
return; return;
} }
Message message; Message message;
message.d->deliveryTag = Frame::readField('L',in).toLongLong(); message.d->deliveryTag = Frame::readAmqpField(in, LongLongUint).toLongLong();
message.d->redelivered = Frame::readField('t',in).toBool(); message.d->redelivered = Frame::readAmqpField(in, Boolean).toBool();
message.d->exchangeName = Frame::readField('s',in).toString(); message.d->exchangeName = Frame::readAmqpField(in, ShortString).toString();
message.d->routingKey = Frame::readField('s',in).toString(); message.d->routingKey = Frame::readAmqpField(in, ShortString).toString();
currentMessage = message; currentMessage = message;
} }
@ -231,9 +235,9 @@ void QueuePrivate::declare()
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, name); Frame::writeAmqpField(out, ShortString, name);
out << qint8(options); out << qint8(options);
Frame::writeField('F', out, Frame::TableField()); Frame::writeAmqpField(out, Hash, Table());
frame.setArguments(arguments); frame.setArguments(arguments);
sendFrame(frame); sendFrame(frame);
@ -248,7 +252,7 @@ void QueuePrivate::cancelOk(const Frame::Method &frame)
qAmqpDebug() << Q_FUNC_INFO; qAmqpDebug() << Q_FUNC_INFO;
QByteArray data = frame.arguments(); QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly); QDataStream in(&data, QIODevice::ReadOnly);
QString consumer = Frame::readField('s',in).toString(); QString consumer = Frame::readAmqpField(in, ShortString).toString();
if (consumerTag != consumer) { if (consumerTag != consumer) {
qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer; qAmqpDebug() << Q_FUNC_INFO << "invalid consumer tag: " << consumer;
return; return;
@ -323,7 +327,7 @@ void Queue::remove(int options)
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
out << qint8(options); out << qint8(options);
frame.setArguments(arguments); frame.setArguments(arguments);
@ -343,7 +347,7 @@ void Queue::purge()
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
out << qint8(0); // no-wait out << qint8(0); // no-wait
frame.setArguments(arguments); frame.setArguments(arguments);
@ -375,12 +379,12 @@ void Queue::bind(const QString &exchangeName, const QString &key)
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); // reserved 1 out << qint16(0); // reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
Frame::writeField('s', out, exchangeName); Frame::writeAmqpField(out, ShortString, exchangeName);
Frame::writeField('s', out, key); Frame::writeAmqpField(out, ShortString, key);
out << qint8(0); // no-wait out << qint8(0); // no-wait
Frame::writeField('F', out, Frame::TableField()); Frame::writeAmqpField(out, Hash, Table());
frame.setArguments(arguments); frame.setArguments(arguments);
d->sendFrame(frame); d->sendFrame(frame);
@ -410,10 +414,10 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
Frame::writeField('s', out, exchangeName); Frame::writeAmqpField(out, ShortString, exchangeName);
Frame::writeField('s', out, key); Frame::writeAmqpField(out, ShortString, key);
Frame::writeField('F', out, Frame::TableField()); Frame::writeAmqpField(out, Hash, Table());
frame.setArguments(arguments); frame.setArguments(arguments);
d->sendFrame(frame); d->sendFrame(frame);
@ -439,11 +443,11 @@ bool Queue::consume(int options)
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
Frame::writeField('s', out, d->consumerTag); Frame::writeAmqpField(out, ShortString, d->consumerTag);
out << qint8(options); out << qint8(options);
Frame::writeField('F', out, Frame::TableField()); Frame::writeAmqpField(out, Hash, Table());
frame.setArguments(arguments); frame.setArguments(arguments);
d->sendFrame(frame); d->sendFrame(frame);
@ -483,7 +487,7 @@ void Queue::get(bool noAck)
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
out << qint16(0); //reserved 1 out << qint16(0); //reserved 1
Frame::writeField('s', out, d->name); Frame::writeAmqpField(out, ShortString, d->name);
out << qint8(noAck ? 1 : 0); // no-ack out << qint8(noAck ? 1 : 0); // no-ack
frame.setArguments(arguments); frame.setArguments(arguments);
@ -530,7 +534,7 @@ bool Queue::cancel(bool noWait)
QByteArray arguments; QByteArray arguments;
QDataStream out(&arguments, QIODevice::WriteOnly); QDataStream out(&arguments, QIODevice::WriteOnly);
Frame::writeField('s', out, d->consumerTag); Frame::writeAmqpField(out, ShortString, d->consumerTag);
out << (noWait ? qint8(0x01) : qint8(0x0)); out << (noWait ? qint8(0x01) : qint8(0x0));
frame.setArguments(arguments); frame.setArguments(arguments);

371
src/amqp_table.cpp Normal file
View File

@ -0,0 +1,371 @@
#include <float.h>
#include <QDateTime>
#include <QDebug>
#include "amqp_frame.h"
#include "amqp_table.h"
using namespace QAMQP;
/*
* field value types according to: https://www.rabbitmq.com/amqp-0-9-1-errata.html
t - Boolean
b - Signed 8-bit
Unsigned 8-bit
s - Signed 16-bit
Unsigned 16-bit
I - Signed 32-bit
Unsigned 32-bit
l - Signed 64-bit
Unsigned 64-bit
f - 32-bit float
d - 64-bit float
D - Decimal
S - Long string
A - Array
T - Timestamp (u64)
F - Nested Table
V - Void
x - Byte array
*/
ValueType valueTypeForOctet(qint8 octet)
{
switch (octet) {
case 't': return Boolean;
case 'b': return ShortShortInt;
case 's': return ShortInt;
case 'I': return LongInt;
case 'l': return LongLongInt;
case 'f': return Float;
case 'd': return Double;
case 'D': return Decimal;
case 'S': return LongString;
case 'A': return Array;
case 'T': return Timestamp;
case 'F': return Hash;
case 'V': return Void;
case 'x': return Bytes;
default:
qAmqpDebug() << Q_FUNC_INFO << "invalid octet received: " << char(octet);
}
return Invalid;
}
qint8 valueTypeToOctet(ValueType type)
{
switch (type) {
case Boolean: return 't';
case ShortShortInt: return 'b';
case ShortInt: return 's';
case LongInt: return 'I';
case LongLongInt: return 'l';
case Float: return 'f';
case Double: return 'd';
case Decimal: return 'D';
case LongString: return 'S';
case Array: return 'A';
case Timestamp: return 'T';
case Hash: return 'F';
case Void: return 'V';
case Bytes: return 'x';
default:
qAmqpDebug() << Q_FUNC_INFO << "invalid type received: " << char(type);
}
return 'V';
}
void Table::writeFieldValue(QDataStream &stream, const QVariant &value)
{
ValueType type;
switch (value.userType()) {
case QMetaType::Bool:
type = Boolean;
break;
case QMetaType::QByteArray:
type = Bytes;
break;
case QMetaType::Int:
{
int i = qAbs(value.toInt());
if (i <= qint8(UINT8_MAX)) {
type = ShortShortInt;
} else if (i <= qint16(UINT16_MAX)) {
type = ShortInt;
} else {
type = LongInt;
}
}
break;
case QMetaType::UShort:
type = ShortInt;
break;
case QMetaType::UInt:
{
int i = value.toInt();
if (i <= qint8(UINT8_MAX)) {
type = ShortShortInt;
} else if (i <= qint16(UINT16_MAX)) {
type = ShortInt;
} else {
type = LongInt;
}
}
break;
case QMetaType::LongLong:
case QMetaType::ULongLong:
type = LongLongInt;
break;
case QMetaType::QString:
type = LongString;
break;
case QMetaType::QDateTime:
type = Timestamp;
break;
case QMetaType::Double:
type = value.toDouble() > FLT_MAX ? Double : Float;
break;
case QMetaType::QVariantHash:
type = Hash;
break;
case QMetaType::QVariantList:
type = Array;
break;
case QMetaType::Void:
type = Void;
break;
default:
if (value.userType() == qMetaTypeId<Frame::decimal>()) {
type = Decimal;
break;
} else if (!value.isValid()) {
type = Void;
break;
}
qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << value.userType();
return;
}
// write the field value type, a requirement for field tables only
stream << valueTypeToOctet(type);
writeFieldValue(stream, type, value);
}
void Table::writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value)
{
switch (type) {
case Boolean:
case ShortShortUint:
case ShortUint:
case LongUint:
case LongLongUint:
case ShortString:
case LongString:
case Timestamp:
case Hash:
return Frame::writeAmqpField(stream, type, value);
case ShortShortInt:
stream << qint8(value.toInt());
break;
case ShortInt:
stream << qint16(value.toInt());
break;
case LongInt:
stream << qint32(value.toInt());
break;
case LongLongInt:
stream << qlonglong(value.toLongLong());
break;
case Float:
{
float g = value.toFloat();
QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision();
stream.setFloatingPointPrecision(QDataStream::SinglePrecision);
stream << g;
stream.setFloatingPointPrecision(oldPrecision);
}
break;
case Double:
{
double g = value.toDouble();
QDataStream::FloatingPointPrecision oldPrecision = stream.floatingPointPrecision();
stream.setFloatingPointPrecision(QDataStream::DoublePrecision);
stream << g;
stream.setFloatingPointPrecision(oldPrecision);
}
break;
case Decimal:
{
Frame::decimal v(value.value<Frame::decimal>());
stream << v.scale;
stream << v.value;
}
break;
case Array:
{
QByteArray buffer;
QDataStream arrayStream(&buffer, QIODevice::WriteOnly);
QVariantList array(value.toList());
for (int i = 0; i < array.size(); ++i)
writeFieldValue(arrayStream, array.at(i));
if (buffer.isEmpty()) {
stream << qint32(0);
} else {
stream << buffer;
}
}
break;
case Bytes:
{
QByteArray ba = value.toByteArray();
stream << quint32(ba.length());
stream.writeRawData(ba.data(), ba.length());
}
break;
case Void:
stream << qint32(0);
break;
default:
qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type;
}
}
QVariant Table::readFieldValue(QDataStream &stream, ValueType type)
{
switch (type) {
case Boolean:
case ShortShortUint:
case ShortUint:
case LongUint:
case LongLongUint:
case ShortString:
case LongString:
case Timestamp:
case Hash:
return Frame::readAmqpField(stream, type);
case ShortShortInt:
{
char octet;
stream.readRawData(&octet, sizeof(octet));
return QVariant::fromValue<int>(octet);
}
case ShortInt:
{
qint16 tmp_value = 0;
stream >> tmp_value;
return QVariant::fromValue<int>(tmp_value);
}
case LongInt:
{
qint32 tmp_value = 0;
stream >> tmp_value;
return QVariant::fromValue<int>(tmp_value);
}
case LongLongInt:
{
qlonglong v = 0 ;
stream >> v;
return v;
}
case Float:
{
float tmp_value;
QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision();
stream.setFloatingPointPrecision(QDataStream::SinglePrecision);
stream >> tmp_value;
stream.setFloatingPointPrecision(precision);
return QVariant::fromValue<float>(tmp_value);
}
case Double:
{
double tmp_value;
QDataStream::FloatingPointPrecision precision = stream.floatingPointPrecision();
stream.setFloatingPointPrecision(QDataStream::DoublePrecision);
stream >> tmp_value;
stream.setFloatingPointPrecision(precision);
return QVariant::fromValue<double>(tmp_value);
}
case Decimal:
{
Frame::decimal v;
stream >> v.scale;
stream >> v.value;
return QVariant::fromValue<Frame::decimal>(v);
}
case Array:
{
QByteArray data;
quint32 size = 0;
stream >> size;
data.resize(size);
stream.readRawData(data.data(), data.size());
qint8 type = 0;
QVariantList result;
QDataStream arrayStream(&data, QIODevice::ReadOnly);
while (!arrayStream.atEnd()) {
arrayStream >> type;
result.append(readFieldValue(arrayStream, valueTypeForOctet(type)));
}
return result;
}
case Bytes:
{
QByteArray bytes;
quint32 length = 0;
stream >> length;
bytes.resize(length);
stream.readRawData(bytes.data(), bytes.size());
return bytes;
}
case Void:
break;
default:
qAmqpDebug() << Q_FUNC_INFO << "unhandled type: " << type;
}
return QVariant();
}
QDataStream &operator<<(QDataStream &stream, const Table &table)
{
QByteArray data;
QDataStream s(&data, QIODevice::WriteOnly);
Table::ConstIterator it;
Table::ConstIterator itEnd = table.constEnd();
for (it = table.constBegin(); it != itEnd; ++it) {
Table::writeFieldValue(s, ShortString, it.key());
Table::writeFieldValue(s, it.value());
}
if (data.isEmpty()) {
stream << qint32(0);
} else {
stream << data;
}
return stream;
}
QDataStream &operator>>(QDataStream &stream, Table &table)
{
QByteArray data;
stream >> data;
QDataStream tableStream(&data, QIODevice::ReadOnly);
while (!tableStream.atEnd()) {
qint8 octet = 0;
QString field = Frame::readAmqpField(tableStream, ShortString).toString();
tableStream >> octet;
table[field] = Table::readFieldValue(tableStream, valueTypeForOctet(octet));
}
return stream;
}

29
src/amqp_table.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef amqp_table_h__
#define amqp_table_h__
#include "amqp_global.h"
#include <QVariantHash>
namespace QAMQP {
class QAMQP_EXPORT Table : public QVariantHash
{
public:
Table() {}
inline Table(const QVariantHash &variantHash)
: QVariantHash(variantHash)
{
}
static void writeFieldValue(QDataStream &stream, const QVariant &value);
static void writeFieldValue(QDataStream &stream, ValueType type, const QVariant &value);
static QVariant readFieldValue(QDataStream &stream, ValueType type);
};
} // namespace QAMQP
QAMQP_EXPORT QDataStream &operator<<(QDataStream &, const QAMQP::Table &table);
QAMQP_EXPORT QDataStream &operator>>(QDataStream &, QAMQP::Table &table);
Q_DECLARE_METATYPE(QAMQP::Table)
#endif // amqp_table_h__

View File

@ -25,7 +25,8 @@ INSTALL_HEADERS += \
amqp_frame.h \ amqp_frame.h \
amqp_global.h \ amqp_global.h \
amqp_message.h \ amqp_message.h \
amqp_queue.h amqp_queue.h \
amqp_table.h
HEADERS += \ HEADERS += \
$${INSTALL_HEADERS} \ $${INSTALL_HEADERS} \
@ -38,7 +39,8 @@ SOURCES += \
amqp_exchange.cpp \ amqp_exchange.cpp \
amqp_frame.cpp \ amqp_frame.cpp \
amqp_message.cpp \ amqp_message.cpp \
amqp_queue.cpp amqp_queue.cpp \
amqp_table.cpp
# install # install
headers.files = $${INSTALL_HEADERS} headers.files = $${INSTALL_HEADERS}

View File

@ -1,3 +1,5 @@
#include <float.h>
#include <QScopedPointer> #include <QScopedPointer>
#include <QtTest/QtTest> #include <QtTest/QtTest>
@ -42,6 +44,7 @@ private Q_SLOTS:
void invalidQos(); void invalidQos();
void qos(); void qos();
void invalidRoutingKey(); void invalidRoutingKey();
void tableFieldDataTypes();
private: private:
void declareQueueAndVerifyConsuming(Queue *queue); void declareQueueAndVerifyConsuming(Queue *queue);
@ -531,5 +534,84 @@ void tst_QAMQPQueue::invalidRoutingKey()
QCOMPARE(client->error(), QAMQP::FrameError); QCOMPARE(client->error(), QAMQP::FrameError);
} }
void tst_QAMQPQueue::tableFieldDataTypes()
{
Queue *queue = client->createQueue("test-table-field-data-types");
declareQueueAndVerifyConsuming(queue);
Frame::decimal decimal;
decimal.scale = 2;
decimal.value = 12345;
QVariant decimalVariant = QVariant::fromValue<Frame::decimal>(decimal);
Table nestedTable;
nestedTable.insert("boolean", true);
nestedTable.insert("long-int", qint32(-65536));
QVariantList array;
array.append(true);
array.append(qint32(-65536));
QDateTime timestamp = QDateTime::currentDateTime();
Table headers;
headers.insert("boolean", true);
headers.insert("short-short-int", qint8(-15));
headers.insert("short-short-uint", quint8(15));
headers.insert("short-int", qint16(-256));
headers.insert("short-uint", QVariant::fromValue(quint16(256)));
headers.insert("long-int", qint32(-65536));
headers.insert("long-uint", quint32(65536));
headers.insert("long-long-int", qint64(-2147483648));
headers.insert("long-long-uint", quint64(2147483648));
headers.insert("float", 230.7);
headers.insert("double", double(FLT_MAX));
headers.insert("decimal-value", decimalVariant);
headers.insert("short-string", QLatin1String("test"));
headers.insert("long-string", QLatin1String("test"));
headers.insert("timestamp", timestamp);
headers.insert("nested-table", nestedTable);
headers.insert("array", array);
headers.insert("bytes", QByteArray("abcdefg1234567"));
Exchange *defaultExchange = client->createExchange();
defaultExchange->publish("dummy", "test-table-field-data-types", "text.plain", headers);
QVERIFY(waitForSignal(queue, SIGNAL(messageReceived())));
Message message = queue->dequeue();
QCOMPARE(message.header("boolean").toBool(), true);
QCOMPARE(qint8(message.header("short-short-int").toInt()), qint8(-15));
QCOMPARE(quint8(message.header("short-short-uint").toUInt()), quint8(15));
QCOMPARE(qint16(message.header("short-int").toInt()), qint16(-256));
QCOMPARE(quint16(message.header("short-uint").toUInt()), quint16(256));
QCOMPARE(qint32(message.header("long-int").toInt()), qint32(-65536));
QCOMPARE(quint32(message.header("long-uint").toUInt()), quint32(65536));
QCOMPARE(qint64(message.header("long-long-int").toLongLong()), qint64(-2147483648));
QCOMPARE(quint64(message.header("long-long-uint").toLongLong()), quint64(2147483648));
QCOMPARE(message.header("float").toFloat(), float(230.7));
QCOMPARE(message.header("double").toDouble(), double(FLT_MAX));
QCOMPARE(message.header("short-string").toString(), QLatin1String("test"));
QCOMPARE(message.header("long-string").toString(), QLatin1String("test"));
QCOMPARE(message.header("timestamp").toDateTime(), timestamp);
QCOMPARE(message.header("bytes").toByteArray(), QByteArray("abcdefg1234567"));
QVERIFY(message.hasHeader("nested-table"));
Table compareTable(message.header("nested-table").toHash());
foreach (QString key, nestedTable.keys()) {
QVERIFY(compareTable.contains(key));
QCOMPARE(nestedTable.value(key), compareTable.value(key));
}
QVERIFY(message.hasHeader("array"));
QVariantList compareArray = message.header("array").toList();
QCOMPARE(array, compareArray);
Frame::decimal receivedDecimal = message.header("decimal-value").value<Frame::decimal>();
QCOMPARE(receivedDecimal.scale, qint8(2));
QCOMPARE(receivedDecimal.value, quint32(12345));
}
QTEST_MAIN(tst_QAMQPQueue) QTEST_MAIN(tst_QAMQPQueue)
#include "tst_qamqpqueue.moc" #include "tst_qamqpqueue.moc"