merge more code from private functions to Queue class
This commit is contained in:
parent
1c3bd9fdf1
commit
598b10bf73
|
|
@ -186,7 +186,26 @@ bool Queue::hasMessage() const
|
||||||
void Queue::consume(ConsumeOptions options)
|
void Queue::consume(ConsumeOptions options)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
d->consume(options);
|
if (!d->opened) {
|
||||||
|
qDebug() << Q_FUNC_INFO << "queue is not open";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmConsume);
|
||||||
|
frame.setChannel(d->number);
|
||||||
|
|
||||||
|
QByteArray arguments;
|
||||||
|
QDataStream out(&arguments, QIODevice::WriteOnly);
|
||||||
|
|
||||||
|
out << qint16(0); //reserver 1
|
||||||
|
Frame::writeField('s', out, d->name);
|
||||||
|
Frame::writeField('s', out, d->consumerTag);
|
||||||
|
|
||||||
|
out << qint8(options); // no-wait
|
||||||
|
Frame::writeField('F', out, Frame::TableField());
|
||||||
|
|
||||||
|
frame.setArguments(arguments);
|
||||||
|
d->sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::setConsumerTag(const QString &consumerTag)
|
void Queue::setConsumerTag(const QString &consumerTag)
|
||||||
|
|
@ -204,18 +223,49 @@ QString Queue::consumerTag() const
|
||||||
void Queue::get()
|
void Queue::get()
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
d->get();
|
if (!d->opened) {
|
||||||
|
qDebug() << Q_FUNC_INFO << "queue is not open";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmGet);
|
||||||
|
frame.setChannel(d->number);
|
||||||
|
|
||||||
|
QByteArray arguments;
|
||||||
|
QDataStream out(&arguments, QIODevice::WriteOnly);
|
||||||
|
|
||||||
|
out << qint16(0); //reserver 1
|
||||||
|
Frame::writeField('s', out, d->name);
|
||||||
|
|
||||||
|
out << qint8(d->noAck ? 1 : 0); // noAck
|
||||||
|
|
||||||
|
frame.setArguments(arguments);
|
||||||
|
d->sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Queue::ack(const MessagePtr &message)
|
void Queue::ack(const MessagePtr &message)
|
||||||
{
|
{
|
||||||
Q_D(Queue);
|
Q_D(Queue);
|
||||||
d->ack(message);
|
if (!d->opened) {
|
||||||
|
qDebug() << Q_FUNC_INFO << "queue is not open";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Frame::Method frame(Frame::fcBasic, QueuePrivate::bmAck);
|
||||||
|
frame.setChannel(d->number);
|
||||||
|
|
||||||
|
QByteArray arguments;
|
||||||
|
QDataStream out(&arguments, QIODevice::WriteOnly);
|
||||||
|
|
||||||
|
out << message->deliveryTag; //reserver 1
|
||||||
|
out << qint8(0); // noAck
|
||||||
|
|
||||||
|
frame.setArguments(arguments);
|
||||||
|
d->sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
QueuePrivate::QueuePrivate(Queue *q)
|
QueuePrivate::QueuePrivate(Queue *q)
|
||||||
: ChannelPrivate(q),
|
: ChannelPrivate(q),
|
||||||
delayedDeclare(false),
|
delayedDeclare(false),
|
||||||
|
|
@ -418,23 +468,6 @@ void QueuePrivate::unbind(const QString &exchangeName, const QString &key)
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::get()
|
|
||||||
{
|
|
||||||
if (!opened)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Frame::Method frame(Frame::fcBasic, bmGet);
|
|
||||||
frame.setChannel(number);
|
|
||||||
QByteArray arguments_;
|
|
||||||
QDataStream out(&arguments_, QIODevice::WriteOnly);
|
|
||||||
out << qint16(0); //reserver 1
|
|
||||||
Frame::writeField('s', out, name);
|
|
||||||
out << qint8(noAck ? 1 : 0); // noAck
|
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
|
||||||
sendFrame(frame);
|
|
||||||
}
|
|
||||||
|
|
||||||
void QueuePrivate::getOk(const Frame::Method &frame)
|
void QueuePrivate::getOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
QByteArray data = frame.arguments();
|
QByteArray data = frame.arguments();
|
||||||
|
|
@ -454,41 +487,6 @@ void QueuePrivate::getOk(const Frame::Method &frame)
|
||||||
messages_.enqueue(newMessage);
|
messages_.enqueue(newMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueuePrivate::ack(const MessagePtr &Message)
|
|
||||||
{
|
|
||||||
if (!opened)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Frame::Method frame(Frame::fcBasic, bmAck);
|
|
||||||
frame.setChannel(number);
|
|
||||||
QByteArray arguments_;
|
|
||||||
QDataStream out(&arguments_, QIODevice::WriteOnly);
|
|
||||||
out << Message->deliveryTag; //reserver 1
|
|
||||||
out << qint8(0); // noAck
|
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
|
||||||
sendFrame(frame);
|
|
||||||
}
|
|
||||||
|
|
||||||
void QueuePrivate::consume(Queue::ConsumeOptions options)
|
|
||||||
{
|
|
||||||
if (!opened)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Frame::Method frame(Frame::fcBasic, bmConsume);
|
|
||||||
frame.setChannel(number);
|
|
||||||
QByteArray arguments_;
|
|
||||||
QDataStream out(&arguments_, QIODevice::WriteOnly);
|
|
||||||
out << qint16(0); //reserver 1
|
|
||||||
Frame::writeField('s', out, name);
|
|
||||||
Frame::writeField('s', out, consumerTag);
|
|
||||||
out << qint8(options); // no-wait
|
|
||||||
Frame::writeField('F', out, Frame::TableField());
|
|
||||||
|
|
||||||
frame.setArguments(arguments_);
|
|
||||||
sendFrame(frame);
|
|
||||||
}
|
|
||||||
|
|
||||||
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
void QueuePrivate::consumeOk(const Frame::Method &frame)
|
||||||
{
|
{
|
||||||
qDebug() << "Consume ok: " << name;
|
qDebug() << "Consume ok: " << name;
|
||||||
|
|
|
||||||
|
|
@ -37,18 +37,16 @@ public:
|
||||||
/* CLASS BASIC METHODS */
|
/* CLASS BASIC METHODS */
|
||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
|
|
||||||
void consume(Queue::ConsumeOptions options);
|
|
||||||
void consumeOk(const Frame::Method &frame);
|
void consumeOk(const Frame::Method &frame);
|
||||||
void deliver(const Frame::Method &frame);
|
void deliver(const Frame::Method &frame);
|
||||||
|
|
||||||
void get();
|
void get();
|
||||||
void getOk(const Frame::Method &frame);
|
void getOk(const Frame::Method &frame);
|
||||||
void ack(const MessagePtr &Message);
|
|
||||||
|
|
||||||
QString type;
|
QString type;
|
||||||
Queue::QueueOptions options;
|
Queue::QueueOptions options;
|
||||||
|
|
||||||
bool _q_method(const Frame::Method &frame);
|
virtual bool _q_method(const Frame::Method &frame);
|
||||||
|
|
||||||
bool delayedDeclare;
|
bool delayedDeclare;
|
||||||
bool declared;
|
bool declared;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue