This commit is contained in:
fuCtor 2012-02-18 03:40:24 -08:00
parent 958c554497
commit c3150322c1
15 changed files with 292 additions and 51 deletions

View File

@ -123,6 +123,12 @@ Queue * ClientPrivate::createQueue(int channelNumber, const QString &name )
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
queue_, SLOT(_q_method(const QAMQP::Frame::Method &)));
QObject::connect(network_, SIGNAL(content(const QAMQP::Frame::Content &)),
queue_, SLOT(_q_content(const QAMQP::Frame::Content &)));
QObject::connect(network_, SIGNAL(body(int, const QByteArray &)),
queue_, SLOT(_q_body(int, const QByteArray &)));
QObject::connect(connection_, SIGNAL(connected()), queue_, SLOT(_q_open()));
queue_->setName(name);
return queue_;

View File

@ -128,6 +128,11 @@ void QAMQP::Channel::onClose()
{
}
void QAMQP::Channel::setQOS( qint32 prefetchSize, quint16 prefetchCount )
{
d_func()->setQOS(prefetchSize, prefetchCount);
}
//////////////////////////////////////////////////////////////////////////
ChannelPrivate::ChannelPrivate(int version /* = QObjectPrivateVersion */)
@ -290,4 +295,9 @@ void ChannelPrivate::openOk( const QAMQP::Frame::Method & frame )
q->stateChanged(csOpened);
q->onOpen();
}
void ChannelPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount )
{
client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false);
}

View File

@ -14,7 +14,7 @@ namespace QAMQP
Q_OBJECT
Q_PROPERTY(int number READ channelNumber);
Q_PROPERTY(QString name READ name WRITE setName);
Q_PROPERTY(QString name READ name WRITE setName);
Q_DECLARE_PRIVATE(QAMQP::Channel)
Q_DISABLE_COPY(Channel)
@ -29,7 +29,7 @@ namespace QAMQP
void setParam(int param);
void setName(const QString &name);
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
bool isOpened() const;
signals:

View File

@ -64,8 +64,8 @@ namespace QAMQP
virtual void _q_method(const QAMQP::Frame::Method & frame);
void _q_open();
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
void sendFrame(const QAMQP::Frame::Base & frame);
QPointer<Client> client_;

View File

@ -4,6 +4,7 @@
#include "amqp_p.h"
#include "amqp_frame.h"
#include <QCoreApplication>
#include <QDebug>
#include <QDataStream>
@ -216,6 +217,23 @@ void ConnectionPrivate::closeOk( const QAMQP::Frame::Method & )
QMetaObject::invokeMethod(q_func(), "disconnected");
}
void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int channel, bool global )
{
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, 10);
frame.setChannel(channel);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << prefetchSize;
out << prefetchCount;
out << qint8(global ? 1 : 0);
frame.setArguments(arguments_);
client_->d_func()->network_->sendFrame(frame);
}
void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
if(frame.methodClass() != QAMQP::Frame::fcConnection)
@ -332,4 +350,10 @@ void Connection::openOk()
bool Connection::isConnected() const
{
return d_func()->connected;
}
}
void Connection::setQOS( qint32 prefetchSize, quint16 prefetchCount )
{
d_func()->setQOS(prefetchSize, prefetchSize, 0, true);
}

View File

@ -8,6 +8,7 @@
namespace QAMQP
{
class ConnectionPrivate;
class ChannelPrivate;
class ClientPrivate;
class Client;
class Connection : public QObject
@ -28,6 +29,8 @@ namespace QAMQP
bool isConnected() const;
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
Q_SIGNALS:
void disconnected();
void connected();
@ -36,6 +39,7 @@ namespace QAMQP
private:
void openOk();
friend class ClientPrivate;
friend class ChannelPrivate;
Q_PRIVATE_SLOT(d_func(), void _q_method(const QAMQP::Frame::Method & frame))
};
}

View File

@ -39,6 +39,9 @@ namespace QAMQP
void close(const QAMQP::Frame::Method & frame);
void closeOk(const QAMQP::Frame::Method & frame);
void _q_method(const QAMQP::Frame::Method & frame);
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
QPointer<Client> client_;
bool closed_;
bool connected;

