This commit is contained in:
Alexey Shcherbakov 2013-09-08 11:16:10 +06:00
commit 5c4db2233c
15 changed files with 188 additions and 104 deletions

View File

@ -73,7 +73,7 @@ Usage
queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber()); queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber());
connect(queue_, SIGNAL(declared()), this, SLOT(declared())); connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
connect(queue_, SIGNAL(messageRecieved()), this, SLOT(newMessage())); connect(queue_, SIGNAL(messageReceived(QAMQP::Queue * )), this, SLOT(newMessage(QAMQP::Queue *)));
} }
@ -92,9 +92,8 @@ Usage
queue_->consume(QAMQP::Queue::coNoAck); queue_->consume(QAMQP::Queue::coNoAck);
} }
void Test::newMessage() void Test::newMessage(QAMQP::Queue * q)
{ {
QAMQP::Queue * q = qobject_cast<QAMQP::Queue *>(sender());
while (q->hasMessage()) while (q->hasMessage())
{ {
QAMQP::MessagePtr message = q->getMessage(); QAMQP::MessagePtr message = q->getMessage();

View File

@ -49,7 +49,8 @@ ClientPrivate::~ClientPrivate()
void ClientPrivate::init(QObject * parent) void ClientPrivate::init(QObject * parent)
{ {
pq_func()->setParent(parent); pq_func()->setParent(parent);
if(!network_){ if(!network_)
{
network_ = new QAMQP::Network(pq_func()); network_ = new QAMQP::Network(pq_func());
} }
@ -58,10 +59,9 @@ void ClientPrivate::init(QObject * parent)
connection_ = new QAMQP::Connection(pq_func()); connection_ = new QAMQP::Connection(pq_func());
} }
setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD))); network_->setMethodHandlerConnection(connection_);
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)), setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(connection_, SIGNAL(connected()), pq_func(), SIGNAL(connected())); QObject::connect(connection_, SIGNAL(connected()), pq_func(), SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), pq_func(), SIGNAL(disconnected())); QObject::connect(connection_, SIGNAL(disconnected()), pq_func(), SIGNAL(disconnected()));
@ -134,32 +134,30 @@ void ClientPrivate::login()
Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name ) Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name )
{ {
Exchange * exchange_ = new Exchange(channelNumber, pq_func()); Exchange * exchange_ = new Exchange(channelNumber, pq_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
exchange_, SLOT(_q_method(const QAMQP::Frame::Method &))); network_->addMethodHandlerForChannel(exchange_->channelNumber(), exchange_);
QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open())); QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open()));
exchange_->pd_func()->open(); exchange_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected())); QObject::connect(pq_func(), SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected()));
exchange_->setName(name); exchange_->setName(name);
return exchange_; return exchange_;
} }
Queue * ClientPrivate::createQueue(int channelNumber, const QString &name ) Queue * ClientPrivate::createQueue(int channelNumber, const QString &name )
{ {
Queue * queue_ = new Queue(channelNumber, pq_func()); Queue * queue_ = new Queue(channelNumber, pq_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
queue_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(network_, SIGNAL(content(const QAMQP::Frame::Content &)), network_->addMethodHandlerForChannel(queue_->channelNumber(), queue_);
queue_, SLOT(_q_content(const QAMQP::Frame::Content &))); network_->addContentHandlerForChannel(queue_->channelNumber(), queue_);
network_->addContentBodyHandlerForChannel(queue_->channelNumber(), queue_);
QObject::connect(network_, SIGNAL(body(int, const QByteArray &)),
queue_, SLOT(_q_body(int, const QByteArray &)));
QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open())); QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open()));
queue_->pd_func()->open(); queue_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), queue_, SLOT(_q_disconnected())); QObject::connect(pq_func(), SIGNAL(disconnected()), queue_, SLOT(_q_disconnected()));
queue_->setName(name); queue_->setName(name);
return queue_; return queue_;
} }

View File

