Fizes#24. Manually merge with fixes maked by @qnective

This commit is contained in:
Alexey Shcherbakov 2013-04-02 20:24:20 +06:00
parent e3327dbf6d
commit 1958be950d
14 changed files with 183 additions and 99 deletions

View File

@ -49,7 +49,8 @@ ClientPrivate::~ClientPrivate()
void ClientPrivate::init(QObject * parent)
{
pq_func()->setParent(parent);
if(!network_){
if(!network_)
{
network_ = new QAMQP::Network(pq_func());
}
@ -58,10 +59,9 @@ void ClientPrivate::init(QObject * parent)
connection_ = new QAMQP::Connection(pq_func());
}
setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
network_->setMethodHandlerConnection(connection_);
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
QObject::connect(connection_, SIGNAL(connected()), pq_func(), SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), pq_func(), SIGNAL(disconnected()));
@ -134,32 +134,30 @@ void ClientPrivate::login()
Exchange * ClientPrivate::createExchange(int channelNumber, const QString &name )
{
Exchange * exchange_ = new Exchange(channelNumber, pq_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
exchange_, SLOT(_q_method(const QAMQP::Frame::Method &)));
network_->addMethodHandlerForChannel(exchange_->channelNumber(), exchange_);
QObject::connect(connection_, SIGNAL(connected()), exchange_, SLOT(_q_open()));
exchange_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), exchange_, SLOT(_q_disconnected()));
exchange_->setName(name);
return exchange_;
}
Queue * ClientPrivate::createQueue(int channelNumber, const QString &name )
{
Queue * queue_ = new Queue(channelNumber, pq_func());
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
queue_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(network_, SIGNAL(content(const QAMQP::Frame::Content &)),
queue_, SLOT(_q_content(const QAMQP::Frame::Content &)));
QObject::connect(network_, SIGNAL(body(int, const QByteArray &)),
queue_, SLOT(_q_body(int, const QByteArray &)));
network_->addMethodHandlerForChannel(queue_->channelNumber(), queue_);
network_->addContentHandlerForChannel(queue_->channelNumber(), queue_);
network_->addContentBodyHandlerForChannel(queue_->channelNumber(), queue_);
QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open()));
queue_->pd_func()->open();
QObject::connect(pq_func(), SIGNAL(disconnected()), queue_, SLOT(_q_disconnected()));
queue_->setName(name);
return queue_;
}

View File

@ -109,6 +109,11 @@ void QAMQP::Channel::stateChanged( int state )
}
}
void QAMQP::Channel::_q_method(const Frame::Method &frame)
{
pd_func()->_q_method(frame);
}
bool QAMQP::Channel::isOpened() const
{
return pd_func()->opened;
@ -156,6 +161,7 @@ void ChannelPrivate::init(int channelNumber, Client * parent)
bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
Q_ASSERT(frame.channel() == number);
if(frame.channel() != number )
return true;

View File

@ -9,7 +9,7 @@ namespace QAMQP
{
class ChannelPrivate;
class Client;
class Channel : public QObject
class Channel : public QObject, public Frame::MethodHandler
{
Q_OBJECT
@ -47,8 +47,9 @@ namespace QAMQP
private:
void stateChanged(int state);
friend class ClientPrivate;
void _q_method(const QAMQP::Frame::Method & frame);
Q_PRIVATE_SLOT(pd_func(), void _q_open())
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_disconnected())
};
}

View File

