diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index 1fc8c15..a703eb6 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -76,6 +76,7 @@ bool ChannelPrivate::_q_method(const Frame::Method &frame) closeOk(frame); break; } + return true; } @@ -257,12 +258,6 @@ void Channel::setName(const QString &name) d->name = name; } -void Channel::_q_method(const Frame::Method &frame) -{ - Q_D(Channel); - d->_q_method(frame); -} - bool Channel::isOpened() const { Q_D(const Channel); diff --git a/src/amqp_channel.h b/src/amqp_channel.h index 3fed4f8..aa677f7 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -9,7 +9,7 @@ namespace QAMQP class Client; class ChannelPrivate; -class QAMQP_EXPORT Channel : public QObject, public Frame::MethodHandler +class QAMQP_EXPORT Channel : public QObject { Q_OBJECT Q_PROPERTY(int number READ channelNumber CONSTANT) @@ -49,9 +49,6 @@ protected: Q_PRIVATE_SLOT(d_func(), void _q_open()) Q_PRIVATE_SLOT(d_func(), void _q_disconnected()) - // method handling - void _q_method(const Frame::Method &frame); - friend class ClientPrivate; }; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index a343a57..edcac1a 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -2,15 +2,17 @@ #define amqp_channel_p_h__ #include +#include "amqp_frame.h" #define METHOD_ID_ENUM(name, id) name = id, name ## Ok namespace QAMQP { + class Client; class Network; class ClientPrivate; -class ChannelPrivate +class ChannelPrivate : public Frame::MethodHandler { public: enum MethodId { @@ -47,27 +49,27 @@ public: void init(int channelNumber, Client *client); void stateChanged(State state); + void setQOS(qint32 prefetchSize, quint16 prefetchCount); + void sendFrame(const Frame::Base &frame); + void open(); void flow(); void flowOk(); void close(int code, const QString &text, int classId, int methodId); void closeOk(); - ////////////////////////////////////////////////////////////////////////// - + // reimp MethodHandler + virtual bool _q_method(const Frame::Method &frame); void openOk(const Frame::Method &frame); void flow(const Frame::Method &frame); void flowOk(const Frame::Method &frame); void close(const Frame::Method &frame); void closeOk(const Frame::Method &frame); - virtual bool _q_method(const Frame::Method &frame); + // private slots virtual void _q_disconnected(); void _q_open(); - void setQOS(qint32 prefetchSize, quint16 prefetchCount); - void sendFrame(const Frame::Base &frame); - QPointer client; QString name; int number; diff --git a/src/amqp_client.cpp b/src/amqp_client.cpp index 2e3a4ca..2a73690 100644 --- a/src/amqp_client.cpp +++ b/src/amqp_client.cpp @@ -198,17 +198,17 @@ void ClientPrivate::sendFrame(const Frame::Base &frame) frame.toStream(stream); } -void ClientPrivate::_q_method(const Frame::Method &frame) +bool ClientPrivate::_q_method(const Frame::Method &frame) { Q_ASSERT(frame.methodClass() == Frame::fcConnection); if (frame.methodClass() != Frame::fcConnection) - return; + return false; qDebug() << "Connection:"; if (closed) { if (frame.id() == ClientPrivate::miCloseOk) closeOk(frame); - return; + return false; } switch (ClientPrivate::MethodId(frame.id())) { @@ -233,6 +233,8 @@ void ClientPrivate::_q_method(const Frame::Method &frame) default: qWarning("Unknown method-id %d", frame.id()); } + + return true; } void ClientPrivate::start(const Frame::Method &frame) @@ -534,7 +536,7 @@ Exchange *Client::createExchange(const QString &name, int channelNumber) { Q_D(Client); Exchange *exchange = new Exchange(channelNumber, this); - d->methodHandlersByChannel[exchange->channelNumber()].append(exchange); + d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func()); connect(this, SIGNAL(connected()), exchange, SLOT(_q_open())); exchange->d_func()->open(); connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected())); @@ -552,9 +554,9 @@ Queue *Client::createQueue(const QString &name, int channelNumber) { Q_D(Client); Queue *queue = new Queue(channelNumber, this); - d->methodHandlersByChannel[queue->channelNumber()].append(queue); - d->contentHandlerByChannel[queue->channelNumber()].append(queue); - d->bodyHandlersByChannel[queue->channelNumber()].append(queue); + d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func()); + d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func()); + d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func()); connect(this, SIGNAL(connected()), queue, SLOT(_q_open())); queue->d_func()->open(); connect(this, SIGNAL(disconnected()), queue, SLOT(_q_disconnected())); diff --git a/src/amqp_client_p.h b/src/amqp_client_p.h index 38359b2..5bf4ba2 100644 --- a/src/amqp_client_p.h +++ b/src/amqp_client_p.h @@ -47,7 +47,7 @@ public: void _q_socketError(QAbstractSocket::SocketError error); void _q_heartbeat(); - virtual void _q_method(const Frame::Method &frame); + virtual bool _q_method(const Frame::Method &frame); // method handlers, FROM server void start(const Frame::Method &frame); diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index 9d25862..1b67e9e 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -37,11 +37,9 @@ public: Q_DECLARE_FLAGS(ExchangeOptions, ExchangeOption) typedef QHash MessageProperties; - - virtual ~Exchange(); - ExchangeOptions option() const; + virtual ~Exchange(); void declare(ExchangeType type = Direct, ExchangeOptions options = NoOptions, const Frame::TableField &args = Frame::TableField()); diff --git a/src/amqp_frame.h b/src/amqp_frame.h index 833abc9..c40fc0a 100644 --- a/src/amqp_frame.h +++ b/src/amqp_frame.h @@ -29,7 +29,7 @@ namespace QAMQP { -class Queue; +class QueuePrivate; namespace Frame { typedef quint16 channel_t; @@ -358,7 +358,8 @@ namespace Frame qlonglong bodySize_; private: - friend class QAMQP::Queue; + friend class QAMQP::QueuePrivate; + }; class QAMQP_EXPORT ContentBody : public Base @@ -400,7 +401,7 @@ namespace Frame class QAMQP_EXPORT MethodHandler { public: - virtual void _q_method(const Frame::Method &frame) = 0; + virtual bool _q_method(const Frame::Method &frame) = 0; }; class QAMQP_EXPORT ContentHandler diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 0fa75ae..0938f4c 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -9,6 +9,215 @@ using namespace QAMQP; #include #include +QueuePrivate::QueuePrivate(Queue *q) + : ChannelPrivate(q), + delayedDeclare(false), + declared(false), + noAck(true), + recievingMessage(false) +{ +} + +QueuePrivate::~QueuePrivate() +{ +} + +bool QueuePrivate::_q_method(const Frame::Method &frame) +{ + Q_Q(Queue); + if (ChannelPrivate::_q_method(frame)) + return true; + + if (frame.methodClass() == Frame::fcQueue) { + switch (frame.id()) { + case miDeclareOk: + declareOk(frame); + break; + case miDelete: + deleteOk(frame); + break; + case miBindOk: + bindOk(frame); + break; + case miUnbindOk: + unbindOk(frame); + break; + case miPurgeOk: + deleteOk(frame); + break; + default: + break; + } + + return true; + } + + if (frame.methodClass() == Frame::fcBasic) { + switch(frame.id()) { + case bmConsumeOk: + consumeOk(frame); + break; + case bmDeliver: + deliver(frame); + break; + case bmGetOk: + getOk(frame); + break; + case bmGetEmpty: + Q_EMIT q->empty(); + break; + default: + break; + } + return true; + } + + return false; +} + +void QueuePrivate::_q_content(const Frame::Content &frame) +{ + Q_ASSERT(frame.channel() == number); + if (frame.channel() != number) + return; + + if (messages.isEmpty()) { + qErrnoWarning("Received content-header without method frame before"); + return; + } + + MessagePtr &message = messages.last(); + message->leftSize = frame.bodySize(); + QHash::ConstIterator it; + QHash::ConstIterator itEnd = frame.properties_.constEnd(); + for (it = frame.properties_.constBegin(); it != itEnd; ++it) + message->property[Message::MessageProperty(it.key())] = it.value(); +} + +void QueuePrivate::_q_body(const Frame::ContentBody &frame) +{ + Q_Q(Queue); + Q_ASSERT(frame.channel() == number); + if (frame.channel() != number) + return; + + if (messages.isEmpty()) { + qErrnoWarning("Received content-body without method frame before"); + return; + } + + MessagePtr &message = messages.last(); + message->payload.append(frame.body()); + message->leftSize -= frame.body().size(); + + if (message->leftSize == 0 && messages.size() == 1) + Q_EMIT q->messageReceived(q); +} + +void QueuePrivate::declareOk(const Frame::Method &frame) +{ + Q_Q(Queue); + qDebug() << "Declared queue: " << name; + declared = true; + + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + + name = Frame::readField('s', stream).toString(); + qint32 messageCount = 0, consumerCount = 0; + stream >> messageCount >> consumerCount; + qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount); + + Q_EMIT q->declared(); +} + +void QueuePrivate::deleteOk(const Frame::Method &frame) +{ + Q_Q(Queue); + qDebug() << "Deleted or purged queue: " << name; + declared = false; + + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + qint32 messageCount = 0; + stream >> messageCount; + qDebug("Message count %d", messageCount); + + Q_EMIT q->removed(); +} + +void QueuePrivate::bindOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + + Q_Q(Queue); + qDebug() << Q_FUNC_INFO << "bound to queue: " << name; + Q_EMIT q->bound(); +} + +void QueuePrivate::unbindOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + + Q_Q(Queue); + qDebug() << Q_FUNC_INFO << "unbound queue: " << name; + Q_EMIT q->unbound(); +} + +void QueuePrivate::getOk(const Frame::Method &frame) +{ + QByteArray data = frame.arguments(); + QDataStream in(&data, QIODevice::ReadOnly); + + qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); + bool redelivered = Frame::readField('t',in).toBool(); + QString exchangeName = Frame::readField('s',in).toString(); + QString routingKey = Frame::readField('s',in).toString(); + + Q_UNUSED(redelivered) + + MessagePtr newMessage = MessagePtr(new Message); + newMessage->routeKey = routingKey; + newMessage->exchangeName = exchangeName; + newMessage->deliveryTag = deliveryTag; + messages.enqueue(newMessage); +} + +void QueuePrivate::consumeOk(const Frame::Method &frame) +{ + qDebug() << "Consume ok: " << name; + declared = false; + + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + consumerTag = Frame::readField('s',stream).toString(); + qDebug("Consumer tag = %s", qPrintable(consumerTag)); +} + +void QueuePrivate::deliver(const Frame::Method &frame) +{ + QByteArray data = frame.arguments(); + QDataStream in(&data, QIODevice::ReadOnly); + QString consumer_ = Frame::readField('s',in).toString(); + if (consumer_ != consumerTag) + return; + + qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); + bool redelivered = Frame::readField('t',in).toBool(); + QString exchangeName = Frame::readField('s',in).toString(); + QString routingKey = Frame::readField('s',in).toString(); + + Q_UNUSED(redelivered) + + MessagePtr newMessage = MessagePtr(new Message); + newMessage->routeKey = routingKey; + newMessage->exchangeName = exchangeName; + newMessage->deliveryTag = deliveryTag; + messages.enqueue(newMessage); +} + +////////////////////////////////////////////////////////////////////////// + Queue::Queue(int channelNumber, Client *parent) : Channel(new QueuePrivate(this), parent) { @@ -205,46 +414,6 @@ void Queue::unbind(const QString &exchangeName, const QString &key) d->sendFrame(frame); } -void Queue::_q_content(const Frame::Content &frame) -{ - Q_D(Queue); - Q_ASSERT(frame.channel() == d->number); - if (frame.channel() != d->number) - return; - - if (d->messages.isEmpty()) { - qErrnoWarning("Received content-header without method frame before"); - return; - } - - MessagePtr &message = d->messages.last(); - message->leftSize = frame.bodySize(); - QHash::ConstIterator it; - QHash::ConstIterator itEnd = frame.properties_.constEnd(); - for (it = frame.properties_.constBegin(); it != itEnd; ++it) - message->property[Message::MessageProperty(it.key())] = it.value(); -} - -void Queue::_q_body(const Frame::ContentBody &frame) -{ - Q_D(Queue); - Q_ASSERT(frame.channel() == d->number); - if (frame.channel() != d->number) - return; - - if (d->messages.isEmpty()) { - qErrnoWarning("Received content-body without method frame before"); - return; - } - - MessagePtr &message = d->messages.last(); - message->payload.append(frame.body()); - message->leftSize -= frame.body().size(); - - if (message->leftSize == 0 && d->messages.size() == 1) - Q_EMIT messageReceived(this); -} - MessagePtr Queue::getMessage() { Q_D(Queue); @@ -341,173 +510,3 @@ void Queue::ack(const MessagePtr &message) frame.setArguments(arguments); d->sendFrame(frame); } - -////////////////////////////////////////////////////////////////////////// - -QueuePrivate::QueuePrivate(Queue *q) - : ChannelPrivate(q), - delayedDeclare(false), - declared(false), - noAck(true), - recievingMessage(false) -{ -} - -QueuePrivate::~QueuePrivate() -{ -} - -bool QueuePrivate::_q_method(const Frame::Method &frame) -{ - Q_Q(Queue); - if (ChannelPrivate::_q_method(frame)) - return true; - - if (frame.methodClass() == Frame::fcQueue) { - switch (frame.id()) { - case miDeclareOk: - declareOk(frame); - break; - case miDelete: - deleteOk(frame); - break; - case miBindOk: - bindOk(frame); - break; - case miUnbindOk: - unbindOk(frame); - break; - case miPurgeOk: - deleteOk(frame); - break; - default: - break; - } - - return true; - } - - if (frame.methodClass() == Frame::fcBasic) { - switch(frame.id()) { - case bmConsumeOk: - consumeOk(frame); - break; - case bmDeliver: - deliver(frame); - break; - case bmGetOk: - getOk(frame); - break; - case bmGetEmpty: - Q_EMIT q->empty(); - break; - default: - break; - } - return true; - } - - return false; -} - -void QueuePrivate::declareOk(const Frame::Method &frame) -{ - Q_Q(Queue); - qDebug() << "Declared queue: " << name; - declared = true; - - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - - name = Frame::readField('s', stream).toString(); - qint32 messageCount = 0, consumerCount = 0; - stream >> messageCount >> consumerCount; - qDebug("Message count %d\nConsumer count: %d", messageCount, consumerCount); - - Q_EMIT q->declared(); -} - -void QueuePrivate::deleteOk(const Frame::Method &frame) -{ - Q_Q(Queue); - qDebug() << "Deleted or purged queue: " << name; - declared = false; - - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - qint32 messageCount = 0; - stream >> messageCount; - qDebug("Message count %d", messageCount); - - Q_EMIT q->removed(); -} - -void QueuePrivate::bindOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - - Q_Q(Queue); - qDebug() << Q_FUNC_INFO << "bound to queue: " << name; - Q_EMIT q->bound(); -} - -void QueuePrivate::unbindOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - - Q_Q(Queue); - qDebug() << Q_FUNC_INFO << "unbound queue: " << name; - Q_EMIT q->unbound(); -} - -void QueuePrivate::getOk(const Frame::Method &frame) -{ - QByteArray data = frame.arguments(); - QDataStream in(&data, QIODevice::ReadOnly); - - qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); - bool redelivered = Frame::readField('t',in).toBool(); - QString exchangeName = Frame::readField('s',in).toString(); - QString routingKey = Frame::readField('s',in).toString(); - - Q_UNUSED(redelivered) - - MessagePtr newMessage = MessagePtr(new Message); - newMessage->routeKey = routingKey; - newMessage->exchangeName = exchangeName; - newMessage->deliveryTag = deliveryTag; - messages.enqueue(newMessage); -} - -void QueuePrivate::consumeOk(const Frame::Method &frame) -{ - qDebug() << "Consume ok: " << name; - declared = false; - - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - consumerTag = Frame::readField('s',stream).toString(); - qDebug("Consumer tag = %s", qPrintable(consumerTag)); -} - -void QueuePrivate::deliver(const Frame::Method &frame) -{ - QByteArray data = frame.arguments(); - QDataStream in(&data, QIODevice::ReadOnly); - QString consumer_ = Frame::readField('s',in).toString(); - if (consumer_ != consumerTag) - return; - - qlonglong deliveryTag = Frame::readField('L',in).toLongLong(); - bool redelivered = Frame::readField('t',in).toBool(); - QString exchangeName = Frame::readField('s',in).toString(); - QString routingKey = Frame::readField('s',in).toString(); - - Q_UNUSED(redelivered) - - MessagePtr newMessage = MessagePtr(new Message); - newMessage->routeKey = routingKey; - newMessage->exchangeName = exchangeName; - newMessage->deliveryTag = deliveryTag; - messages.enqueue(newMessage); -} diff --git a/src/amqp_queue.h b/src/amqp_queue.h index 0662aeb..a8217d3 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -12,9 +12,7 @@ class Client; class ClientPrivate; class Exchange; class QueuePrivate; -class QAMQP_EXPORT Queue : public Channel, - public Frame::ContentHandler, - public Frame::ContentBodyHandler +class QAMQP_EXPORT Queue : public Channel { Q_OBJECT Q_ENUMS(QueueOptions) @@ -83,9 +81,6 @@ protected: private: Queue(int channelNumber = -1, Client *parent = 0); - void _q_content(const Frame::Content &frame); - void _q_body(const Frame::ContentBody &frame); - Q_DISABLE_COPY(Queue) Q_DECLARE_PRIVATE(Queue) diff --git a/src/amqp_queue_p.h b/src/amqp_queue_p.h index 65d4278..f0e747a 100644 --- a/src/amqp_queue_p.h +++ b/src/amqp_queue_p.h @@ -7,7 +7,9 @@ namespace QAMQP { -class QueuePrivate: public ChannelPrivate +class QueuePrivate: public ChannelPrivate, + public Frame::ContentHandler, + public Frame::ContentBodyHandler { public: enum MethodId { @@ -23,6 +25,8 @@ public: // method handler related virtual bool _q_method(const Frame::Method &frame); + virtual void _q_content(const Frame::Content &frame); + virtual void _q_body(const Frame::ContentBody &frame); void declareOk(const Frame::Method &frame); void deleteOk(const Frame::Method &frame); void bindOk(const Frame::Method &frame);