@ -109,6 +109,11 @@ void QAMQP::Channel::stateChanged( int state )
} }
} }
void QAMQP::Channel::_q_method(const Frame::Method &frame)
{
pd_func()->_q_method(frame);
}
bool QAMQP::Channel::isOpened() const bool QAMQP::Channel::isOpened() const
{ {
return pd_func()->opened; return pd_func()->opened;
@ -156,6 +161,7 @@ void ChannelPrivate::init(int channelNumber, Client * parent)
bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame ) bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
{ {
Q_ASSERT(frame.channel() == number);
if(frame.channel() != number ) if(frame.channel() != number )
return true; return true;

View File

@ -9,7 +9,7 @@ namespace QAMQP
{ {
class ChannelPrivate; class ChannelPrivate;
class Client; class Client;
class Channel : public QObject class Channel : public QObject, public Frame::MethodHandler
{ {
Q_OBJECT Q_OBJECT
@ -47,8 +47,9 @@ namespace QAMQP
private: private:
void stateChanged(int state); void stateChanged(int state);
friend class ClientPrivate; friend class ClientPrivate;
void _q_method(const QAMQP::Frame::Method & frame);
Q_PRIVATE_SLOT(pd_func(), void _q_open()) Q_PRIVATE_SLOT(pd_func(), void _q_open())
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_disconnected()) Q_PRIVATE_SLOT(pd_func(), void _q_disconnected())
}; };
} }

View File

@ -260,6 +260,7 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame ) bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
{ {
Q_ASSERT(frame.methodClass() == QAMQP::Frame::fcConnection);
if(frame.methodClass() != QAMQP::Frame::fcConnection) if(frame.methodClass() != QAMQP::Frame::fcConnection)
return true; return true;
@ -364,6 +365,11 @@ void Connection::openOk()
emit connected(); emit connected();
} }
void Connection::_q_method(const QAMQP::Frame::Method & frame)
{
pd_func()->_q_method(frame);
}
bool Connection::isConnected() const bool Connection::isConnected() const
{ {
return pd_func()->connected; return pd_func()->connected;

View File

@ -11,7 +11,7 @@ namespace QAMQP
class ChannelPrivate; class ChannelPrivate;
class ClientPrivate; class ClientPrivate;
class Client; class Client;
class Connection : public QObject class Connection : public QObject, public Frame::MethodHandler
{ {
Q_OBJECT Q_OBJECT
P_DECLARE_PRIVATE(QAMQP::Connection) P_DECLARE_PRIVATE(QAMQP::Connection)
@ -45,8 +45,9 @@ namespace QAMQP
void openOk(); void openOk();
friend class ClientPrivate; friend class ClientPrivate;
friend class ChannelPrivate; friend class ChannelPrivate;
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat()) void _q_method(const QAMQP::Frame::Method & frame);
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat());
}; };
} }

View File

@ -1,5 +1,4 @@
#include "amqp_frame.h" #include "amqp_frame.h"
#define AMQP_FRAME_END 0xCE
#include <QDateTime> #include <QDateTime>
#include <QList> #include <QList>
@ -47,7 +46,7 @@ void QAMQP::Frame::Base::writeHeader( QDataStream & stream ) const
void QAMQP::Frame::Base::writeEnd( QDataStream & stream ) const void QAMQP::Frame::Base::writeEnd( QDataStream & stream ) const
{ {
stream << qint8(AMQP_FRAME_END); stream << qint8(FRAME_END);
} }
void QAMQP::Frame::Base::writePayload( QDataStream & ) const{} void QAMQP::Frame::Base::writePayload( QDataStream & ) const{}
@ -64,7 +63,7 @@ void QAMQP::Frame::Base::readEnd( QDataStream & stream )
{ {
unsigned char end_ = 0; unsigned char end_ = 0;
stream.readRawData(reinterpret_cast<char*>(&end_), sizeof(end_)); stream.readRawData(reinterpret_cast<char*>(&end_), sizeof(end_));
if(end_ != AMQP_FRAME_END ) if(end_ != FRAME_END )
{ {
qWarning("Wrong end of frame"); qWarning("Wrong end of frame");
} }
@ -248,13 +247,14 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
{ {
quint32 length_ = 0; quint32 length_ = 0;
s >> length_; s >> length_;
nameSize_ = length_;
tmp.resize(length_); tmp.resize(length_);
} }
s.readRawData(tmp.data(), tmp.size()); s.readRawData(tmp.data(), tmp.size());
#if QT_VERSION < 0x050000 #if QT_VERSION < 0x050000
value = QString::fromAscii(tmp.data(), nameSize_); value = QString::fromAscii(tmp.data(), tmp.size());
#else // For Qt5 #else // For Qt5
value = QString::fromLatin1(tmp.data(), nameSize_); value = QString::fromLatin1(tmp.data(), tmp.size());
#endif #endif
break; break;
case 'A': case 'A':
@ -711,6 +711,6 @@ qint32 QAMQP::Frame::ContentBody::size() const
QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {} QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {}
void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {} void QAMQP::Frame::Heartbeat::readPayload(QDataStream & ) {}
void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {} void QAMQP::Frame::Heartbeat::writePayload(QDataStream & ) const {}

View File

@ -33,6 +33,21 @@ namespace QAMQP
*/ */
namespace Frame namespace Frame
{ {
typedef quint16 channel_t;
/*!
@brief Header size in bytes
*/
static const qint64 HEADER_SIZE = 7;
/*!
@brief Frame end indicator size in bytes
*/
static const qint64 FRAME_END_SIZE = 1;
/*!
@brief Frame end marker
*/
static const quint8 FRAME_END = 0xCE;
/*! /*!
@brief Frame type @brief Frame type
*/ */
@ -379,6 +394,24 @@ namespace QAMQP
void writePayload(QDataStream & stream) const; void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream); void readPayload(QDataStream & stream);
}; };
class MethodHandler
{
public:
virtual void _q_method(const QAMQP::Frame::Method & frame) = 0;
};
class ContentHandler
{
public:
virtual void _q_content(const QAMQP::Frame::Content & frame) = 0;
};
class ContentBodyHandler
{
public:
virtual void _q_body(const QAMQP::Frame::ContentBody & frame) = 0;
};
} }
} }