@ -260,6 +260,7 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
Q_ASSERT(frame.methodClass() == QAMQP::Frame::fcConnection);
if(frame.methodClass() != QAMQP::Frame::fcConnection)
return true;
@ -364,6 +365,11 @@ void Connection::openOk()
emit connected();
}
void Connection::_q_method(const QAMQP::Frame::Method & frame)
{
pd_func()->_q_method(frame);
}
bool Connection::isConnected() const
{
return pd_func()->connected;

View File

@ -11,7 +11,7 @@ namespace QAMQP
class ChannelPrivate;
class ClientPrivate;
class Client;
class Connection : public QObject
class Connection : public QObject, public Frame::MethodHandler
{
Q_OBJECT
P_DECLARE_PRIVATE(QAMQP::Connection)
@ -45,8 +45,9 @@ namespace QAMQP
void openOk();
friend class ClientPrivate;
friend class ChannelPrivate;
Q_PRIVATE_SLOT(pd_func(), void _q_method(const QAMQP::Frame::Method & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat())
void _q_method(const QAMQP::Frame::Method & frame);
Q_PRIVATE_SLOT(pd_func(), void _q_heartbeat());
};
}

View File

@ -1,5 +1,4 @@
#include "amqp_frame.h"
#define AMQP_FRAME_END 0xCE
#include <QDateTime>
#include <QList>
@ -47,7 +46,7 @@ void QAMQP::Frame::Base::writeHeader( QDataStream & stream ) const
void QAMQP::Frame::Base::writeEnd( QDataStream & stream ) const
{
stream << qint8(AMQP_FRAME_END);
stream << qint8(FRAME_END);
}
void QAMQP::Frame::Base::writePayload( QDataStream & ) const{}
@ -64,7 +63,7 @@ void QAMQP::Frame::Base::readEnd( QDataStream & stream )
{
unsigned char end_ = 0;
stream.readRawData(reinterpret_cast<char*>(&end_), sizeof(end_));
if(end_ != AMQP_FRAME_END )
if(end_ != FRAME_END )
{
qWarning("Wrong end of frame");
}
@ -711,6 +710,6 @@ qint32 QAMQP::Frame::ContentBody::size() const
QAMQP::Frame::Heartbeat::Heartbeat() : Base(ftHeartbeat) {}
void QAMQP::Frame::Heartbeat::readPayload(QDataStream & stream) {}
void QAMQP::Frame::Heartbeat::writePayload(QDataStream & stream) const {}
void QAMQP::Frame::Heartbeat::readPayload(QDataStream & ) {}
void QAMQP::Frame::Heartbeat::writePayload(QDataStream & ) const {}

View File

@ -33,6 +33,21 @@ namespace QAMQP
*/
namespace Frame
{
typedef quint16 channel_t;
/*!
@brief Header size in bytes
*/
static const qint64 HEADER_SIZE = 7;
/*!
@brief Frame end indicator size in bytes
*/
static const qint64 FRAME_END_SIZE = 1;
/*!
@brief Frame end marker
*/
static const quint8 FRAME_END = 0xCE;
/*!
@brief Frame type
*/
@ -379,6 +394,24 @@ namespace QAMQP
void writePayload(QDataStream & stream) const;
void readPayload(QDataStream & stream);
};
class MethodHandler
{
public:
virtual void _q_method(const QAMQP::Frame::Method & frame) = 0;
};
class ContentHandler
{
public:
virtual void _q_content(const QAMQP::Frame::Content & frame) = 0;
};
class ContentBodyHandler
{
public:
virtual void _q_body(const QAMQP::Frame::ContentBody & frame) = 0;
};
}
}

View File

@ -13,7 +13,7 @@
#define AMQPPSWD "guest"
#define FRAME_MAX 131072
#define QAMQP_VERSION "0.1.2"
#define QAMQP_VERSION "0.2.0"

View File