View File

@ -143,16 +143,16 @@ void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Declared exchange: " << name;
QMetaObject::invokeMethod(q_func(), "declared");
qDebug() << "Declared exchange: " << name;
declared = true;
QMetaObject::invokeMethod(q_func(), "declared");
}
void ExchangePrivate::deleteOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Deleted exchange: " << name;
QMetaObject::invokeMethod(q_func(), "removed");
qDebug() << "Deleted exchange: " << name;
declared = false;
QMetaObject::invokeMethod(q_func(), "removed");
}
void ExchangePrivate::declare( )
@ -189,7 +189,7 @@ void ExchangePrivate::remove( bool ifUnused /*= true*/, bool noWait /*= true*/ )
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
stream << qint8(0); //reserver 1
stream << qint16(0); //reserver 1
writeField('s', stream, name);
qint8 flag = 0;

View File

@ -185,12 +185,26 @@ QVariant QAMQP::Frame::readField( qint8 valueType, QDataStream &s )
value = QVariant::fromValue<uint>(*reinterpret_cast<quint32*>(octet4));
break;
case 'L':
{
qlonglong v = 0 ;
s >> v;
value = v;
}
/*
s.readRawData(octet8, sizeof(octet8));
value = QVariant::fromValue<qlonglong>(*reinterpret_cast<qlonglong*>(octet8));
value = QVariant::fromValue<qlonglong>(*reinterpret_cast<qlonglong*>(octet8));*/
break;
case 'l':
{
qulonglong v = 0 ;
s >> v;
value = v;
}
/*
s.readRawData(octet8, sizeof(octet8));
value = QVariant::fromValue<qulonglong>(*reinterpret_cast<qulonglong*>(octet8));
value = QVariant::fromValue<qulonglong>(*reinterpret_cast<qulonglong*>(octet8));*/
break;
case 'f':
s.readRawData(octet4, sizeof(octet4));
@ -478,9 +492,9 @@ QAMQP::Frame::Content::Content( MethodClass methodClass ):Base(ftHeader)
methodClass_ = methodClass;
}
QAMQP::Frame::Content::Content( QDataStream& raw ):Base(ftHeader)
QAMQP::Frame::Content::Content( QDataStream& raw ): Base(raw)
{
readPayload(raw);
}
QAMQP::Frame::MethodClass QAMQP::Frame::Content::methodClass() const
@ -544,9 +558,58 @@ void QAMQP::Frame::Content::writePayload( QDataStream & out ) const
void QAMQP::Frame::Content::readPayload( QDataStream & in )
{
in >> methodClass_;
in.skipRawData(2); //weight
in >> bodySize_;
qint16 flags_ = 0;
in >> flags_;
if(flags_ & cpContentType)
properties_[cpContentType] = readField('s', in);
if(flags_ & cpContentEncoding)
properties_[cpContentEncoding] = readField('s', in);
if(flags_ & cpHeaders)
properties_[cpHeaders] = readField('f', in);
if(flags_ & cpDeliveryMode)
properties_[cpDeliveryMode] = readField('b', in);
if(flags_ & cpPriority)
properties_[cpPriority] = readField('b', in);
if(flags_ & cpCorrelationId)
properties_[cpCorrelationId] = readField('s', in);
if(flags_ & cpReplyTo)
properties_[cpReplyTo] = readField('s', in);
if(flags_ & cpExpiration)
properties_[cpExpiration] = readField('s', in);
if(flags_ & cpMessageId)
properties_[cpMessageId] = readField('s', in);
if(flags_ & cpTimestamp)
properties_[cpTimestamp] = readField('T', in);
if(flags_ & cpType)
properties_[cpType] = readField('s', in);
if(flags_ & cpUserId)
properties_[cpUserId] = readField('s', in);
if(flags_ & cpAppId)
properties_[cpAppId] = readField('s', in);
if(flags_ & cpClusterID)
properties_[cpClusterID] = readField('s', in);
}
qlonglong QAMQP::Frame::Content::bodySize() const
{
return body_.isEmpty() ? bodySize_ : body_.size();
}
//////////////////////////////////////////////////////////////////////////
ContentBody::ContentBody() : Base(ftBody)