View File

@ -13,7 +13,7 @@
#define AMQPPSWD "guest" #define AMQPPSWD "guest"
#define FRAME_MAX 131072 #define FRAME_MAX 131072
#define QAMQP_VERSION "0.1.2" #define QAMQP_VERSION "0.2.0"

View File

@ -1,20 +1,16 @@
#include "amqp_network.h" #include "amqp_network.h"
#include <QDebug> #include <QDebug>
#include <QTimer> #include <QTimer>
#include <QtEndian>
QAMQP::Network::Network( QObject * parent /*= 0*/ ):QObject(parent) QAMQP::Network::Network( QObject * parent /*= 0*/ ):QObject(parent)
{ {
qRegisterMetaType<QAMQP::Frame::Method>("QAMQP::Frame::Method"); qRegisterMetaType<QAMQP::Frame::Method>("QAMQP::Frame::Method");
buffer_.reserve(Frame::HEADER_SIZE);
buffer_ = new QBuffer(this);
offsetBuf = 0;
leftSize = 0;
timeOut_ = 1000; timeOut_ = 1000;
connect_ = false; connect_ = false;
buffer_->open(QIODevice::ReadWrite);
initSocket(false); initSocket(false);
} }
@ -97,54 +93,59 @@ void QAMQP::Network::error( QAbstractSocket::SocketError socketError )
void QAMQP::Network::readyRead() void QAMQP::Network::readyRead()
{ {
QDataStream streamA(socket_); while(socket_->bytesAvailable() >= Frame::HEADER_SIZE)
QDataStream streamB(buffer_);
while(!socket_->atEnd())
{ {
if(leftSize == 0) char* headerData = buffer_.data();
socket_->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE+payloadSize+Frame::FRAME_END_SIZE;
if(socket_->bytesAvailable() >= readSize)
{ {
lastType_ = 0; buffer_.resize(readSize);
qint16 channel_ = 0; socket_->read(buffer_.data(), readSize);
leftSize = 0; const char* bufferData = buffer_.constData();
offsetBuf = 0; const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE+payloadSize];
streamA >> lastType_; if(magic != QAMQP::Frame::FRAME_END)
streamB << lastType_; {
streamA >> channel_; qWarning() << "Wrong end frame";
streamB << channel_;
streamA >> leftSize;
streamB << leftSize;
leftSize++;
} }
QByteArray data_; QDataStream streamB(&buffer_, QIODevice::ReadOnly);
data_.resize(leftSize); switch(QAMQP::Frame::Type(type))
offsetBuf = streamA.readRawData(data_.data(), data_.size());
leftSize -= offsetBuf;
streamB.writeRawData(data_.data(), offsetBuf);
if(leftSize == 0)
{
buffer_->reset();
switch(QAMQP::Frame::Type(lastType_))
{ {
case QAMQP::Frame::ftMethod: case QAMQP::Frame::ftMethod:
{ {
QAMQP::Frame::Method frame(streamB); QAMQP::Frame::Method frame(streamB);
emit method(frame); if(frame.methodClass() == QAMQP::Frame::fcConnection)
{
m_pMethodHandlerConnection->_q_method(frame);
}
else
{
foreach(Frame::MethodHandler* pMethodHandler, m_methodHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_method(frame);
}
}
} }
break; break;
case QAMQP::Frame::ftHeader: case QAMQP::Frame::ftHeader:
{ {
QAMQP::Frame::Content frame(streamB); QAMQP::Frame::Content frame(streamB);
emit content(frame); foreach(Frame::ContentHandler* pMethodHandler, m_contentHandlerByChannel[frame.channel()])
{
pMethodHandler->_q_content(frame);
}
} }
break; break;
case QAMQP::Frame::ftBody: case QAMQP::Frame::ftBody:
{ {
QAMQP::Frame::ContentBody frame(streamB); QAMQP::Frame::ContentBody frame(streamB);
emit body(frame.channel(), frame.body()); foreach(Frame::ContentBodyHandler* pMethodHandler, m_bodyHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_body(frame);
}
} }
break; break;
case QAMQP::Frame::ftHeartbeat: case QAMQP::Frame::ftHeartbeat:
@ -153,9 +154,12 @@ void QAMQP::Network::readyRead()
} }
break; break;
default: default:
qWarning("AMQP: Unknown frame type"); qWarning() << "AMQP: Unknown frame type: " << type;
} }
buffer_->reset(); }
else
{
break;
} }
} }
} }
@ -194,8 +198,7 @@ void QAMQP::Network::initSocket( bool ssl /*= false*/ )
socket_ = new QSslSocket(this); socket_ = new QSslSocket(this);
QSslSocket * ssl_= static_cast<QSslSocket*> (socket_.data()); QSslSocket * ssl_= static_cast<QSslSocket*> (socket_.data());
ssl_->setProtocol(QSsl::AnyProtocol); ssl_->setProtocol(QSsl::AnyProtocol);
connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)), connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors()));
this, SLOT(sslErrors()));
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady())); connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
#else #else
@ -251,3 +254,23 @@ QAbstractSocket::SocketState QAMQP::Network::state() const
} }
} }
void QAMQP::Network::setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection)
{
m_pMethodHandlerConnection = pMethodHandlerConnection;
}
void QAMQP::Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler)
{
m_methodHandlersByChannel[channel].append(pHandler);
}
void QAMQP::Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler)
{
m_contentHandlerByChannel[channel].append(pHandler);
}
void QAMQP::Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler)
{
m_bodyHandlersByChannel[channel].append(pHandler);
}

