diff --git a/src/amqp_channel.cpp b/src/amqp_channel.cpp index a73bf14..01134b4 100644 --- a/src/amqp_channel.cpp +++ b/src/amqp_channel.cpp @@ -10,14 +10,210 @@ using namespace QAMQP; +int ChannelPrivate::nextChannelNumber = 0; +ChannelPrivate::ChannelPrivate(Channel *q) + : number(0), + opened(false), + needOpen(true), + q_ptr(q) +{ +} + +ChannelPrivate::~ChannelPrivate() +{ +} + +void ChannelPrivate::init(int channelNumber, Client *c) +{ + client = c; + needOpen = channelNumber == -1 ? true : false; + number = channelNumber == -1 ? ++nextChannelNumber : channelNumber; + nextChannelNumber = qMax(channelNumber, (nextChannelNumber + 1)); +} + +void ChannelPrivate::stateChanged(State state) +{ + Q_Q(Channel); + switch(ChannelPrivate::State(state)) { + case ChannelPrivate::csOpened: + Q_EMIT q->opened(); + break; + case ChannelPrivate::csClosed: + Q_EMIT q->closed(); + break; + case ChannelPrivate::csIdle: + Q_EMIT q->flowChanged(false); + break; + case ChannelPrivate::csRunning: + Q_EMIT q->flowChanged(true); + break; + } +} + +bool ChannelPrivate::_q_method(const Frame::Method &frame) +{ + Q_ASSERT(frame.channel() == number); + if (frame.channel() != number) + return true; + + if (frame.methodClass() != Frame::fcChannel) + return false; + + qDebug("Channel#%d:", number); + + switch (frame.id()) { + case miOpenOk: + openOk(frame); + break; + case miFlow: + flow(frame); + break; + case miFlowOk: + flowOk(frame); + break; + case miClose: + close(frame); + break; + case miCloseOk: + closeOk(frame); + break; + } + return true; +} + +void ChannelPrivate::_q_open() +{ + open(); +} + +void ChannelPrivate::sendFrame(const Frame::Base &frame) +{ + if (client) + client->d_func()->network_->sendFrame(frame); +} + +void ChannelPrivate::open() +{ + if (!needOpen || opened) + return; + + if (!client->d_func()->connection_->isConnected()) + return; + + qDebug("Open channel #%d", number); + Frame::Method frame(Frame::fcChannel, miOpen); + frame.setChannel(number); + + QByteArray arguments; + arguments.resize(1); + arguments[0] = 0; + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ChannelPrivate::flow() +{ +} + +void ChannelPrivate::flow(const Frame::Method &frame) +{ + Q_UNUSED(frame); +} + +void ChannelPrivate::flowOk() +{ +} + +void ChannelPrivate::flowOk(const Frame::Method &frame) +{ + Q_UNUSED(frame); +} + +void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) +{ + Frame::Method frame(Frame::fcChannel, miClose); + QByteArray arguments; + QDataStream stream(&arguments, QIODevice::WriteOnly); + + Frame::writeField('s',stream, client->virtualHost()); + + stream << qint16(code); + Frame::writeField('s', stream, text); + stream << qint16(classId); + stream << qint16(methodId); + + frame.setArguments(arguments); + sendFrame(frame); +} + +void ChannelPrivate::close(const Frame::Method &frame) +{ + qDebug(">> CLOSE"); + stateChanged(csClosed); + QByteArray data = frame.arguments(); + QDataStream stream(&data, QIODevice::ReadOnly); + qint16 code = 0, classId, methodId; + stream >> code; + QString text(Frame::readField('s', stream).toString()); + stream >> classId; + stream >> methodId; + + qDebug(">> code: %d", code); + qDebug(">> text: %s", qPrintable(text)); + qDebug(">> class-id: %d", classId); + qDebug(">> method-id: %d", methodId); +} + +void ChannelPrivate::closeOk() +{ + Frame::Method frame(Frame::fcChannel, miCloseOk); + sendFrame(frame); +} + +void ChannelPrivate::closeOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + Q_Q(Channel); + + stateChanged(csClosed); + q->channelClosed(); + opened = false; +} + +void ChannelPrivate::openOk(const Frame::Method &frame) +{ + Q_UNUSED(frame) + Q_Q(Channel); + + qDebug(">> OpenOK"); + opened = true; + stateChanged(csOpened); + q->channelOpened(); +} + +void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount) +{ + Q_UNUSED(prefetchSize) + Q_UNUSED(prefetchCount) + qDebug() << Q_FUNC_INFO << "temporarily disabled"; +// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false); +} + +void ChannelPrivate::_q_disconnected() +{ + nextChannelNumber = 0; + opened = false; +} + ////////////////////////////////////////////////////////////////////////// -Channel::Channel(int channelNumber, Client *parent) - : QObject(parent), +Channel::Channel(int channelNumber, Client *client) + : QObject(client), d_ptr(new ChannelPrivate(this)) { Q_D(Channel); - d->init(channelNumber, parent); + d->init(channelNumber, client); } Channel::Channel(ChannelPrivate *dd, Client *parent) @@ -63,24 +259,6 @@ void Channel::setName(const QString &name) d->name = name; } -void Channel::stateChanged(int state) -{ - switch(ChannelPrivate::State(state)) { - case ChannelPrivate::csOpened: - emit opened(); - break; - case ChannelPrivate::csClosed: - emit closed(); - break; - case ChannelPrivate::csIdle: - emit flowChanged(false); - break; - case ChannelPrivate::csRunning: - emit flowChanged(true); - break; - } -} - void Channel::_q_method(const Frame::Method &frame) { Q_D(Channel); @@ -93,199 +271,10 @@ bool Channel::isOpened() const return d->opened; } -void Channel::onOpen() -{ -} - -void Channel::onClose() -{ -} - void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount) { Q_D(Channel); d->setQOS(prefetchSize, prefetchCount); } -////////////////////////////////////////////////////////////////////////// - -int ChannelPrivate::nextChannelNumber_ = 0; -ChannelPrivate::ChannelPrivate(Channel *q) - : number(0), - opened(false), - needOpen(true), - q_ptr(q) -{ -} - -ChannelPrivate::~ChannelPrivate() -{ -} - -void ChannelPrivate::init(int channelNumber, Client *parent) -{ - Q_Q(Channel); - needOpen = channelNumber == -1 ? true : false; - number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber; - nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1)); - q->setParent(parent); - client_ = parent; -} - -bool ChannelPrivate::_q_method(const Frame::Method &frame) -{ - Q_ASSERT(frame.channel() == number); - if (frame.channel() != number) - return true; - - if (frame.methodClass() != Frame::fcChannel) - return false; - - qDebug("Channel#%d:", number); - - switch (frame.id()) { - case miOpenOk: - openOk(frame); - break; - case miFlow: - flow(frame); - break; - case miFlowOk: - flowOk(frame); - break; - case miClose: - close(frame); - break; - case miCloseOk: - closeOk(frame); - break; - } - return true; -} - -void ChannelPrivate::_q_open() -{ - open(); -} - -void ChannelPrivate::sendFrame(const Frame::Base &frame) -{ - if (client_) - client_->d_func()->network_->sendFrame(frame); -} - -void ChannelPrivate::open() -{ - if (!needOpen || opened) - return; - - if (!client_->d_func()->connection_->isConnected()) - return; - - qDebug("Open channel #%d", number); - Frame::Method frame(Frame::fcChannel, miOpen); - frame.setChannel(number); - QByteArray arguments_; - arguments_.resize(1); - arguments_[0] = 0; - frame.setArguments(arguments_); - sendFrame(frame); -} - -void ChannelPrivate::flow() -{ -} - -void ChannelPrivate::flow(const Frame::Method &frame) -{ - Q_UNUSED(frame); -} - -void ChannelPrivate::flowOk() -{ -} - -void ChannelPrivate::flowOk(const Frame::Method &frame) -{ - Q_UNUSED(frame); -} - -void ChannelPrivate::close(int code, const QString &text, int classId, int methodId) -{ - Frame::Method frame(Frame::fcChannel, miClose); - QByteArray arguments_; - QDataStream stream(&arguments_, QIODevice::WriteOnly); - - Frame::writeField('s',stream, client_->virtualHost()); - - stream << qint16(code); - Frame::writeField('s', stream, text); - stream << qint16(classId); - stream << qint16(methodId); - - frame.setArguments(arguments_); - client_->d_func()->network_->sendFrame(frame); -} - -void ChannelPrivate::close(const Frame::Method &frame) -{ - Q_Q(Channel); - q->stateChanged(csClosed); - - qDebug(">> CLOSE"); - QByteArray data = frame.arguments(); - QDataStream stream(&data, QIODevice::ReadOnly); - qint16 code_ = 0, classId, methodId; - stream >> code_; - QString text(Frame::readField('s', stream).toString()); - stream >> classId; - stream >> methodId; - - qDebug(">> code: %d", code_); - qDebug(">> text: %s", qPrintable(text)); - qDebug(">> class-id: %d", classId); - qDebug(">> method-id: %d", methodId); -} - -void ChannelPrivate::closeOk() -{ - Frame::Method frame(Frame::fcChannel, miCloseOk); - sendFrame(frame); -} - -void ChannelPrivate::closeOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - Q_Q(Channel); - - q->stateChanged(csClosed); - q->onClose(); - opened = false; -} - -void ChannelPrivate::openOk(const Frame::Method &frame) -{ - Q_UNUSED(frame) - Q_Q(Channel); - - qDebug(">> OpenOK"); - opened = true; - q->stateChanged(csOpened); - q->onOpen(); -} - -void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount) -{ - Q_UNUSED(prefetchSize) - Q_UNUSED(prefetchCount) - qDebug() << Q_FUNC_INFO << "temporarily disabled"; -// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false); -} - -void ChannelPrivate::_q_disconnected() -{ - nextChannelNumber_ = 0; - opened = false; -} - #include "moc_amqp_channel.cpp" diff --git a/src/amqp_channel.h b/src/amqp_channel.h index 9181abf..3fed4f8 100644 --- a/src/amqp_channel.h +++ b/src/amqp_channel.h @@ -35,8 +35,12 @@ Q_SIGNALS: void flowChanged(bool enabled); protected: - Channel(int channelNumber = -1, Client *parent = 0); - Channel(ChannelPrivate *dd, Client *parent = 0); + virtual void channelOpened() = 0; + virtual void channelClosed() = 0; + +protected: + explicit Channel(int channelNumber = -1, Client *client = 0); + explicit Channel(ChannelPrivate *dd, Client *client); Q_DISABLE_COPY(Channel) Q_DECLARE_PRIVATE(Channel) @@ -45,10 +49,7 @@ protected: Q_PRIVATE_SLOT(d_func(), void _q_open()) Q_PRIVATE_SLOT(d_func(), void _q_disconnected()) - // should move to private classes - virtual void onOpen(); - virtual void onClose(); - void stateChanged(int state); + // method handling void _q_method(const Frame::Method &frame); friend class ClientPrivate; diff --git a/src/amqp_channel_p.h b/src/amqp_channel_p.h index 65c4ed0..a343a57 100644 --- a/src/amqp_channel_p.h +++ b/src/amqp_channel_p.h @@ -8,6 +8,7 @@ namespace QAMQP { class Client; +class Network; class ClientPrivate; class ChannelPrivate { @@ -43,7 +44,8 @@ public: ChannelPrivate(Channel *q); virtual ~ChannelPrivate(); - void init(int channelNumber, Client *parent); + void init(int channelNumber, Client *client); + void stateChanged(State state); void open(); void flow(); @@ -66,12 +68,10 @@ public: void setQOS(qint32 prefetchSize, quint16 prefetchCount); void sendFrame(const Frame::Base &frame); - QPointer client_; - + QPointer client; QString name; int number; - - static int nextChannelNumber_; + static int nextChannelNumber; bool opened; bool needOpen; diff --git a/src/amqp_exchange.cpp b/src/amqp_exchange.cpp index b898326..f10dde3 100644 --- a/src/amqp_exchange.cpp +++ b/src/amqp_exchange.cpp @@ -20,14 +20,14 @@ Exchange::~Exchange() remove(); } -void Exchange::onOpen() +void Exchange::channelOpened() { Q_D(Exchange); if (d->delayedDeclare) declare(Exchange::Direct); } -void Exchange::onClose() +void Exchange::channelClosed() { remove(true, true); } diff --git a/src/amqp_exchange.h b/src/amqp_exchange.h index f7435bf..9d25862 100644 --- a/src/amqp_exchange.h +++ b/src/amqp_exchange.h @@ -67,8 +67,8 @@ Q_SIGNALS: void removed(); protected: - void onOpen(); - void onClose(); + virtual void channelOpened(); + virtual void channelClosed(); private: explicit Exchange(int channelNumber = -1, Client *parent = 0); diff --git a/src/amqp_queue.cpp b/src/amqp_queue.cpp index 00c7f7a..0fa75ae 100644 --- a/src/amqp_queue.cpp +++ b/src/amqp_queue.cpp @@ -21,7 +21,7 @@ Queue::~Queue() remove(); } -void Queue::onOpen() +void Queue::channelOpened() { Q_D(Queue); if (d->delayedDeclare) @@ -35,7 +35,7 @@ void Queue::onOpen() } } -void Queue::onClose() +void Queue::channelClosed() { remove(true, true); } diff --git a/src/amqp_queue.h b/src/amqp_queue.h index d10ffb7..0662aeb 100644 --- a/src/amqp_queue.h +++ b/src/amqp_queue.h @@ -76,8 +76,9 @@ Q_SIGNALS: void empty(); protected: - void onOpen(); - void onClose(); + // reimp Channel + virtual void channelOpened(); + virtual void channelClosed(); private: Queue(int channelNumber = -1, Client *parent = 0);