View File

@ -128,6 +128,7 @@ namespace QAMQP
cpAppId = AMQP_BASIC_APP_ID_FLAG,
cpClusterID = AMQP_BASIC_CLUSTER_ID_FLAG
};
Q_DECLARE_FLAGS(Properties, Property)
Content();
Content(MethodClass methodClass);
@ -140,6 +141,7 @@ namespace QAMQP
void setBody(const QByteArray & data);
QByteArray body() const;
qlonglong bodySize() const;
protected:
void writePayload(QDataStream & stream) const;
@ -149,6 +151,7 @@ namespace QAMQP
QByteArray body_;
mutable QByteArray buffer_;
QHash<int, QVariant> properties_;
qlonglong bodySize_;
};
class ContentBody : public Base

View File

@ -94,6 +94,14 @@ void QAMQP::Network::readyRead()
emit method(frame);
}
break;
case QAMQP::Frame::ftHeader:
{
QAMQP::Frame::Content frame(streamB);
emit content(frame);
}
break;
case QAMQP::Frame::ftBody:
break;
default:
qWarning("Unknown frame type");
}

View File

@ -26,6 +26,8 @@ namespace QAMQP
signals:
void method(const QAMQP::Frame::Method & method);
void content(const QAMQP::Frame::Content & content);
void body(int channeNumber, const QByteArray & body);
private slots:
void connected();

View File

@ -125,14 +125,14 @@ void Queue::get()
}
void Queue::consume()
void Queue::consume(QueueOptions options)
{
d_func()->consume(options);
}
void Queue::setConsumerTag( const QString &consumerTag )
{
d_func()->setConsumerTag(consumerTag);
d_func()->consumerTag = consumerTag;
}
QString Queue::consumerTag() const
@ -147,6 +147,7 @@ QueuePrivate::QueuePrivate()
:ChannelPrivate()
, deleyedDeclare(false)
, declared(false)
, recievingMessage(false)
{
}
@ -160,36 +161,54 @@ QueuePrivate::~QueuePrivate()
void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
{
ChannelPrivate::_q_method(frame);
if(frame.methodClass() != QAMQP::Frame::fcQueue
|| frame.channel() != number )
if(frame.channel() != number)
return;
switch(frame.id())
if(frame.methodClass() == QAMQP::Frame::fcQueue)
{
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
case miBindOk:
bindOk(frame);
break;
case miUnbindOk:
unbindOk(frame);
break;
case miPurgeOk:
deleteOk(frame);
break;
default:
break;
switch(frame.id())
{
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
case miBindOk:
bindOk(frame);
break;
case miUnbindOk:
unbindOk(frame);
break;
case miPurgeOk:
deleteOk(frame);
break;
default:
break;
}
}
if(frame.methodClass() == QAMQP::Frame::fcBasic)
{
switch(frame.id())
{
case bmConsumeOk:
consumeOk(frame);
break;
case bmDeliver:
deliver(frame);
break;
default:
break;
}
}
}
void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Declared queue: " << name;
QMetaObject::invokeMethod(q_func(), "declared");
qDebug() << "Declared queue: " << name;
declared = true;
QByteArray data = frame.arguments();
@ -199,12 +218,13 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
QMetaObject::invokeMethod(q_func(), "declared");
}
void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Deleted or purged queue: " << name;
QMetaObject::invokeMethod(q_func(), "removed");
qDebug() << "Deleted or purged queue: " << name;
declared = false;
QByteArray data = frame.arguments();
@ -212,6 +232,7 @@ void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
qint32 messageCount = 0;
stream >> messageCount;
qDebug("Message count %d", messageCount);
QMetaObject::invokeMethod(q_func(), "removed");
}
@ -255,12 +276,12 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo
if(!declared)
return;
QAMQP::Frame::Method frame(QAMQP::Frame::fcExchange, miDelete);
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miDelete);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint8(0); //reserver 1
out << qint16(0); //reserver 1
writeField('s', out, name);
qint8 flag = 0;
@ -282,13 +303,13 @@ void QueuePrivate::purge()
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miBind);
QAMQP::Frame::Method frame(QAMQP::Frame::fcQueue, miPurge);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
out << qint8(1); // no-wait
out << qint8(0); // no-wait
frame.setArguments(arguments_);
sendFrame(frame);
@ -340,7 +361,74 @@ void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
}
void QueuePrivate::setConsumerTag( const QString &consumerTag )
void QueuePrivate::consume( Queue::QueueOptions options )
{
if(!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmConsume);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
writeField('s', out, consumerTag);
out << qint8(options); // no-wait
writeField('F', out, TableField());
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
{
qDebug() << "Consume ok: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = readField('s',stream).toString();
qDebug("Consumer tag = %s", qPrintable(consumerTag));
}
void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
{
qDebug() << "* Receive message: ";
declared = false;
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = readField('s',in).toString();
if(consumer_ != consumerTag)
{
return;
}
qlonglong deliveryTag = readField('L',in).toLongLong();
bool redelivered = readField('t',in).toBool();
QString exchangeName = readField('s',in).toString();
QString routingKey = readField('s',in).toString();
qDebug() << "| Delivery-tag: " << deliveryTag;
qDebug() << "| Redelivered: " << redelivered;
qDebug("| Exchange-name: %s", qPrintable(exchangeName));
qDebug("| Routing-key: %s", qPrintable(routingKey));
}
void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
{
if(frame.channel() != number)
return;
qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString());
qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString());
}
void QueuePrivate::_q_body( int channeNumber, const QByteArray & body )
{
}