View File

@ -18,6 +18,8 @@ namespace QAMQP
Q_OBJECT Q_OBJECT
Q_DISABLE_COPY(Network) Q_DISABLE_COPY(Network)
public: public:
typedef qint16 Channel;
Network(QObject * parent = 0); Network(QObject * parent = 0);
~Network(); ~Network();
@ -34,6 +36,11 @@ namespace QAMQP
QAbstractSocket::SocketState state() const; QAbstractSocket::SocketState state() const;
void setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection);
void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler);
void addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler);
void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler);
public slots: public slots:
void connectTo(const QString & host = QString(), quint32 port = 0); void connectTo(const QString & host = QString(), quint32 port = 0);
void error( QAbstractSocket::SocketError socketError ); void error( QAbstractSocket::SocketError socketError );
@ -41,9 +48,6 @@ namespace QAMQP
signals: signals:
void connected(); void connected();
void disconnected(); void disconnected();
void method(const QAMQP::Frame::Method & method);
void content(const QAMQP::Frame::Content & content);
void body(int channeNumber, const QByteArray & body);
private slots: private slots:
void readyRead(); void readyRead();
@ -56,16 +60,18 @@ namespace QAMQP
private: private:
void initSocket(bool ssl = false); void initSocket(bool ssl = false);
QPointer<QTcpSocket> socket_; QPointer<QTcpSocket> socket_;
QPointer<QBuffer> buffer_; QByteArray buffer_;
QString lastHost_; QString lastHost_;
int lastPort_; int lastPort_;
int offsetBuf;
int leftSize;
qint8 lastType_;
bool autoReconnect_; bool autoReconnect_;
int timeOut_; int timeOut_;
bool connect_; bool connect_;
Frame::MethodHandler* m_pMethodHandlerConnection;
QHash<Channel, QList<Frame::MethodHandler*> > m_methodHandlersByChannel;
QHash<Channel, QList<Frame::ContentHandler*> > m_contentHandlerByChannel;
QHash<Channel, QList<Frame::ContentBodyHandler*> > m_bodyHandlersByChannel;
}; };
} }
#endif // amqp_network_h__ #endif // amqp_network_h__

View File

