move all frame handlers to private classes in an effort to make amqp_frame private

This commit is contained in:
Matt Broadstone 2014-06-04 09:46:15 -04:00
parent 9a4e1f2764
commit 343609d884
10 changed files with 241 additions and 248 deletions

View File

@ -76,6 +76,7 @@ bool ChannelPrivate::_q_method(const Frame::Method &frame)
closeOk(frame);
break;
}
return true;
}
@ -257,12 +258,6 @@ void Channel::setName(const QString &name)
d->name = name;
}
void Channel::_q_method(const Frame::Method &frame)
{
Q_D(Channel);
d->_q_method(frame);
}
bool Channel::isOpened() const
{
Q_D(const Channel);

View File

@ -9,7 +9,7 @@ namespace QAMQP
class Client;
class ChannelPrivate;
class QAMQP_EXPORT Channel : public QObject, public Frame::MethodHandler
class QAMQP_EXPORT Channel : public QObject
{
Q_OBJECT
Q_PROPERTY(int number READ channelNumber CONSTANT)
@ -49,9 +49,6 @@ protected:
Q_PRIVATE_SLOT(d_func(), void _q_open())
Q_PRIVATE_SLOT(d_func(), void _q_disconnected())
// method handling
void _q_method(const Frame::Method &frame);
friend class ClientPrivate;
};

View File

@ -2,15 +2,17 @@
#define amqp_channel_p_h__
#include <QPointer>
#include "amqp_frame.h"
#define METHOD_ID_ENUM(name, id) name = id, name ## Ok
namespace QAMQP
{
class Client;
class Network;
class ClientPrivate;
class ChannelPrivate
class ChannelPrivate : public Frame::MethodHandler
{
public:
enum MethodId {
@ -47,27 +49,27 @@ public:
void init(int channelNumber, Client *client);
void stateChanged(State state);
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
void sendFrame(const Frame::Base &frame);
void open();
void flow();
void flowOk();
void close(int code, const QString &text, int classId, int methodId);
void closeOk();
//////////////////////////////////////////////////////////////////////////
// reimp MethodHandler
virtual bool _q_method(const Frame::Method &frame);
void openOk(const Frame::Method &frame);
void flow(const Frame::Method &frame);
void flowOk(const Frame::Method &frame);
void close(const Frame::Method &frame);
void closeOk(const Frame::Method &frame);
virtual bool _q_method(const Frame::Method &frame);
// private slots
virtual void _q_disconnected();
void _q_open();
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
void sendFrame(const Frame::Base &frame);
QPointer<Client> client;
QString name;
int number;

View File

@ -198,17 +198,17 @@ void ClientPrivate::sendFrame(const Frame::Base &frame)
frame.toStream(stream);
}
void ClientPrivate::_q_method(const Frame::Method &frame)
bool ClientPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.methodClass() == Frame::fcConnection);
if (frame.methodClass() != Frame::fcConnection)
return;
return false;
qDebug() << "Connection:";
if (closed) {
if (frame.id() == ClientPrivate::miCloseOk)
closeOk(frame);
return;
return false;
}
switch (ClientPrivate::MethodId(frame.id())) {
@ -233,6 +233,8 @@ void ClientPrivate::_q_method(const Frame::Method &frame)
default:
qWarning("Unknown method-id %d", frame.id());
}
return true;
}
void ClientPrivate::start(const Frame::Method &frame)
@ -534,7 +536,7 @@ Exchange *Client::createExchange(const QString &name, int channelNumber)
{
Q_D(Client);
Exchange *exchange = new Exchange(channelNumber, this);
d->methodHandlersByChannel[exchange->channelNumber()].append(exchange);
d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func());
connect(this, SIGNAL(connected()), exchange, SLOT(_q_open()));
exchange->d_func()->open();
connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected()));
@ -552,9 +554,9 @@ Queue *Client::createQueue(const QString &name, int channelNumber)
{
Q_D(Client);
Queue *queue = new Queue(channelNumber, this);
d->methodHandlersByChannel[queue->channelNumber()].append(queue);
d->contentHandlerByChannel[queue->channelNumber()].append(queue);
d->bodyHandlersByChannel[queue->channelNumber()].append(queue);
d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func());
d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func());
d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func());
connect(this, SIGNAL(connected()), queue, SLOT(_q_open()));
queue->d_func()->open();
connect(this, SIGNAL(disconnected()), queue, SLOT(_q_disconnected()));

View File

@ -47,7 +47,7 @@ public:
void _q_socketError(QAbstractSocket::SocketError error);
void _q_heartbeat();
virtual void _q_method(const Frame::Method &frame);
virtual bool _q_method(const Frame::Method &frame);
// method handlers, FROM server
void start(const Frame::Method &frame);

View File

