Merge pull request #27 from vrtsystems/state-machine-client

Add storage of Queues and Exchanges to QAmqpClient.
This commit is contained in:
Matt Broadstone 2015-06-08 09:27:11 -04:00
commit 73d80de53a
9 changed files with 246 additions and 2 deletions

View File

@ -203,9 +203,17 @@ void QAmqpChannelPrivate::close(const QAmqpMethodFrame &frame)
QAmqpMethodFrame closeOkFrame(QAmqpFrame::Channel, miCloseOk);
closeOkFrame.setChannel(channelNumber);
sendFrame(closeOkFrame);
// notify everyone that the channel was closed on us.
notifyClosed();
}
void QAmqpChannelPrivate::closeOk(const QAmqpMethodFrame &)
{
notifyClosed();
}
void QAmqpChannelPrivate::notifyClosed()
{
Q_Q(QAmqpChannel);
Q_EMIT q->closed();

View File

@ -44,6 +44,7 @@ public:
void flow(bool active);
void flowOk();
void close(int code, const QString &text, int classId, int methodId);
void notifyClosed();
// reimp MethodHandler
virtual bool _q_method(const QAmqpMethodFrame &frame);

110
src/qamqpchannelhash.cpp Normal file
View File

@ -0,0 +1,110 @@
/*
* Copyright (C) 2012-2014 Alexey Shcherbakov
* Copyright (C) 2014-2015 Matt Broadstone
* Contact: https://github.com/mbroadst/qamqp
*
* This file is part of the QAMQP Library.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/
#include "qamqpqueue.h"
#include "qamqpexchange.h"
#include "qamqpchannelhash_p.h"
/*!
* Retrieve a pointer to the named channel.
*
* A NULL string is assumed to be equivalent to "" for the purpose
* of retrieving the nameless (default) exchange.
*
* \param[in] name The name of the channel to retrieve.
* \retval NULL Channel does not exist.
*/
QAmqpChannel* QAmqpChannelHash::get(const QString& name) const
{
if (name.isEmpty())
return channels.value(QString());
return channels.value(name);
}
/*!
* Return true if the named channel exists.
*/
bool QAmqpChannelHash::contains(const QString& name) const
{
if (name.isEmpty())
return channels.contains(QString());
return channels.contains(name);
}
/*!
* Store an exchange in the hash. The nameless exchange is stored under
* the name "".
*/
void QAmqpChannelHash::put(QAmqpExchange* exchange)
{
if (exchange->name().isEmpty())
put(QString(), exchange);
else
put(exchange->name(), exchange);
}
/*!
* Store a queue in the hash. If the queue is nameless, we hook its
* declared signal and store it when the queue receives a name from the
* broker, otherwise we store it under the name given.
*/
void QAmqpChannelHash::put(QAmqpQueue* queue)
{
if (queue->name().isEmpty())
connect(queue, SIGNAL(declared()), this, SLOT(queueDeclared()));
else
put(queue->name(), queue);
}
/*!
* Handle destruction of a channel. Do a full garbage collection run.
*/
void QAmqpChannelHash::channelDestroyed(QObject* object)
{
QList<QString> names(channels.keys());
QList<QString>::iterator it;
for (it = names.begin(); it != names.end(); it++) {
if (channels.value(*it) == object)
channels.remove(*it);
}
}
/*!
* Handle a queue that has just been declared and given a new name. The
* caller is assumed to be a QAmqpQueue instance.
*/
void QAmqpChannelHash::queueDeclared()
{
QAmqpQueue *queue = qobject_cast<QAmqpQueue*>(sender());
if (queue)
put(queue);
}
/*!
* Store a channel in the hash. The channel is assumed
* to be named at the time of storage. This hooks the 'destroyed' signal
* so the channel can be removed from our list.
*/
void QAmqpChannelHash::put(const QString& name, QAmqpChannel* channel)
{
connect(channel, SIGNAL(destroyed(QObject*)), this, SLOT(channelDestroyed(QObject*)));
channels[name] = channel;
}
#include "moc_qamqpchannelhash_p.cpp"
/* vim: set ts=4 sw=4 et */

92
src/qamqpchannelhash_p.h Normal file
View File

@ -0,0 +1,92 @@
/*
* Copyright (C) 2012-2014 Alexey Shcherbakov
* Copyright (C) 2014-2015 Matt Broadstone
* Contact: https://github.com/mbroadst/qamqp
*
* This file is part of the QAMQP Library.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*/
#ifndef QAMQPCHANNELHASH_P_H
#define QAMQPCHANNELHASH_P_H
#include <QHash>
#include <QString>
#include <QObject>
/* Forward declarations */
class QAmqpChannel;
class QAmqpQueue;
class QAmqpExchange;
/*!
* QAmqpChannelHash is a container for storing queues and exchanges for later
* retrieval. When the objects are destroyed, they are automatically removed
* from the container.
*/
class QAmqpChannelHash : public QObject
{
Q_OBJECT
public:
/*!
* Retrieve a pointer to the named channel.
*
* A NULL string is assumed to be equivalent to "" for the purpose
* of retrieving the nameless (default) exchange.
*
* \param[in] name The name of the channel to retrieve.
* \retval NULL Channel does not exist.
*/
QAmqpChannel* get(const QString& name) const;
/*!
* Return true if the named channel exists.
*/
bool contains(const QString& name) const;
/*!
* Store an exchange in the hash. The nameless exchange is stored under
* the name "".
*/
void put(QAmqpExchange* exchange);
/*!
* Store a queue in the hash. If the queue is nameless, we hook its
* declared signal and store it when the queue receives a name from the
* broker, otherwise we store it under the name given.
*/
void put(QAmqpQueue* queue);
private Q_SLOTS:
/*!
* Handle destruction of a channel. Do a full garbage collection run.
*/
void channelDestroyed(QObject* object);
/*!
* Handle a queue that has just been declared and given a new name. The
* caller is assumed to be a QAmqpQueue instance.
*/
void queueDeclared();
private:
/*!
* Store a channel in the hash. This hooks the 'destroyed' signal
* so the channel can be removed from our list.
*/
void put(const QString& name, QAmqpChannel* channel);
/*! A collection of channels. Key is the channel's "name". */
QHash<QString, QAmqpChannel*> channels;
};
/* vim: set ts=4 sw=4 et */
#endif

View File

@ -660,7 +660,14 @@ QAmqpExchange *QAmqpClient::createExchange(int channelNumber)
QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumber)
{
Q_D(QAmqpClient);
QAmqpExchange *exchange = new QAmqpExchange(channelNumber, this);
QAmqpExchange *exchange;
if (d->exchanges.contains(name)) {
exchange = qobject_cast<QAmqpExchange*>(d->exchanges.get(name));
if (exchange)
return exchange;
}
exchange = new QAmqpExchange(channelNumber, this);
d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func());
connect(this, SIGNAL(connected()), exchange, SLOT(_q_open()));
connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected()));
@ -668,6 +675,7 @@ QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumbe
if (!name.isEmpty())
exchange->setName(name);
d->exchanges.put(exchange);
return exchange;
}
@ -679,7 +687,14 @@ QAmqpQueue *QAmqpClient::createQueue(int channelNumber)
QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber)
{
Q_D(QAmqpClient);
QAmqpQueue *queue = new QAmqpQueue(channelNumber, this);
QAmqpQueue *queue;
if (d->queues.contains(name)) {
queue = qobject_cast<QAmqpQueue*>(d->queues.get(name));
if (queue)
return queue;
}
queue = new QAmqpQueue(channelNumber, this);
d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func());
d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func());
d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func());
@ -689,6 +704,7 @@ QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber)
if (!name.isEmpty())
queue->setName(name);
d->queues.put(queue);
return queue;
}