@ -54,10 +54,10 @@ void Queue::onOpen()
} }
if(!d->delayedBindings.isEmpty()) if(!d->delayedBindings.isEmpty())
{ {
QMap<QString, QString>::iterator i; typedef QPair<QString, QString> BindingPair;
for(i = d->delayedBindings.begin(); i!= d->delayedBindings.end(); ++i ) foreach(BindingPair binding, d->delayedBindings)
{ {
d->bind(i.value(), i.key()); d->bind(binding.first, binding.second);
} }
d->delayedBindings.clear(); d->delayedBindings.clear();
} }
@ -129,6 +129,15 @@ void Queue::unbind( Exchange * exchange, const QString & key )
pd_func()->unbind(exchange->name(), key); pd_func()->unbind(exchange->name(), key);
} }
void Queue::_q_content(const Content &frame)
{
pd_func()->_q_content(frame);
}
void Queue::_q_body(const ContentBody &frame)
{
pd_func()->_q_body(frame);
}
QAMQP::MessagePtr Queue::getMessage() QAMQP::MessagePtr Queue::getMessage()
{ {
@ -365,7 +374,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key )
{ {
if(!opened) if(!opened)
{ {
delayedBindings[exchangeName] = key; delayedBindings.append(QPair<QString,QString>(exchangeName, key));
return; return;
} }
@ -530,6 +539,7 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame ) void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
{ {
Q_ASSERT(frame.channel() == number);
if(frame.channel() != number) if(frame.channel() != number)
return; return;
if(messages_.isEmpty()) if(messages_.isEmpty())
@ -537,7 +547,7 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
qErrnoWarning("Received content-header without method frame before"); qErrnoWarning("Received content-header without method frame before");
return; return;
} }
MessagePtr &message = messages_.head(); MessagePtr &message = messages_.last();
message->leftSize = frame.bodySize(); message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator i; QHash<int, QVariant>::ConstIterator i;
for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i) for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i)
@ -546,9 +556,10 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
} }
} }
void QueuePrivate::_q_body( int channeNumber, const QByteArray & body ) void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody & frame)
{ {
if(channeNumber!= number) Q_ASSERT(frame.channel() == number);
if(frame.channel() != number)
return; return;
if(messages_.isEmpty()) if(messages_.isEmpty())
@ -556,12 +567,12 @@ void QueuePrivate::_q_body( int channeNumber, const QByteArray & body )
qErrnoWarning("Received content-body without method frame before"); qErrnoWarning("Received content-body without method frame before");
return; return;
} }
MessagePtr &message = messages_.head(); MessagePtr &message = messages_.last();
message->payload.append(body); message->payload.append(frame.body());
message->leftSize -= body.size(); message->leftSize -= frame.body().size();
if(message->leftSize == 0 && messages_.size() == 1) if(message->leftSize == 0 && messages_.size() == 1)
{ {
QMetaObject::invokeMethod(pq_func(), "messageReceived"); emit pq_func()->messageReceived(pq_func());
} }
} }

View File

@ -10,7 +10,7 @@ namespace QAMQP
class ClientPrivate; class ClientPrivate;
class Exchange; class Exchange;
class QueuePrivate; class QueuePrivate;
class Queue : public Channel class Queue : public Channel, public Frame::ContentHandler, public Frame::ContentBodyHandler
{ {
Q_OBJECT Q_OBJECT
Queue(int channelNumber = -1, Client * parent = 0); Queue(int channelNumber = -1, Client * parent = 0);
@ -77,12 +77,12 @@ namespace QAMQP
void declared(); void declared();
void binded(bool); void binded(bool);
void removed(); void removed();
void messageReceived(); void messageReceived(QAMQP::Queue* pQueue);
void empty(); void empty();
private: private:
Q_PRIVATE_SLOT(pd_func(), void _q_content(const QAMQP::Frame::Content & frame)) void _q_content(const QAMQP::Frame::Content & frame);
Q_PRIVATE_SLOT(pd_func(), void _q_body(int channeNumber, const QByteArray & body)) void _q_body(const QAMQP::Frame::ContentBody & frame);
}; };
} }
#ifdef QAMQP_P_INCLUDE #ifdef QAMQP_P_INCLUDE

View File

@ -58,13 +58,13 @@ namespace QAMQP
bool noAck; bool noAck;
QString consumerTag; QString consumerTag;
QMap<QString, QString> delayedBindings; QQueue<QPair<QString, QString> > delayedBindings;
QQueue<QAMQP::MessagePtr> messages_; QQueue<QAMQP::MessagePtr> messages_;
bool recievingMessage; bool recievingMessage;
void _q_content(const QAMQP::Frame::Content & frame); void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(int channeNumber, const QByteArray & body); void _q_body(const QAMQP::Frame::ContentBody & frame);
}; };
} }
#endif // amqp_queue_p_h__ #endif // amqp_queue_p_h__