refactored Channel, changed some methods to better convey their intention

This commit is contained in:
Matt Broadstone 2014-06-03 13:00:25 -04:00
parent 128f350cf8
commit 1198db2857
7 changed files with 220 additions and 229 deletions

View File

@ -10,14 +10,210 @@
using namespace QAMQP;
int ChannelPrivate::nextChannelNumber = 0;
ChannelPrivate::ChannelPrivate(Channel *q)
: number(0),
opened(false),
needOpen(true),
q_ptr(q)
{
}
ChannelPrivate::~ChannelPrivate()
{
}
void ChannelPrivate::init(int channelNumber, Client *c)
{
client = c;
needOpen = channelNumber == -1 ? true : false;
number = channelNumber == -1 ? ++nextChannelNumber : channelNumber;
nextChannelNumber = qMax(channelNumber, (nextChannelNumber + 1));
}
void ChannelPrivate::stateChanged(State state)
{
Q_Q(Channel);
switch(ChannelPrivate::State(state)) {
case ChannelPrivate::csOpened:
Q_EMIT q->opened();
break;
case ChannelPrivate::csClosed:
Q_EMIT q->closed();
break;
case ChannelPrivate::csIdle:
Q_EMIT q->flowChanged(false);
break;
case ChannelPrivate::csRunning:
Q_EMIT q->flowChanged(true);
break;
}
}
bool ChannelPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return true;
if (frame.methodClass() != Frame::fcChannel)
return false;
qDebug("Channel#%d:", number);
switch (frame.id()) {
case miOpenOk:
openOk(frame);
break;
case miFlow:
flow(frame);
break;
case miFlowOk:
flowOk(frame);
break;
case miClose:
close(frame);
break;
case miCloseOk:
closeOk(frame);
break;
}
return true;
}
void ChannelPrivate::_q_open()
{
open();
}
void ChannelPrivate::sendFrame(const Frame::Base &frame)
{
if (client)
client->d_func()->network_->sendFrame(frame);
}
void ChannelPrivate::open()
{
if (!needOpen || opened)
return;
if (!client->d_func()->connection_->isConnected())
return;
qDebug("Open channel #%d", number);
Frame::Method frame(Frame::fcChannel, miOpen);
frame.setChannel(number);
QByteArray arguments;
arguments.resize(1);
arguments[0] = 0;
frame.setArguments(arguments);
sendFrame(frame);
}
void ChannelPrivate::flow()
{
}
void ChannelPrivate::flow(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
void ChannelPrivate::flowOk()
{
}
void ChannelPrivate::flowOk(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
void ChannelPrivate::close(int code, const QString &text, int classId, int methodId)
{
Frame::Method frame(Frame::fcChannel, miClose);
QByteArray arguments;
QDataStream stream(&arguments, QIODevice::WriteOnly);
Frame::writeField('s',stream, client->virtualHost());
stream << qint16(code);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
frame.setArguments(arguments);
sendFrame(frame);
}
void ChannelPrivate::close(const Frame::Method &frame)
{
qDebug(">> CLOSE");
stateChanged(csClosed);
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code = 0, classId, methodId;
stream >> code;
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
qDebug(">> code: %d", code);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
}
void ChannelPrivate::closeOk()
{
Frame::Method frame(Frame::fcChannel, miCloseOk);
sendFrame(frame);
}
void ChannelPrivate::closeOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Channel);
stateChanged(csClosed);
q->channelClosed();
opened = false;
}
void ChannelPrivate::openOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Channel);
qDebug(">> OpenOK");
opened = true;
stateChanged(csOpened);
q->channelOpened();
}
void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_UNUSED(prefetchSize)
Q_UNUSED(prefetchCount)
qDebug() << Q_FUNC_INFO << "temporarily disabled";
// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false);
}
void ChannelPrivate::_q_disconnected()
{
nextChannelNumber = 0;
opened = false;
}
//////////////////////////////////////////////////////////////////////////
Channel::Channel(int channelNumber, Client *parent)
: QObject(parent),
Channel::Channel(int channelNumber, Client *client)
: QObject(client),
d_ptr(new ChannelPrivate(this))
{
Q_D(Channel);
d->init(channelNumber, parent);
d->init(channelNumber, client);
}
Channel::Channel(ChannelPrivate *dd, Client *parent)
@ -63,24 +259,6 @@ void Channel::setName(const QString &name)
d->name = name;
}
void Channel::stateChanged(int state)
{
switch(ChannelPrivate::State(state)) {
case ChannelPrivate::csOpened:
emit opened();
break;
case ChannelPrivate::csClosed:
emit closed();
break;
case ChannelPrivate::csIdle:
emit flowChanged(false);
break;
case ChannelPrivate::csRunning:
emit flowChanged(true);
break;
}
}
void Channel::_q_method(const Frame::Method &frame)
{
Q_D(Channel);
@ -93,199 +271,10 @@ bool Channel::isOpened() const
return d->opened;
}
void Channel::onOpen()
{
}
void Channel::onClose()
{
}
void Channel::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_D(Channel);
d->setQOS(prefetchSize, prefetchCount);
}
//////////////////////////////////////////////////////////////////////////
int ChannelPrivate::nextChannelNumber_ = 0;
ChannelPrivate::ChannelPrivate(Channel *q)
: number(0),
opened(false),
needOpen(true),
q_ptr(q)
{
}
ChannelPrivate::~ChannelPrivate()
{
}
void ChannelPrivate::init(int channelNumber, Client *parent)
{
Q_Q(Channel);
needOpen = channelNumber == -1 ? true : false;
number = channelNumber == -1 ? ++nextChannelNumber_ : channelNumber;
nextChannelNumber_ = qMax(channelNumber, (nextChannelNumber_ + 1));
q->setParent(parent);
client_ = parent;
}
bool ChannelPrivate::_q_method(const Frame::Method &frame)
{
Q_ASSERT(frame.channel() == number);
if (frame.channel() != number)
return true;
if (frame.methodClass() != Frame::fcChannel)
return false;
qDebug("Channel#%d:", number);
switch (frame.id()) {
case miOpenOk:
openOk(frame);
break;
case miFlow:
flow(frame);
break;
case miFlowOk:
flowOk(frame);
break;
case miClose:
close(frame);
break;
case miCloseOk:
closeOk(frame);
break;
}
return true;
}
void ChannelPrivate::_q_open()
{
open();
}
void ChannelPrivate::sendFrame(const Frame::Base &frame)
{
if (client_)
client_->d_func()->network_->sendFrame(frame);
}
void ChannelPrivate::open()
{
if (!needOpen || opened)
return;
if (!client_->d_func()->connection_->isConnected())
return;
qDebug("Open channel #%d", number);
Frame::Method frame(Frame::fcChannel, miOpen);
frame.setChannel(number);
QByteArray arguments_;
arguments_.resize(1);
arguments_[0] = 0;
frame.setArguments(arguments_);
sendFrame(frame);
}
void ChannelPrivate::flow()
{
}
void ChannelPrivate::flow(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
void ChannelPrivate::flowOk()
{
}
void ChannelPrivate::flowOk(const Frame::Method &frame)
{
Q_UNUSED(frame);
}
void ChannelPrivate::close(int code, const QString &text, int classId, int methodId)
{
Frame::Method frame(Frame::fcChannel, miClose);
QByteArray arguments_;
QDataStream stream(&arguments_, QIODevice::WriteOnly);
Frame::writeField('s',stream, client_->virtualHost());
stream << qint16(code);
Frame::writeField('s', stream, text);
stream << qint16(classId);
stream << qint16(methodId);
frame.setArguments(arguments_);
client_->d_func()->network_->sendFrame(frame);
}
void ChannelPrivate::close(const Frame::Method &frame)
{
Q_Q(Channel);
q->stateChanged(csClosed);
qDebug(">> CLOSE");
QByteArray data = frame.arguments();
QDataStream stream(&data, QIODevice::ReadOnly);
qint16 code_ = 0, classId, methodId;
stream >> code_;
QString text(Frame::readField('s', stream).toString());
stream >> classId;
stream >> methodId;
qDebug(">> code: %d", code_);
qDebug(">> text: %s", qPrintable(text));
qDebug(">> class-id: %d", classId);
qDebug(">> method-id: %d", methodId);
}
void ChannelPrivate::closeOk()
{
Frame::Method frame(Frame::fcChannel, miCloseOk);
sendFrame(frame);
}
void ChannelPrivate::closeOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Channel);
q->stateChanged(csClosed);
q->onClose();
opened = false;
}
void ChannelPrivate::openOk(const Frame::Method &frame)
{
Q_UNUSED(frame)
Q_Q(Channel);
qDebug(">> OpenOK");
opened = true;
q->stateChanged(csOpened);
q->onOpen();
}
void ChannelPrivate::setQOS(qint32 prefetchSize, quint16 prefetchCount)
{
Q_UNUSED(prefetchSize)
Q_UNUSED(prefetchCount)
qDebug() << Q_FUNC_INFO << "temporarily disabled";
// client_->d_func()->connection_->d_func()->setQOS(prefetchSize, prefetchCount, number, false);
}
void ChannelPrivate::_q_disconnected()
{
nextChannelNumber_ = 0;
opened = false;
}
#include "moc_amqp_channel.cpp"