@ -1,20 +1,16 @@
#include "amqp_network.h"
#include <QDebug>
#include <QTimer>
#include <QtEndian>
QAMQP::Network::Network( QObject * parent /*= 0*/ ):QObject(parent)
{
qRegisterMetaType<QAMQP::Frame::Method>("QAMQP::Frame::Method");
buffer_ = new QBuffer(this);
offsetBuf = 0;
leftSize = 0;
buffer_.reserve(Frame::HEADER_SIZE);
timeOut_ = 1000;
connect_ = false;
buffer_->open(QIODevice::ReadWrite);
initSocket(false);
}
@ -97,54 +93,59 @@ void QAMQP::Network::error( QAbstractSocket::SocketError socketError )
void QAMQP::Network::readyRead()
{
QDataStream streamA(socket_);
QDataStream streamB(buffer_);
while(!socket_->atEnd())
while(socket_->bytesAvailable() >= Frame::HEADER_SIZE)
{
if(leftSize == 0)
char* headerData = buffer_.data();
socket_->peek(headerData, Frame::HEADER_SIZE);
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE+payloadSize+Frame::FRAME_END_SIZE;
if(socket_->bytesAvailable() >= readSize)
{
lastType_ = 0;
qint16 channel_ = 0;
leftSize = 0;
offsetBuf = 0;
streamA >> lastType_;
streamB << lastType_;
streamA >> channel_;
streamB << channel_;
streamA >> leftSize;
streamB << leftSize;
leftSize++;
buffer_.resize(readSize);
socket_->read(buffer_.data(), readSize);
const char* bufferData = buffer_.constData();
const quint8 type = *(quint8*)&bufferData[0];
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE+payloadSize];
if(magic != QAMQP::Frame::FRAME_END)
{
qWarning() << "Wrong end frame";
}
QByteArray data_;
data_.resize(leftSize);
offsetBuf = streamA.readRawData(data_.data(), data_.size());
leftSize -= offsetBuf;
streamB.writeRawData(data_.data(), offsetBuf);
if(leftSize == 0)
{
buffer_->reset();
switch(QAMQP::Frame::Type(lastType_))
QDataStream streamB(&buffer_, QIODevice::ReadOnly);
switch(QAMQP::Frame::Type(type))
{
case QAMQP::Frame::ftMethod:
{
QAMQP::Frame::Method frame(streamB);
emit method(frame);
if(frame.methodClass() == QAMQP::Frame::fcConnection)
{
m_pMethodHandlerConnection->_q_method(frame);
}
else
{
foreach(Frame::MethodHandler* pMethodHandler, m_methodHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_method(frame);
}
}
}
break;
case QAMQP::Frame::ftHeader:
{
QAMQP::Frame::Content frame(streamB);
emit content(frame);
foreach(Frame::ContentHandler* pMethodHandler, m_contentHandlerByChannel[frame.channel()])
{
pMethodHandler->_q_content(frame);
}
}
break;
case QAMQP::Frame::ftBody:
{
QAMQP::Frame::ContentBody frame(streamB);
emit body(frame.channel(), frame.body());
foreach(Frame::ContentBodyHandler* pMethodHandler, m_bodyHandlersByChannel[frame.channel()])
{
pMethodHandler->_q_body(frame);
}
}
break;
case QAMQP::Frame::ftHeartbeat:
@ -153,9 +154,12 @@ void QAMQP::Network::readyRead()
}
break;
default:
qWarning("AMQP: Unknown frame type");
qWarning() << "AMQP: Unknown frame type: " << type;
}
buffer_->reset();
}
else
{
break;
}
}
}
@ -194,8 +198,7 @@ void QAMQP::Network::initSocket( bool ssl /*= false*/ )
socket_ = new QSslSocket(this);
QSslSocket * ssl_= static_cast<QSslSocket*> (socket_.data());
ssl_->setProtocol(QSsl::AnyProtocol);
connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)),
this, SLOT(sslErrors()));
connect(socket_, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors()));
connect(socket_, SIGNAL(connected()), this, SLOT(conectionReady()));
#else
@ -251,3 +254,23 @@ QAbstractSocket::SocketState QAMQP::Network::state() const
}
}
void QAMQP::Network::setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection)
{
m_pMethodHandlerConnection = pMethodHandlerConnection;
}
void QAMQP::Network::addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler)
{
m_methodHandlersByChannel[channel].append(pHandler);
}
void QAMQP::Network::addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler)
{
m_contentHandlerByChannel[channel].append(pHandler);
}
void QAMQP::Network::addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler)
{
m_bodyHandlersByChannel[channel].append(pHandler);
}

View File

@ -18,6 +18,8 @@ namespace QAMQP
Q_OBJECT
Q_DISABLE_COPY(Network)
public:
typedef qint16 Channel;
Network(QObject * parent = 0);
~Network();
@ -34,6 +36,11 @@ namespace QAMQP
QAbstractSocket::SocketState state() const;
void setMethodHandlerConnection(Frame::MethodHandler* pMethodHandlerConnection);
void addMethodHandlerForChannel(Channel channel, Frame::MethodHandler* pHandler);
void addContentHandlerForChannel(Channel channel, Frame::ContentHandler* pHandler);
void addContentBodyHandlerForChannel(Channel channel, Frame::ContentBodyHandler* pHandler);
public slots:
void connectTo(const QString & host = QString(), quint32 port = 0);
void error( QAbstractSocket::SocketError socketError );
@ -41,9 +48,6 @@ namespace QAMQP
signals:
void connected();
void disconnected();
void method(const QAMQP::Frame::Method & method);
void content(const QAMQP::Frame::Content & content);
void body(int channeNumber, const QByteArray & body);
private slots:
void readyRead();
@ -56,16 +60,18 @@ namespace QAMQP
private:
void initSocket(bool ssl = false);
QPointer<QTcpSocket> socket_;
QPointer<QBuffer> buffer_;
QByteArray buffer_;
QString lastHost_;
int lastPort_;
int offsetBuf;
int leftSize;
qint8 lastType_;
bool autoReconnect_;
int timeOut_;
bool connect_;
Frame::MethodHandler* m_pMethodHandlerConnection;
QHash<Channel, QList<Frame::MethodHandler*> > m_methodHandlersByChannel;
QHash<Channel, QList<Frame::ContentHandler*> > m_contentHandlerByChannel;
QHash<Channel, QList<Frame::ContentBodyHandler*> > m_bodyHandlersByChannel;
};
}
#endif // amqp_network_h__