@ -37,11 +37,9 @@ public:
Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption)
typedef QHash<Frame::Content::Property, QVariant> MessageProperties;
virtual ~Exchange();
ExchangeOptions option() const;
virtual ~Exchange();
void declare(ExchangeType type = Direct,
ExchangeOptions options = NoOptions,
const Frame::TableField &args = Frame::TableField());

View File

@ -29,7 +29,7 @@
namespace QAMQP
{
class Queue;
class QueuePrivate;
namespace Frame
{
typedef quint16 channel_t;
@ -358,7 +358,8 @@ namespace Frame
qlonglong bodySize_;
private:
friend class QAMQP::Queue;
friend class QAMQP::QueuePrivate;
};
class QAMQP_EXPORT ContentBody : public Base
@ -400,7 +401,7 @@ namespace Frame
class QAMQP_EXPORT MethodHandler
{
public:
virtual void _q_method(const Frame::Method &frame) = 0;
virtual bool _q_method(const Frame::Method &frame) = 0;
};
class QAMQP_EXPORT ContentHandler

View File

@ -9,6 +9,215 @@ using namespace QAMQP;
#include <QDataStream>
#include <QFile>
QueuePrivate::QueuePrivate(Queue *q)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false),
noAck(true),
recievingMessage(false)
{
}
QueuePrivate::~QueuePrivate()
{
}
bool QueuePrivate::_q_method(const Frame::Method &frame)
{
Q_Q(Queue);
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() == Frame::fcQueue) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
case miBindOk:
bindOk(frame);
break;
case miUnbindOk:
unbindOk(frame);
break;
case miPurgeOk:
deleteOk(frame);
break;
default:
break;
}
return true;
}
if (frame.methodClass() == Frame::fcBasic) {
switch(frame.id()) {
case bmConsumeOk:
consumeOk(frame);
break;
case bmDeliver:
deliver(frame);
break;
case bmGetOk:
getOk(frame);
break;
case bmGetEmpty:
Q_EMIT q->empty();
break;
default:
break;
}
return true;
}
return false;
}
void QueuePrivate::_q_content(const Frame::Content &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if (messages.isEmpty()) {
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = messages.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
message->property[Message::MessageProperty(it.key())] = it.value();
}
void QueuePrivate::_q_body(const Frame::ContentBody &frame)
{
Q_Q(Queue);
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return;
if (messages.isEmpty()) {
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = messages.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if (message->leftSize == 0 && messages.size() == 1)
Q_EMIT q->messageReceived(q);
}
void QueuePrivate::declareOk(const Frame::Method &frame)
{
Q_Q(Queue);
qDebug() << "Declared queue: " << name;
declared = true;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
name = Frame::readField('s', stream).toString();
qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
Q_EMIT q->declared();
}
void QueuePrivate::deleteOk(const Frame::Method &frame)
{
Q_Q(Queue);
qDebug() << "Deleted or purged queue: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint32 messageCount = 0;
stream >> messageCount;
qDebug("Message count %d", messageCount);
Q_EMIT q->removed();
}
void QueuePrivate::bindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Queue);
qDebug() << Q_FUNC_INFO << "bound to queue: " << name;
Q_EMIT q->bound();
}
void QueuePrivate::unbindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Queue);
qDebug() << Q_FUNC_INFO << "unbound queue: " << name;
Q_EMIT q->unbound();
}
void QueuePrivate::getOk(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
}
void QueuePrivate::consumeOk(const Frame::Method &frame)
{
qDebug() << "Consume ok: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = Frame::readField('s',stream).toString();
qDebug("Consumer tag = %s", qPrintable(consumerTag));
}
void QueuePrivate::deliver(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = Frame::readField('s',in).toString();
if (consumer_ != consumerTag)
return;
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
}
//////////////////////////////////////////////////////////////////////////
Queue::Queue(int channelNumber, Client *parent)
: Channel(new QueuePrivate(this), parent)
{
@ -205,46 +414,6 @@ void Queue::unbind(const QString &exchangeName, const QString &key)
d->sendFrame(frame);
}
void Queue::_q_content(const Frame::Content &frame)
{
Q_D(Queue);
Q_ASSERT(frame.channel() == d->number);
if (frame.channel() != d->number)
return;
if (d->messages.isEmpty()) {
qErrnoWarning("Received content-header without method frame before");
return;
}
MessagePtr &message = d->messages.last();
message->leftSize = frame.bodySize();
QHash<int, QVariant>::ConstIterator it;
QHash<int, QVariant>::ConstIterator itEnd = frame.properties_.constEnd();
for (it = frame.properties_.constBegin(); it != itEnd; ++it)
message->property[Message::MessageProperty(it.key())] = it.value();
}
void Queue::_q_body(const Frame::ContentBody &frame)
{
Q_D(Queue);
Q_ASSERT(frame.channel() == d->number);
if (frame.channel() != d->number)
return;
if (d->messages.isEmpty()) {
qErrnoWarning("Received content-body without method frame before");
return;
}
MessagePtr &message = d->messages.last();
message->payload.append(frame.body());
message->leftSize -= frame.body().size();
if (message->leftSize == 0 && d->messages.size() == 1)
Q_EMIT messageReceived(this);
}
MessagePtr Queue::getMessage()
{
Q_D(Queue);
@ -341,173 +510,3 @@ void Queue::ack(const MessagePtr &message)
frame.setArguments(arguments);
d->sendFrame(frame);
}
//////////////////////////////////////////////////////////////////////////
QueuePrivate::QueuePrivate(Queue *q)
: ChannelPrivate(q),
delayedDeclare(false),
declared(false),
noAck(true),
recievingMessage(false)
{
}
QueuePrivate::~QueuePrivate()
{
}
bool QueuePrivate::_q_method(const Frame::Method &frame)
{
Q_Q(Queue);
if (ChannelPrivate::_q_method(frame))
return true;
if (frame.methodClass() == Frame::fcQueue) {
switch (frame.id()) {
case miDeclareOk:
declareOk(frame);
break;
case miDelete:
deleteOk(frame);
break;
case miBindOk:
bindOk(frame);
break;
case miUnbindOk:
unbindOk(frame);
break;
case miPurgeOk:
deleteOk(frame);
break;
default:
break;
}
return true;
}
if (frame.methodClass() == Frame::fcBasic) {
switch(frame.id()) {
case bmConsumeOk:
consumeOk(frame);
break;
case bmDeliver:
deliver(frame);
break;
case bmGetOk:
getOk(frame);
break;
case bmGetEmpty:
Q_EMIT q->empty();
break;
default:
break;
}
return true;
}
return false;
}
void QueuePrivate::declareOk(const Frame::Method &frame)
{
Q_Q(Queue);
qDebug() << "Declared queue: " << name;
declared = true;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
name = Frame::readField('s', stream).toString();
qint32 messageCount = 0, consumerCount = 0;
stream >> messageCount >> consumerCount;
qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount);
Q_EMIT q->declared();
}
void QueuePrivate::deleteOk(const Frame::Method &frame)
{
Q_Q(Queue);
qDebug() << "Deleted or purged queue: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint32 messageCount = 0;
stream >> messageCount;
qDebug("Message count %d", messageCount);
Q_EMIT q->removed();
}
void QueuePrivate::bindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Queue);
qDebug() << Q_FUNC_INFO << "bound to queue: " << name;
Q_EMIT q->bound();
}
void QueuePrivate::unbindOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Queue);
qDebug() << Q_FUNC_INFO << "unbound queue: " << name;
Q_EMIT q->unbound();
}
void QueuePrivate::getOk(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
}
void QueuePrivate::consumeOk(const Frame::Method &frame)
{
qDebug() << "Consume ok: " << name;
declared = false;
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
consumerTag = Frame::readField('s',stream).toString();
qDebug("Consumer tag = %s", qPrintable(consumerTag));
}
void QueuePrivate::deliver(const Frame::Method &frame)
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
QString consumer_ = Frame::readField('s',in).toString();
if (consumer_ != consumerTag)
return;
qlonglong deliveryTag = Frame::readField('L',in).toLongLong();
bool redelivered = Frame::readField('t',in).toBool();
QString exchangeName = Frame::readField('s',in).toString();
QString routingKey = Frame::readField('s',in).toString();
Q_UNUSED(redelivered)
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages.enqueue(newMessage);
}

