This commit is contained in:
fuCtor 2012-02-02 05:13:16 -08:00
parent a4b412839c
commit 1945970929
7 changed files with 131 additions and 41 deletions

View File

@ -58,14 +58,15 @@ void ConnectionPrivate::startOk()
clientProperties["version"] = "0.0.1"; clientProperties["version"] = "0.0.1";
clientProperties["platform"] = QString("Qt %1").arg(qVersion()); clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = "QAMQP"; clientProperties["product"] = "QAMQP";
clientProperties["site"] = "http://vmp.ru";
QAMQP::Frame::serialize(stream, clientProperties); QAMQP::Frame::serialize(stream, clientProperties);
QAMQP::Frame::writeField('s', stream, "AMQPLAIN"); QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, "AMQPLAIN");
QAMQP::Frame::TableField response; QAMQP::Frame::TableField response;
response["LOGIN"] = client_->user(); response["LOGIN"] = client_->user();
response["PASSWORD"] = client_->password(); response["PASSWORD"] = client_->password();
QAMQP::Frame::serialize(stream, response); QAMQP::Frame::serialize(stream, response);
QAMQP::Frame::writeField('s', stream, "en_US"); QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, "en_US");
frame.setArguments(arguments_); frame.setArguments(arguments_);
@ -99,7 +100,7 @@ void ConnectionPrivate::open()
QByteArray arguments_; QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly); QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField('s',stream, client_->virtualHost()); QAMQP::Frame::writeField(QAMQP::Frame::fkShortString,stream, client_->virtualHost());
stream << qint8(0); stream << qint8(0);
stream << qint8(0); stream << qint8(0);
@ -122,8 +123,8 @@ void ConnectionPrivate::start( const QAMQP::Frame::Method & frame )
QAMQP::Frame::TableField table; QAMQP::Frame::TableField table;
QAMQP::Frame::deserialize(stream, table); QAMQP::Frame::deserialize(stream, table);
QString mechanisms = QAMQP::Frame::readField('S', stream).toString(); QString mechanisms = QAMQP::Frame::readField(QAMQP::Frame::fkLongString, stream).toString();
QString locales = QAMQP::Frame::readField('S', stream).toString(); QString locales = QAMQP::Frame::readField(QAMQP::Frame::fkLongString, stream).toString();
qDebug(">> version_major: %d", version_major); qDebug(">> version_major: %d", version_major);
qDebug(">> version_minor: %d", version_minor); qDebug(">> version_minor: %d", version_minor);
@ -175,7 +176,7 @@ void ConnectionPrivate::close( const QAMQP::Frame::Method & frame )
QDataStream stream(&data, QIODevice::ReadOnly); QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId; qint16 code_ = 0, classId, methodId;
stream >> code_; stream >> code_;
QString text(QAMQP::Frame::readField('s', stream).toString()); QString text(QAMQP::Frame::readField(QAMQP::Frame::fkShortString, stream).toString());
stream >> classId; stream >> classId;
stream >> methodId; stream >> methodId;
@ -191,10 +192,10 @@ void ConnectionPrivate::close(int code, const QString & text, int classId, int m
QByteArray arguments_; QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly); QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::writeField('s',stream, client_->virtualHost()); QAMQP::Frame::writeField(QAMQP::Frame::fkShortString,stream, client_->virtualHost());
stream << qint16(code); stream << qint16(code);
QAMQP::Frame::writeField('s', stream, text); QAMQP::Frame::writeField(QAMQP::Frame::fkShortString, stream, text);
stream << qint16(classId); stream << qint16(classId);
stream << qint16(methodId); stream << qint16(methodId);

View File