View File

@ -35,8 +35,12 @@ Q_SIGNALS:
void flowChanged(bool enabled);
protected:
Channel(int channelNumber = -1, Client *parent = 0);
Channel(ChannelPrivate *dd, Client *parent = 0);
virtual void channelOpened() = 0;
virtual void channelClosed() = 0;
protected:
explicit Channel(int channelNumber = -1, Client *client = 0);
explicit Channel(ChannelPrivate *dd, Client *client);
Q_DISABLE_COPY(Channel)
Q_DECLARE_PRIVATE(Channel)
@ -45,10 +49,7 @@ protected:
Q_PRIVATE_SLOT(d_func(), void _q_open())
Q_PRIVATE_SLOT(d_func(), void _q_disconnected())
// should move to private classes
virtual void onOpen();
virtual void onClose();
void stateChanged(int state);
// method handling
void _q_method(const Frame::Method &frame);
friend class ClientPrivate;

View File

@ -8,6 +8,7 @@
namespace QAMQP
{
class Client;
class Network;
class ClientPrivate;
class ChannelPrivate
{
@ -43,7 +44,8 @@ public:
ChannelPrivate(Channel *q);
virtual ~ChannelPrivate();
void init(int channelNumber, Client *parent);
void init(int channelNumber, Client *client);
void stateChanged(State state);
void open();
void flow();
@ -66,12 +68,10 @@ public:
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
void sendFrame(const Frame::Base &frame);
QPointer<Client> client_;
QPointer<Client> client;
QString name;
int number;
static int nextChannelNumber_;
static int nextChannelNumber;
bool opened;
bool needOpen;

View File

@ -20,14 +20,14 @@ Exchange::~Exchange()
remove();
}
void Exchange::onOpen()
void Exchange::channelOpened()
{
Q_D(Exchange);
if (d->delayedDeclare)
declare(Exchange::Direct);
}
void Exchange::onClose()
void Exchange::channelClosed()
{
remove(true, true);
}

View File

@ -67,8 +67,8 @@ Q_SIGNALS:
void removed();
protected:
void onOpen();
void onClose();
virtual void channelOpened();
virtual void channelClosed();
private:
explicit Exchange(int channelNumber = -1, Client *parent = 0);

View File

@ -21,7 +21,7 @@ Queue::~Queue()
remove();
}
void Queue::onOpen()
void Queue::channelOpened()
{
Q_D(Queue);
if (d->delayedDeclare)
@ -35,7 +35,7 @@ void Queue::onOpen()
}
}
void Queue::onClose()
void Queue::channelClosed()
{
remove(true, true);
}

View File

@ -76,8 +76,9 @@ Q_SIGNALS:
void empty();
protected:
void onOpen();
void onClose();
// reimp Channel
virtual void channelOpened();
virtual void channelClosed();
private:
Queue(int channelNumber = -1, Client *parent = 0);