2014-05-29 00:25:28 +08:00
|
|
|
#include "amqp_channel.h"
|
|
|
|
|
#include "amqp_channel_p.h"
|
2014-05-29 01:05:51 +08:00
|
|
|
#include "amqp_client.h"
|
|
|
|
|
#include "amqp_client_p.h"
|
2014-05-29 00:25:28 +08:00
|
|
|
|
|
|
|
|
#include <QDebug>
|
|
|
|
|
#include <QDataStream>
|
|
|
|
|
|
|
|
|
|
using namespace QAMQP;
|
|
|
|
|
|
2014-06-04 01:00:25 +08:00
|
|
|
// Counter used by init() to auto-assign channel numbers. It is a static
// member, so it is shared by every ChannelPrivate instance, and it is reset
// to 0 in _q_disconnected().
int ChannelPrivate::nextChannelNumber = 0;
|
|
|
|
|
// Builds the private state for the public object q: channel number 0, not
// opened, flagged as needing an open, all QoS values zero, no error recorded.
// NOTE(review): `client` is not initialised here - presumably it is a QPointer
// that defaults to null; confirm against the private header.
ChannelPrivate::ChannelPrivate(Channel *q)
    : channelNumber(0)
    , opened(false)
    , needOpen(true)
    , prefetchSize(0)
    , requestedPrefetchSize(0)
    , prefetchCount(0)
    , requestedPrefetchCount(0)
    , error(QAMQP::NoError)
    , q_ptr(q)
{
}
|
|
|
|
|
|
2014-06-04 01:00:25 +08:00
|
|
|
// Empty destructor: nothing is released explicitly here.
ChannelPrivate::~ChannelPrivate() {}
|
|
|
|
|
|
2014-06-04 21:50:31 +08:00
|
|
|
void ChannelPrivate::init(int channel, Client *c)
|
2014-05-29 00:25:28 +08:00
|
|
|
{
|
2014-06-04 01:00:25 +08:00
|
|
|
client = c;
|
2014-06-04 21:50:31 +08:00
|
|
|
needOpen = channel == -1 ? true : false;
|
|
|
|
|
channelNumber = channel == -1 ? ++nextChannelNumber : channel;
|
2014-06-04 01:00:25 +08:00
|
|
|
nextChannelNumber = qMax(channelNumber, (nextChannelNumber + 1));
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
2014-06-04 01:00:25 +08:00
|
|
|
// Translates an internal state value into the matching signal on the public
// Channel object: opened(), closed() or flowChanged(bool). Any other state
// value emits nothing.
void ChannelPrivate::stateChanged(State state)
{
    Channel * const channel = q_func();
    const ChannelPrivate::State current = ChannelPrivate::State(state);

    if (current == ChannelPrivate::csOpened) {
        Q_EMIT channel->opened();
    } else if (current == ChannelPrivate::csClosed) {
        Q_EMIT channel->closed();
    } else if (current == ChannelPrivate::csIdle) {
        Q_EMIT channel->flowChanged(false);
    } else if (current == ChannelPrivate::csRunning) {
        Q_EMIT channel->flowChanged(true);
    }
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
// Dispatches a method frame to the matching handler of this channel.
//
// Return value:
//  - true  for a frame whose channel number differs from ours (asserted
//          against in debug builds), for a basic-class qos-ok frame, and for
//          any channel-class frame (whether or not its id has a handler);
//  - false for any other basic-class frame and for every other method class.
bool ChannelPrivate::_q_method(const Frame::Method &frame)
{
    const bool forThisChannel = (frame.channel() == channelNumber);
    Q_ASSERT(forThisChannel);
    if (!forThisChannel)
        return true;

    switch (frame.methodClass()) {
    case Frame::fcBasic: {
        // qos-ok is the only basic-class method handled at this level.
        const bool isQosOk = (frame.id() == bmQosOk);
        if (isQosOk)
            qosOk(frame);
        return isQosOk;
    }
    case Frame::fcChannel:
        break;
    default:
        return false;
    }

    qAmqpDebug("Channel#%d:", channelNumber);

    switch (frame.id()) {
    case miOpenOk:  openOk(frame);  break;
    case miFlow:    flow(frame);    break;
    case miFlowOk:  flowOk(frame);  break;
    case miClose:   close(frame);   break;
    case miCloseOk: closeOk(frame); break;
    }

    return true;
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::_q_open()
|
|
|
|
|
{
|
|
|
|
|
open();
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
// Hands the frame to the owning client's private sendFrame(). When no client
// is set, the frame is dropped and a debug message is logged instead.
void ChannelPrivate::sendFrame(const Frame::Base &frame)
{
    if (client) {
        client->d_func()->sendFrame(frame);
        return;
    }

    qAmqpDebug() << Q_FUNC_INFO << "invalid client";
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::open()
|
|
|
|
|
{
|
|
|
|
|
if (!needOpen || opened)
|
|
|
|
|
return;
|
|
|
|
|
|
2014-06-04 04:11:30 +08:00
|
|
|
if (!client->isConnected())
|
2014-05-29 00:25:28 +08:00
|
|
|
return;
|
|
|
|
|
|
2014-06-07 01:46:08 +08:00
|
|
|
qAmqpDebug("Open channel #%d", channelNumber);
|
2014-05-29 01:52:27 +08:00
|
|
|
Frame::Method frame(Frame::fcChannel, miOpen);
|
2014-06-04 21:50:31 +08:00
|
|
|
frame.setChannel(channelNumber);
|
2014-06-04 01:00:25 +08:00
|
|
|
|
|
|
|
|
QByteArray arguments;
|
|
|
|
|
arguments.resize(1);
|
|
|
|
|
arguments[0] = 0;
|
|
|
|
|
|
|
|
|
|
frame.setArguments(arguments);
|
2014-05-29 00:25:28 +08:00
|
|
|
sendFrame(frame);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Currently a no-op.
void ChannelPrivate::flow() {}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
void ChannelPrivate::flow(const Frame::Method &frame)
|
2014-05-29 00:25:28 +08:00
|
|
|
{
|
|
|
|
|
Q_UNUSED(frame);
|
2014-06-07 01:46:08 +08:00
|
|
|
qAmqpDebug() << Q_FUNC_INFO;
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::flowOk()
|
|
|
|
|
{
|
2014-06-07 01:46:08 +08:00
|
|
|
qAmqpDebug() << Q_FUNC_INFO;
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
void ChannelPrivate::flowOk(const Frame::Method &frame)
|
2014-05-29 00:25:28 +08:00
|
|
|
{
|
|
|
|
|
Q_UNUSED(frame);
|
2014-06-07 01:46:08 +08:00
|
|
|
qAmqpDebug() << Q_FUNC_INFO;
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::close(int code, const QString &text, int classId, int methodId)
|
|
|
|
|
{
|
2014-06-04 01:00:25 +08:00
|
|
|
QByteArray arguments;
|
|
|
|
|
QDataStream stream(&arguments, QIODevice::WriteOnly);
|
2014-05-29 00:25:28 +08:00
|
|
|
|
2014-08-07 10:30:42 +08:00
|
|
|
if (!code) code = 200;
|
|
|
|
|
Frame::writeAmqpField(stream, ShortUint, code);
|
|
|
|
|
if (!text.isEmpty()) {
|
|
|
|
|
Frame::writeAmqpField(stream, ShortString, text);
|
|
|
|
|
} else {
|
|
|
|
|
Frame::writeAmqpField(stream, ShortString, QLatin1String("OK"));
|
|
|
|
|
}
|
2014-05-29 00:25:28 +08:00
|
|
|
|
2014-08-07 10:30:42 +08:00
|
|
|
Frame::writeAmqpField(stream, ShortUint, classId);
|
|
|
|
|
Frame::writeAmqpField(stream, ShortUint, methodId);
|
2014-05-29 00:25:28 +08:00
|
|
|
|
2014-08-07 10:30:42 +08:00
|
|
|
Frame::Method frame(Frame::fcChannel, miClose);
|
|
|
|
|
frame.setChannel(channelNumber);
|
2014-06-04 01:00:25 +08:00
|
|
|
frame.setArguments(arguments);
|
|
|
|
|
sendFrame(frame);
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
// Handler invoked by _q_method() for a close method frame.
//
// Emits closed() via stateChanged(), then decodes reply code, reply text,
// class id and method id from the frame arguments. When the reply code is not
// QAMQP::NoError it is stored together with the reply text and error() is
// emitted. The decoded values are logged at the end.
//
// NOTE(review): this handler neither sends a close-ok reply nor clears
// `opened` - confirm whether that is done elsewhere.
void ChannelPrivate::close(const Frame::Method &frame)
{
    Q_Q(Channel);
    qAmqpDebug(">> CLOSE");
    stateChanged(csClosed);

    QByteArray data = frame.arguments();
    QDataStream stream(&data, QIODevice::ReadOnly);

    qint16 code = 0;
    qint16 classId = 0;
    qint16 methodId = 0;
    stream >> code;
    const QString text = Frame::readAmqpField(stream, ShortString).toString();
    stream >> classId;
    stream >> methodId;

    const Error checkError = static_cast<Error>(code);
    if (checkError != QAMQP::NoError) {
        error = checkError;
        // Keep the decoded text as-is. Passing it through qPrintable() would
        // convert it to the local 8-bit encoding and back, which can mangle
        // non-ASCII reply text.
        errorString = text;
        Q_EMIT q->error(error);
    }

    qAmqpDebug(">> code: %d", code);
    qAmqpDebug(">> text: %s", qPrintable(text));
    qAmqpDebug(">> class-id: %d", classId);
    qAmqpDebug(">> method-id: %d", methodId);
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::closeOk()
|
|
|
|
|
{
|
2014-05-29 01:52:27 +08:00
|
|
|
Frame::Method frame(Frame::fcChannel, miCloseOk);
|
2014-05-29 00:25:28 +08:00
|
|
|
sendFrame(frame);
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
void ChannelPrivate::closeOk(const Frame::Method &frame)
|
2014-05-29 00:25:28 +08:00
|
|
|
{
|
|
|
|
|
Q_UNUSED(frame)
|
2014-05-29 01:52:27 +08:00
|
|
|
Q_Q(Channel);
|
2014-05-29 00:25:28 +08:00
|
|
|
|
2014-06-04 01:00:25 +08:00
|
|
|
stateChanged(csClosed);
|
|
|
|
|
q->channelClosed();
|
2014-05-29 00:25:28 +08:00
|
|
|
opened = false;
|
|
|
|
|
}
|
|
|
|
|
|
2014-05-29 01:52:27 +08:00
|
|
|
void ChannelPrivate::openOk(const Frame::Method &frame)
|
2014-05-29 00:25:28 +08:00
|
|
|
{
|
|
|
|
|
Q_UNUSED(frame)
|
2014-05-29 01:52:27 +08:00
|
|
|
Q_Q(Channel);
|
2014-05-29 00:25:28 +08:00
|
|
|
|
2014-06-07 01:46:08 +08:00
|
|
|
qAmqpDebug(">> OpenOK");
|
2014-05-29 00:25:28 +08:00
|
|
|
opened = true;
|
2014-06-04 01:00:25 +08:00
|
|
|
stateChanged(csOpened);
|
|
|
|
|
q->channelOpened();
|
2014-05-29 00:25:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ChannelPrivate::_q_disconnected()
|
|
|
|
|
{
|
2014-06-04 01:00:25 +08:00
|
|
|
nextChannelNumber = 0;
|
2014-05-29 00:25:28 +08:00
|
|
|
opened = false;
|
|
|
|
|
}
|
|
|
|
|
|
2014-06-26 22:28:58 +08:00
|
|
|
void ChannelPrivate::qosOk(const Frame::Method &frame)
|
|
|
|
|
{
|
|
|
|
|
Q_Q(Channel);
|
|
|
|
|
Q_UNUSED(frame)
|
|
|
|
|
|
|
|
|
|
prefetchCount = requestedPrefetchCount;
|
|
|
|
|
prefetchSize = requestedPrefetchSize;
|
|
|
|
|
Q_EMIT q->qosDefined();
|
|
|
|
|
}
|
|
|
|
|
|
2014-06-04 01:00:25 +08:00
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
|
|
// Creates a channel parented to client, with a freshly allocated private
// object that is initialised with channelNumber and client.
Channel::Channel(int channelNumber, Client *client)
    : QObject(client)
    , d_ptr(new ChannelPrivate(this))
{
    d_func()->init(channelNumber, client);
}
|
|
|
|
|
|
|
|
|
|
// Creates a channel parented to parent that adopts the supplied private
// object dd. init() is not called on dd here.
Channel::Channel(ChannelPrivate *dd, Client *parent)
    : QObject(parent)
    , d_ptr(dd)
{
}
|
|
|
|
|
|
|
|
|
|
// Empty destructor: nothing is released explicitly here.
Channel::~Channel() {}
|
|
|
|
|
|
|
|
|
|
void Channel::closeChannel()
|
|
|
|
|
{
|
|
|
|
|
Q_D(Channel);
|
|
|
|
|
d->needOpen = true;
|
|
|
|
|
if (d->opened)
|
|
|
|
|
d->close(0, QString(), 0,0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Channel::reopen()
|
|
|
|
|
{
|
|
|
|
|
Q_D(Channel);
|
2014-08-07 10:30:42 +08:00
|
|
|
if (d->opened)
|
|
|
|
|
closeChannel();
|
2014-06-04 01:00:25 +08:00
|
|
|
d->open();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Returns the name stored by setName().
QString Channel::name() const
{
    return d_func()->name;
}
|
|
|
|
|
|
|
|
|
|
int Channel::channelNumber() const
|
|
|
|
|
{
|
|
|
|
|
Q_D(const Channel);
|
2014-06-04 21:50:31 +08:00
|
|
|
return d->channelNumber;
|
2014-06-04 01:00:25 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Channel::setName(const QString &name)
|
|
|
|
|
{
|
|
|
|
|
Q_D(Channel);
|
|
|
|
|
d->name = name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool Channel::isOpened() const
|
|
|
|
|
{
|
|
|
|
|
Q_D(const Channel);
|
|
|
|
|
return d->opened;
|
|
|
|
|
}
|
|
|
|
|
|
2014-06-26 22:28:58 +08:00
|
|
|
// Sends a basic-class qos method for this channel.
//
// The requested values are remembered in the private object; qosOk() later
// copies them into the effective prefetch settings. The payload is written
// as a 32-bit prefetch size, a 16-bit prefetch count and one zero byte for
// the "global" field.
void Channel::qos(qint16 prefetchCount, qint32 prefetchSize)
{
    ChannelPrivate * const priv = d_func();

    priv->requestedPrefetchSize = prefetchSize;
    priv->requestedPrefetchCount = prefetchCount;

    QByteArray payload;
    QDataStream out(&payload, QIODevice::WriteOnly);
    out << qint32(prefetchSize)
        << qint16(prefetchCount)
        << qint8(0x0); // global

    Frame::Method frame(Frame::fcBasic, ChannelPrivate::bmQos);
    frame.setChannel(priv->channelNumber);
    frame.setArguments(payload);
    priv->sendFrame(frame);
}
|
|
|
|
|
|
|
|
|
|
// Returns the effective prefetch size, i.e. the value last copied over by
// ChannelPrivate::qosOk() (0 until then).
qint32 Channel::prefetchSize() const
{
    return d_func()->prefetchSize;
}
|
|
|
|
|
|
|
|
|
|
// Returns the effective prefetch count, i.e. the value last copied over by
// ChannelPrivate::qosOk() (0 until then).
qint16 Channel::prefetchCount() const
{
    return d_func()->prefetchCount;
}
|
|
|
|
|
|
2014-06-12 01:44:30 +08:00
|
|
|
// Returns the error code stored in the private object (QAMQP::NoError until
// a close frame with a different reply code is handled).
Error Channel::error() const
{
    return d_func()->error;
}
|
|
|
|
|
|
|
|
|
|
// Returns the error text stored in the private object.
QString Channel::errorString() const
{
    return d_func()->errorString;
}
|
|
|
|
|
|
2014-05-29 00:25:28 +08:00
|
|
|
#include "moc_amqp_channel.cpp"
|