View File

@ -7,6 +7,7 @@
#include <QAbstractSocket>
#include <QSslError>
#include "qamqpchannelhash_p.h"
#include "qamqpglobal.h"
#include "qamqpauthenticator.h"
#include "qamqptable.h"
@ -100,6 +101,12 @@ public:
QAMQP::Error error;
QString errorString;
/*! Exchange objects */
QAmqpChannelHash exchanges;
/*! Named queue objects */
QAmqpChannelHash queues;
QAmqpClient * const q_ptr;
Q_DECLARE_PUBLIC(QAmqpClient)

View File

@ -33,6 +33,7 @@ greaterThan(NEED_GCOV_SUPPORT, 0) {
PRIVATE_HEADERS += \
qamqpchannel_p.h \
qamqpchannelhash_p.h \
qamqpclient_p.h \
qamqpexchange_p.h \
qamqpframe_p.h \
@ -56,6 +57,7 @@ HEADERS += \
SOURCES += \
qamqpauthenticator.cpp \
qamqpchannel.cpp \
qamqpchannelhash.cpp \
qamqpclient.cpp \
qamqpexchange.cpp \
qamqpframe.cpp \

View File

@ -129,6 +129,12 @@ void tst_QAMQPExchange::invalidRedeclaration()
// this is for rabbitmq:
QCOMPARE(redeclared->error(), QAMQP::PreconditionFailedError);
// Server has probably closed the channel on us, if so, re-open it.
if (!exchange->isOpen()) {
exchange->reopen();
QVERIFY(waitForSignal(exchange, SIGNAL(opened())));
}
// cleanup
exchange->remove();
QVERIFY(waitForSignal(exchange, SIGNAL(removed())));
@ -209,6 +215,7 @@ void tst_QAMQPExchange::cleanupOnDeletion()
QVERIFY(waitForSignal(exchange, SIGNAL(declared())));
exchange->close();
exchange->deleteLater();
QVERIFY(waitForSignal(exchange, SIGNAL(destroyed())));
// now create, declare, and close the right way
exchange = client->createExchange("test-deletion");

View File

@ -683,6 +683,7 @@ void tst_QAMQPQueue::cleanupOnDeletion()
QVERIFY(waitForSignal(queue, SIGNAL(declared())));
queue->close();
queue->deleteLater();
QVERIFY(waitForSignal(queue, SIGNAL(destroyed())));
// now create, declare, and close the right way
queue = client->createQueue("test-deletion");