@ -148,7 +148,7 @@ void QAMQP::Frame::Method::writePayload( QDataStream & stream ) const
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s ) QVariant QAMQP::Frame::readField( FieldValueKind valueType, QDataStream &s )
{ {
QVariant value; QVariant value;
QByteArray tmp; QByteArray tmp;
@ -232,7 +232,7 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
for (int i =0; i < length_; ++i) for (int i =0; i < length_; ++i)
{ {
s >> type; s >> type;
array_ << readField(type, s); array_ << readField(FieldValueKind(type), s);
} }
value = array_; value = array_;
} }
@ -266,9 +266,9 @@ QDataStream & QAMQP::Frame::deserialize( QDataStream & stream, QAMQP::Frame::Tab
{ {
qint8 valueType = 0; qint8 valueType = 0;
QString name = readField('s', s).toString(); QString name = readField(fkShortString, s).toString();
s >> valueType; s >> valueType;
f[name] = readField(valueType, s); f[name] = readField(FieldValueKind(valueType), s);
} }
return stream; return stream;
@ -281,7 +281,7 @@ QDataStream & QAMQP::Frame::serialize( QDataStream & stream, const TableField &
TableField::ConstIterator i; TableField::ConstIterator i;
for(i = f.begin(); i != f.end(); ++i) for(i = f.begin(); i != f.end(); ++i)
{ {
writeField('s', s, i.key()); writeField(fkShortString, s, i.key());
writeField(s, i.value()); writeField(s, i.value());
} }
stream << data; stream << data;
@ -307,7 +307,7 @@ void QAMQP::Frame::print( const TableField & f )
} }
} }
void QAMQP::Frame::writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType ) void QAMQP::Frame::writeField( FieldValueKind valueType, QDataStream &s, const QVariant & value, bool withType )
{ {
QByteArray tmp; QByteArray tmp;
qint8 nameSize_; qint8 nameSize_;
@ -457,5 +457,5 @@ void QAMQP::Frame::writeField( QDataStream &s, const QVariant & value )
} }
if(type) if(type)
writeField(type, s, value, true); writeField(FieldValueKind(type), s, value, true);
} }

View File

@ -4,6 +4,7 @@
#include <QDataStream> #include <QDataStream>
#include <QHash> #include <QHash>
#include <QVariant> #include <QVariant>
#include <QSharedPointer>
namespace QAMQP namespace QAMQP
{ {
@ -27,6 +28,28 @@ namespace QAMQP
fcTx = 90, fcTx = 90,
}; };
enum FieldValueKind {
fkBoolean = 't',
fkI8 = 'b',
fkU8 = 'B',
fkI16 = 'U',
fkU16 = 'u',
fkI32 = 'I',
fkU32 = 'i',
fkI64 = 'l',
fkU64 = 'L',
fkFloat = 'f',
fkDouble = 'd',
fkDecimal = 'D',
fkLongString = 'S',
fkShortString = 's',
fkArray = 'A',
fkTimestamp = 'T',
fkTable = 'F',
fkVoid = 'V',
fkBytes = 'x'
};
struct decimal struct decimal
{ {
qint8 scale; qint8 scale;
@ -40,9 +63,9 @@ namespace QAMQP
QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f ); QDataStream & serialize( QDataStream & stream, const QAMQP::Frame::TableField & f );
QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f ); QDataStream & deserialize( QDataStream & stream, QAMQP::Frame::TableField & f );
QVariant readField( qint8 valueType, QDataStream &s ); QVariant readField( FieldValueKind valueType, QDataStream &s );
void writeField( QDataStream &s, const QVariant & value ); void writeField( QDataStream &s, const QVariant & value );
void writeField( qint8 valueType, QDataStream &s, const QVariant & value, bool withType = false ); void writeField( FieldValueKind valueType, QDataStream &s, const QVariant & value, bool withType = false );
void print( const QAMQP::Frame::TableField & f ); void print( const QAMQP::Frame::TableField & f );
class Base class Base
@ -71,6 +94,8 @@ namespace QAMQP
}; };
typedef QSharedPointer<QAMQP::Frame::Base> BasePtr;
class Method : public Base class Method : public Base
{ {
public: public:

View File

@ -101,8 +101,8 @@ void QAMQP::Network::readyRead()
} }
} }
void QAMQP::Network::sendFrame( const QAMQP::Frame::Base & frame ) void QAMQP::Network::sendFrame( const QAMQP::Frame::BasePtr &frame )
{ {
QDataStream stream(socket_); QDataStream stream(socket_);
frame.toStream(stream); frame->toStream(stream);
} }

View File