View File

@ -35,6 +35,15 @@ namespace QAMQP
NoWait = 0x10
};
Q_DECLARE_FLAGS(QueueOptions, QueueOption)
enum ConsumeOption {
coNoLocal = 0x1,
coNoAck = 0x02,
coExclusive = 0x04,
coNoWait = 0x8
};
Q_DECLARE_FLAGS(ConsumeOptions, ConsumeOption)
~Queue();
QueueOptions option() const;
@ -52,7 +61,7 @@ namespace QAMQP
void unbind(Exchange * exchange, const QString & key);
void get();
void consume();
void consume(ConsumeOptions options = NoOptions);
void setConsumerTag(const QString &consumerTag);
QString consumerTag() const;
@ -60,6 +69,12 @@ namespace QAMQP
void declared();
void binded(bool);
void removed();
private:
Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame))
Q_PRIVATE_SLOT(d_func(), void _q_body(int channeNumber, const QByteArray & body))
};
}
#ifdef QAMQP_P_INCLUDE
# include "amqp_queue_p.h"
#endif
#endif // amqp_queue_h__

View File

@ -1,3 +1,6 @@
#ifndef amqp_queue_p_h__
#define amqp_queue_p_h__
#include "amqp_channel_p.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
@ -27,13 +30,19 @@ namespace QAMQP
void bind(const QString & exchangeName, const QString & key);
void unbind(const QString & exchangeName, const QString & key);
void setConsumerTag( const QString &consumerTag );
void declareOk(const QAMQP::Frame::Method & frame);
void deleteOk(const QAMQP::Frame::Method & frame);
void bindOk(const QAMQP::Frame::Method & frame);
void unbindOk(const QAMQP::Frame::Method & frame);
/************************************************************************/
/* CLASS BASIC METHODS */
/************************************************************************/
void consume(Queue::ConsumeOptions options);
void consumeOk(const QAMQP::Frame::Method & frame);
void deliver(const QAMQP::Frame::Method & frame);
QString type;
Queue::QueueOptions options;
@ -45,7 +54,13 @@ namespace QAMQP
QString consumerTag;
QMap<QString, QString> delayedBindings;
bool recievingMessage;
void _q_content(const QAMQP::Frame::Content & frame);
void _q_body(int channeNumber, const QByteArray & body);
};
}
}
#endif // amqp_queue_p_h__