diff --git a/src/qamqpchannel.cpp b/src/qamqpchannel.cpp index f20b1ae..5cd5254 100644 --- a/src/qamqpchannel.cpp +++ b/src/qamqpchannel.cpp @@ -203,9 +203,17 @@ void QAmqpChannelPrivate::close(const QAmqpMethodFrame &frame) QAmqpMethodFrame closeOkFrame(QAmqpFrame::Channel, miCloseOk); closeOkFrame.setChannel(channelNumber); sendFrame(closeOkFrame); + + // notify everyone that the channel was closed on us. + notifyClosed(); } void QAmqpChannelPrivate::closeOk(const QAmqpMethodFrame &) +{ + notifyClosed(); +} + +void QAmqpChannelPrivate::notifyClosed() { Q_Q(QAmqpChannel); Q_EMIT q->closed(); diff --git a/src/qamqpchannel_p.h b/src/qamqpchannel_p.h index 7551950..a84e19b 100644 --- a/src/qamqpchannel_p.h +++ b/src/qamqpchannel_p.h @@ -44,6 +44,7 @@ public: void flow(bool active); void flowOk(); void close(int code, const QString &text, int classId, int methodId); + void notifyClosed(); // reimp MethodHandler virtual bool _q_method(const QAmqpMethodFrame &frame); diff --git a/src/qamqpchannelhash.cpp b/src/qamqpchannelhash.cpp new file mode 100644 index 0000000..1bc93ab --- /dev/null +++ b/src/qamqpchannelhash.cpp @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2012-2014 Alexey Shcherbakov + * Copyright (C) 2014-2015 Matt Broadstone + * Contact: https://github.com/mbroadst/qamqp + * + * This file is part of the QAMQP Library. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + */ +#include "qamqpqueue.h" +#include "qamqpexchange.h" +#include "qamqpchannelhash_p.h" + +/*! +* Retrieve a pointer to the named channel. +* +* A NULL string is assumed to be equivalent to "" for the purpose +* of retrieving the nameless (default) exchange. +* +* \param[in] name The name of the channel to retrieve. +* \retval NULL Channel does not exist. +*/ +QAmqpChannel* QAmqpChannelHash::get(const QString& name) const +{ + if (name.isEmpty()) + return channels.value(QString()); + return channels.value(name); +} + + +/*! +* Return true if the named channel exists. +*/ +bool QAmqpChannelHash::contains(const QString& name) const +{ + if (name.isEmpty()) + return channels.contains(QString()); + return channels.contains(name); +} + +/*! +* Store an exchange in the hash. The nameless exchange is stored under +* the name "". +*/ +void QAmqpChannelHash::put(QAmqpExchange* exchange) +{ + if (exchange->name().isEmpty()) + put(QString(), exchange); + else + put(exchange->name(), exchange); +} + +/*! +* Store a queue in the hash. If the queue is nameless, we hook its +* declared signal and store it when the queue receives a name from the +* broker, otherwise we store it under the name given. +*/ +void QAmqpChannelHash::put(QAmqpQueue* queue) +{ + if (queue->name().isEmpty()) + connect(queue, SIGNAL(declared()), this, SLOT(queueDeclared())); + else + put(queue->name(), queue); +} + +/*! +* Handle destruction of a channel. Do a full garbage collection run. +*/ +void QAmqpChannelHash::channelDestroyed(QObject* object) +{ + QList names(channels.keys()); + QList::iterator it; + for (it = names.begin(); it != names.end(); it++) { + if (channels.value(*it) == object) + channels.remove(*it); + } +} + +/*! +* Handle a queue that has just been declared and given a new name. The +* caller is assumed to be a QAmqpQueue instance. +*/ +void QAmqpChannelHash::queueDeclared() +{ + QAmqpQueue *queue = qobject_cast(sender()); + if (queue) + put(queue); +} + +/*! +* Store a channel in the hash. The channel is assumed +* to be named at the time of storage. This hooks the 'destroyed' signal +* so the channel can be removed from our list. +*/ +void QAmqpChannelHash::put(const QString& name, QAmqpChannel* channel) +{ + connect(channel, SIGNAL(destroyed(QObject*)), this, SLOT(channelDestroyed(QObject*))); + channels[name] = channel; +} + +#include "moc_qamqpchannelhash_p.cpp" +/* vim: set ts=4 sw=4 et */ diff --git a/src/qamqpchannelhash_p.h b/src/qamqpchannelhash_p.h new file mode 100644 index 0000000..29d3486 --- /dev/null +++ b/src/qamqpchannelhash_p.h @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2012-2014 Alexey Shcherbakov + * Copyright (C) 2014-2015 Matt Broadstone + * Contact: https://github.com/mbroadst/qamqp + * + * This file is part of the QAMQP Library. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + */ +#ifndef QAMQPCHANNELHASH_P_H +#define QAMQPCHANNELHASH_P_H + +#include +#include +#include + +/* Forward declarations */ +class QAmqpChannel; +class QAmqpQueue; +class QAmqpExchange; + +/*! + * QAmqpChannelHash is a container for storing queues and exchanges for later + * retrieval. When the objects are destroyed, they are automatically removed + * from the container. + */ +class QAmqpChannelHash : public QObject +{ + Q_OBJECT +public: + /*! + * Retrieve a pointer to the named channel. + * + * A NULL string is assumed to be equivalent to "" for the purpose + * of retrieving the nameless (default) exchange. + * + * \param[in] name The name of the channel to retrieve. + * \retval NULL Channel does not exist. + */ + QAmqpChannel* get(const QString& name) const; + + /*! + * Return true if the named channel exists. + */ + bool contains(const QString& name) const; + + /*! + * Store an exchange in the hash. The nameless exchange is stored under + * the name "". + */ + void put(QAmqpExchange* exchange); + + /*! + * Store a queue in the hash. If the queue is nameless, we hook its + * declared signal and store it when the queue receives a name from the + * broker, otherwise we store it under the name given. + */ + void put(QAmqpQueue* queue); + +private Q_SLOTS: + /*! + * Handle destruction of a channel. Do a full garbage collection run. + */ + void channelDestroyed(QObject* object); + + /*! + * Handle a queue that has just been declared and given a new name. The + * caller is assumed to be a QAmqpQueue instance. + */ + void queueDeclared(); + +private: + /*! + * Store a channel in the hash. This hooks the 'destroyed' signal + * so the channel can be removed from our list. + */ + void put(const QString& name, QAmqpChannel* channel); + + /*! A collection of channels. Key is the channel's "name". */ + QHash channels; +}; + +/* vim: set ts=4 sw=4 et */ +#endif diff --git a/src/qamqpclient.cpp b/src/qamqpclient.cpp index 716e7cf..d429c86 100644 --- a/src/qamqpclient.cpp +++ b/src/qamqpclient.cpp @@ -660,7 +660,14 @@ QAmqpExchange *QAmqpClient::createExchange(int channelNumber) QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumber) { Q_D(QAmqpClient); - QAmqpExchange *exchange = new QAmqpExchange(channelNumber, this); + QAmqpExchange *exchange; + if (d->exchanges.contains(name)) { + exchange = qobject_cast(d->exchanges.get(name)); + if (exchange) + return exchange; + } + + exchange = new QAmqpExchange(channelNumber, this); d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func()); connect(this, SIGNAL(connected()), exchange, SLOT(_q_open())); connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected())); @@ -668,6 +675,7 @@ QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumbe if (!name.isEmpty()) exchange->setName(name); + d->exchanges.put(exchange); return exchange; } @@ -679,7 +687,14 @@ QAmqpQueue *QAmqpClient::createQueue(int channelNumber) QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber) { Q_D(QAmqpClient); - QAmqpQueue *queue = new QAmqpQueue(channelNumber, this); + QAmqpQueue *queue; + if (d->queues.contains(name)) { + queue = qobject_cast(d->queues.get(name)); + if (queue) + return queue; + } + + queue = new QAmqpQueue(channelNumber, this); d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func()); d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func()); d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func()); @@ -689,6 +704,7 @@ QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber) if (!name.isEmpty()) queue->setName(name); + d->queues.put(queue); return queue; } diff --git a/src/qamqpclient_p.h b/src/qamqpclient_p.h index 9856019..cd00742 100644 --- a/src/qamqpclient_p.h +++ b/src/qamqpclient_p.h @@ -7,6 +7,7 @@ #include #include +#include "qamqpchannelhash_p.h" #include "qamqpglobal.h" #include "qamqpauthenticator.h" #include "qamqptable.h" @@ -100,6 +101,12 @@ public: QAMQP::Error error; QString errorString; + /*! Exchange objects */ + QAmqpChannelHash exchanges; + + /*! Named queue objects */ + QAmqpChannelHash queues; + QAmqpClient * const q_ptr; Q_DECLARE_PUBLIC(QAmqpClient) diff --git a/src/src.pro b/src/src.pro index c0fc609..2a5cdd1 100644 --- a/src/src.pro +++ b/src/src.pro @@ -33,6 +33,7 @@ greaterThan(NEED_GCOV_SUPPORT, 0) { PRIVATE_HEADERS += \ qamqpchannel_p.h \ + qamqpchannelhash_p.h \ qamqpclient_p.h \ qamqpexchange_p.h \ qamqpframe_p.h \ @@ -56,6 +57,7 @@ HEADERS += \ SOURCES += \ qamqpauthenticator.cpp \ qamqpchannel.cpp \ + qamqpchannelhash.cpp \ qamqpclient.cpp \ qamqpexchange.cpp \ qamqpframe.cpp \ diff --git a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp index 38011a9..e9420b8 100644 --- a/tests/auto/qamqpexchange/tst_qamqpexchange.cpp +++ b/tests/auto/qamqpexchange/tst_qamqpexchange.cpp @@ -129,6 +129,12 @@ void tst_QAMQPExchange::invalidRedeclaration() // this is for rabbitmq: QCOMPARE(redeclared->error(), QAMQP::PreconditionFailedError); + // Server has probably closed the channel on us, if so, re-open it. + if (!exchange->isOpen()) { + exchange->reopen(); + QVERIFY(waitForSignal(exchange, SIGNAL(opened()))); + } + // cleanup exchange->remove(); QVERIFY(waitForSignal(exchange, SIGNAL(removed()))); @@ -209,6 +215,7 @@ void tst_QAMQPExchange::cleanupOnDeletion() QVERIFY(waitForSignal(exchange, SIGNAL(declared()))); exchange->close(); exchange->deleteLater(); + QVERIFY(waitForSignal(exchange, SIGNAL(destroyed()))); // now create, declare, and close the right way exchange = client->createExchange("test-deletion"); diff --git a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp index 428b9f6..2f1b422 100644 --- a/tests/auto/qamqpqueue/tst_qamqpqueue.cpp +++ b/tests/auto/qamqpqueue/tst_qamqpqueue.cpp @@ -683,6 +683,7 @@ void tst_QAMQPQueue::cleanupOnDeletion() QVERIFY(waitForSignal(queue, SIGNAL(declared()))); queue->close(); queue->deleteLater(); + QVERIFY(waitForSignal(queue, SIGNAL(destroyed()))); // now create, declare, and close the right way queue = client->createQueue("test-deletion");