@ -5,6 +5,7 @@
#include <QTcpSocket> #include <QTcpSocket>
#include <QPointer> #include <QPointer>
#include <QBuffer> #include <QBuffer>
#include <QQueue>
#include "amqp_frame.h" #include "amqp_frame.h"
@ -22,7 +23,7 @@ namespace QAMQP
void disconnect(); void disconnect();
void sendFrame(); void sendFrame();
void sendFrame(const QAMQP::Frame::Base & frame); void sendFrame(const QAMQP::Frame::BasePtr &frame);
signals: signals:
void method(const QAMQP::Frame::Method & method); void method(const QAMQP::Frame::Method & method);
@ -36,6 +37,7 @@ namespace QAMQP
private: private:
QPointer<QTcpSocket> socket_; QPointer<QTcpSocket> socket_;
QPointer<QBuffer> buffer_; QPointer<QBuffer> buffer_;
QQueue<QAMQP::Frame::BasePtr> outFrames_;
int offsetBuf; int offsetBuf;
int leftSize; int leftSize;
qint8 lastType_; qint8 lastType_;

51
src/amqp_private.cpp Normal file
View File

@ -0,0 +1,51 @@
#include "amqp_private.h"
QAMQP::Base::Base( int version /*= QObjectPrivateVersion*/ )
{
outArguments_ = QSharedPointer<QBuffer>(new QBuffer());
outArguments_->open(QIODevice::WriteOnly);
}
QAMQP::Base::~Base()
{
outArguments_.clear();
inFrame_.clear();
outFrame_.clear();
}
void QAMQP::Base::startMethod( int id )
{
outFrame_ = QAMQP::Frame::BasePtr(new QAMQP::Frame::Method(methodClass(), id));
streamOut_.setDevice(outArguments_.data());
}
void QAMQP::Base::writeArgument( QAMQP::Frame::FieldValueKind type, QVariant value )
{
}
void QAMQP::Base::writeArgument( QVariant value )
{
}
void QAMQP::Base::endWrite()
{
}
void QAMQP::Base::startRead( QAMQP::Frame::BasePtr frame )
{
}
QVariant QAMQP::Base::readArgument( QAMQP::Frame::FieldValueKind type )
{
}
void QAMQP::Base::endRead()
{
}

View File

@ -1,27 +1,38 @@
#ifndef amqp_private_h__
#define amqp_private_h__
#include <QtCore/private/qobject_p.h>
#include <QDataStream>
#include <QBuffer> #include <QBuffer>
#include "amqp_frame.h"
namespace QAMQP namespace QAMQP
{ {
class Base: public QObjectPrivate
{
public:
Base(int version = QObjectPrivateVersion);
~Base();
void init(QAMQP::Client * client);
virtual QAMQP::Frame::MethodClass methodClass() const = 0;
void startMethod(int id);
void writeArgument(QAMQP::Frame::FieldValueKind type, QVariant value);
void writeArgument(QVariant value);
void endWrite();
enum amqp_field_value_kind_t { void startRead(QAMQP::Frame::BasePtr frame);
AMQP_FIELD_KIND_BOOLEAN = 't', QVariant readArgument(QAMQP::Frame::FieldValueKind type);
AMQP_FIELD_KIND_I8 = 'b', void endRead();
AMQP_FIELD_KIND_U8 = 'B',
AMQP_FIELD_KIND_I16 = 's',
AMQP_FIELD_KIND_U16 = 'u',
AMQP_FIELD_KIND_I32 = 'I',
AMQP_FIELD_KIND_U32 = 'i',
AMQP_FIELD_KIND_I64 = 'l',
AMQP_FIELD_KIND_U64 = 'L',
AMQP_FIELD_KIND_F32 = 'f',
AMQP_FIELD_KIND_F64 = 'd',
AMQP_FIELD_KIND_DECIMAL = 'D',
AMQP_FIELD_KIND_UTF8 = 'S',
AMQP_FIELD_KIND_ARRAY = 'A',
AMQP_FIELD_KIND_TIMESTAMP = 'T',
AMQP_FIELD_KIND_TABLE = 'F',
AMQP_FIELD_KIND_VOID = 'V',
AMQP_FIELD_KIND_BYTES = 'x'
} ;
} private:
QAMQP::Frame::BasePtr inFrame_;
QAMQP::Frame::BasePtr outFrame_;
QDataStream streamIn_;
QDataStream streamOut_;
QSharedPointer<QBuffer> outArguments_;
QPointer<Client> client_;
};
}
#endif // amqp_private_h__