View File

@ -12,9 +12,7 @@ class Client;
class ClientPrivate;
class Exchange;
class QueuePrivate;
class QAMQP_EXPORT Queue : public Channel,
public Frame::ContentHandler,
public Frame::ContentBodyHandler
class QAMQP_EXPORT Queue : public Channel
{
Q_OBJECT
Q_ENUMS(QueueOptions)
@ -83,9 +81,6 @@ protected:
private:
Queue(int channelNumber = -1, Client *parent = 0);
void _q_content(const Frame::Content &frame);
void _q_body(const Frame::ContentBody &frame);
Q_DISABLE_COPY(Queue)
Q_DECLARE_PRIVATE(Queue)

View File

@ -7,7 +7,9 @@
namespace QAMQP
{
class QueuePrivate: public ChannelPrivate
class QueuePrivate: public ChannelPrivate,
public Frame::ContentHandler,
public Frame::ContentBodyHandler
{
public:
enum MethodId {
@ -23,6 +25,8 @@ public:
// method handler related
virtual bool _q_method(const Frame::Method &frame);
virtual void _q_content(const Frame::Content &frame);
virtual void _q_body(const Frame::ContentBody &frame);
void declareOk(const Frame::Method &frame);
void deleteOk(const Frame::Method &frame);
void bindOk(const Frame::Method &frame);