View File

@ -54,10 +54,10 @@ void Queue::onOpen()
}
if(!d->delayedBindings.isEmpty())
{
QMap<QString, QString>::iterator i;
for(i = d->delayedBindings.begin(); i!= d->delayedBindings.end(); ++i )
typedef QPair<QString, QString> BindingPair;
foreach(BindingPair binding, d->delayedBindings)
{
d->bind(i.value(), i.key());
d->bind(binding.first, binding.second);
}
d->delayedBindings.clear();
}
@ -129,6 +129,15 @@ void Queue::unbind( Exchange * exchange, const QString & key )
pd_func()->unbind(exchange->name(), key);
}
void Queue::_q_content(const Content &frame)
{
pd_func()->_q_content(frame);
}
void Queue::_q_body(const ContentBody &frame)
{
pd_func()->_q_body(frame);
}
QAMQP::MessagePtr Queue::getMessage()
{
@ -365,7 +374,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key )
{
if(!opened)
{
delayedBindings[exchangeName] = key;
delayedBindings.append(QPair<QString,QString>(exchangeName, key));
return;
}
@ -530,6 +539,7 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
{
Q_ASSERT(frame.channel() == number);
if(frame.channel() != number)
return;
if(messages_.isEmpty())
@ -537,7 +547,7 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = messages_.head();
MessagePtr &message = messages_.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator i;
for (i = frame.properties_.begin(); i != frame.properties_.end(); ++i)
@ -546,9 +556,10 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
}
}
void QueuePrivate::_q_body( int channeNumber, const QByteArray & body )
void QueuePrivate::_q_body(const QAMQP::Frame::ContentBody & frame)
{
if(channeNumber!= number)
Q_ASSERT(frame.channel() == number);
if(frame.channel() != number)
return;
if(messages_.isEmpty())
@ -556,12 +567,12 @@ void QueuePrivate::_q_body( int channeNumber, const QByteArray & body )
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = messages_.head();
message->payload.append(body);
message->leftSize -= body.size();
MessagePtr &message = messages_.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if(message->leftSize == 0 && messages_.size() == 1)
{
QMetaObject::invokeMethod(pq_func(), "messageReceived");
emit pq_func()->messageReceived(pq_func());
}
}

View File

@ -10,7 +10,7 @@ namespace QAMQP
class ClientPrivate;
class Exchange;
class QueuePrivate;
class Queue : public Channel
class Queue : public Channel, public Frame::ContentHandler, public Frame::ContentBodyHandler
{
Q_OBJECT
Queue(int channelNumber = -1, Client * parent = 0);
@ -77,12 +77,12 @@ namespace QAMQP
void declared();
void binded(bool);
void removed();
void messageReceived();
void messageReceived(QAMQP::Queue* pQueue);
void empty();
private:
Q_PRIVATE_SLOT(pd_func(), void _q_content(const QAMQP::Frame::Content & frame))
Q_PRIVATE_SLOT(pd_func(), void _q_body(int channeNumber, const QByteArray & body))
void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(const QAMQP::Frame::ContentBody & frame);
};
}
#ifdef QAMQP_P_INCLUDE

View File

@ -58,13 +58,13 @@ namespace QAMQP
bool noAck;
QString consumerTag;
QMap<QString, QString> delayedBindings;
QQueue<QPair<QString, QString> > delayedBindings;
QQueue<QAMQP::MessagePtr> messages_;
bool recievingMessage;
void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(int channeNumber, const QByteArray & body);
void _q_body(const QAMQP::Frame::ContentBody & frame);
};
}
#endif // amqp_queue_p_h__