diff --git a/src/qamqpchannel.cpp b/src/qamqpchannel.cpp index 5cd5254..4e10cbb 100644 --- a/src/qamqpchannel.cpp +++ b/src/qamqpchannel.cpp @@ -348,4 +348,11 @@ void QAmqpChannel::resume() d->flow(true); } +void QAmqpChannel::resetInternalState() +{ + Q_D(QAmqpChannel); + d->opened = false; + d->needOpen = true; +} + #include "moc_qamqpchannel.cpp" diff --git a/src/qamqpchannel.h b/src/qamqpchannel.h index 2e29b37..f706b31 100644 --- a/src/qamqpchannel.h +++ b/src/qamqpchannel.h @@ -63,6 +63,7 @@ Q_SIGNALS: protected: virtual void channelOpened() = 0; virtual void channelClosed() = 0; + virtual void resetInternalState(); protected: explicit QAmqpChannel(QAmqpChannelPrivate *dd, QAmqpClient *client); diff --git a/src/qamqpchannelhash.cpp b/src/qamqpchannelhash.cpp index 1bc93ab..58d7a1a 100644 --- a/src/qamqpchannelhash.cpp +++ b/src/qamqpchannelhash.cpp @@ -31,10 +31,14 @@ QAmqpChannel* QAmqpChannelHash::get(const QString& name) const { if (name.isEmpty()) - return channels.value(QString()); - return channels.value(name); + return m_channels.value(QString()); + return m_channels.value(name); } +QStringList QAmqpChannelHash::channels() const +{ + return m_channels.keys(); +} /*! * Return true if the named channel exists. @@ -42,8 +46,8 @@ QAmqpChannel* QAmqpChannelHash::get(const QString& name) const bool QAmqpChannelHash::contains(const QString& name) const { if (name.isEmpty()) - return channels.contains(QString()); - return channels.contains(name); + return m_channels.contains(QString()); + return m_channels.contains(name); } /*! @@ -76,11 +80,11 @@ void QAmqpChannelHash::put(QAmqpQueue* queue) */ void QAmqpChannelHash::channelDestroyed(QObject* object) { - QList names(channels.keys()); + QList names(m_channels.keys()); QList::iterator it; for (it = names.begin(); it != names.end(); it++) { - if (channels.value(*it) == object) - channels.remove(*it); + if (m_channels.value(*it) == object) + m_channels.remove(*it); } } @@ -103,7 +107,7 @@ void QAmqpChannelHash::queueDeclared() void QAmqpChannelHash::put(const QString& name, QAmqpChannel* channel) { connect(channel, SIGNAL(destroyed(QObject*)), this, SLOT(channelDestroyed(QObject*))); - channels[name] = channel; + m_channels[name] = channel; } #include "moc_qamqpchannelhash_p.cpp" diff --git a/src/qamqpchannelhash_p.h b/src/qamqpchannelhash_p.h index 29d3486..452f821 100644 --- a/src/qamqpchannelhash_p.h +++ b/src/qamqpchannelhash_p.h @@ -52,6 +52,11 @@ public: */ bool contains(const QString& name) const; + /** + * Returns a list of channels tracked by this hash + */ + QStringList channels() const; + /*! * Store an exchange in the hash. The nameless exchange is stored under * the name "". @@ -85,7 +90,7 @@ private: void put(const QString& name, QAmqpChannel* channel); /*! A collection of channels. Key is the channel's "name". */ - QHash channels; + QHash m_channels; }; /* vim: set ts=4 sw=4 et */ diff --git a/src/qamqpclient.cpp b/src/qamqpclient.cpp index 5806917..ae2f311 100644 --- a/src/qamqpclient.cpp +++ b/src/qamqpclient.cpp @@ -55,7 +55,6 @@ void QAmqpClientPrivate::initSocket() socket = new QSslSocket(q); socket->setSocketOption(QAbstractSocket::LowDelayOption, 1); socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1); - QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected())); QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected())); QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead())); @@ -69,6 +68,23 @@ void QAmqpClientPrivate::initSocket() q, SIGNAL(sslErrors(QList))); } +void QAmqpClientPrivate::resetChannelState() +{ + foreach (QString exchangeName, exchanges.channels()) { + QAmqpExchange *exchange = + qobject_cast(exchanges.get(exchangeName)); + if (exchange) exchange->resetInternalState(); + else qDebug() << "INVALID EXCHANGE: " << exchangeName; + } + + foreach (QString queueName, queues.channels()) { + QAmqpQueue *queue = + qobject_cast(queues.get(queueName)); + if (queue) queue->resetInternalState(); + else qDebug() << "INVALID QUEUE: " << queueName; + } +} + void QAmqpClientPrivate::setUsername(const QString &username) { QAmqpAuthenticator *auth = authenticator.data(); @@ -156,6 +172,7 @@ void QAmqpClientPrivate::_q_socketDisconnected() { Q_Q(QAmqpClient); buffer.clear(); + resetChannelState(); if (connected) { connected = false; Q_EMIT q->disconnected(); diff --git a/src/qamqpclient_p.h b/src/qamqpclient_p.h index cd00742..37856d6 100644 --- a/src/qamqpclient_p.h +++ b/src/qamqpclient_p.h @@ -38,6 +38,7 @@ public: virtual void init(); virtual void initSocket(); + void resetChannelState(); void setUsername(const QString &username); void setPassword(const QString &password); void parseConnectionString(const QString &uri); diff --git a/src/qamqpexchange.cpp b/src/qamqpexchange.cpp index a18c71a..59c5a22 100644 --- a/src/qamqpexchange.cpp +++ b/src/qamqpexchange.cpp @@ -351,3 +351,13 @@ bool QAmqpExchange::waitForConfirms(int msecs) return (d->unconfirmedDeliveryTags.isEmpty()); } + +void QAmqpExchange::resetInternalState() +{ + Q_D(QAmqpExchange); + QAmqpChannel::resetInternalState(); + + d->delayedDeclare = false; + d->declared = false; + d->nextDeliveryTag = 0; +} diff --git a/src/qamqpexchange.h b/src/qamqpexchange.h index 92606a9..6de3846 100644 --- a/src/qamqpexchange.h +++ b/src/qamqpexchange.h @@ -106,6 +106,7 @@ public Q_SLOTS: protected: virtual void channelOpened(); virtual void channelClosed(); + virtual void resetInternalState(); private: explicit QAmqpExchange(int channelNumber = -1, QAmqpClient *parent = 0); @@ -113,6 +114,7 @@ private: Q_DISABLE_COPY(QAmqpExchange) Q_DECLARE_PRIVATE(QAmqpExchange) friend class QAmqpClient; + friend class QAmqpClientPrivate; }; diff --git a/src/qamqpqueue.cpp b/src/qamqpqueue.cpp index 0eadc41..3e4e724 100644 --- a/src/qamqpqueue.cpp +++ b/src/qamqpqueue.cpp @@ -603,4 +603,16 @@ bool QAmqpQueue::cancel(bool noWait) return true; } +void QAmqpQueue::resetInternalState() +{ + Q_D(QAmqpQueue); + QAmqpChannel::resetInternalState(); + + d->delayedDeclare = false; + d->declared = false; + d->recievingMessage = false; + d->consuming = false; + d->consumeRequested = false; +} + #include "moc_qamqpqueue.cpp" diff --git a/src/qamqpqueue.h b/src/qamqpqueue.h index 14efc66..cbcdbc6 100644 --- a/src/qamqpqueue.h +++ b/src/qamqpqueue.h @@ -109,14 +109,16 @@ protected: // reimp Channel virtual void channelOpened(); virtual void channelClosed(); + virtual void resetInternalState(); private: explicit QAmqpQueue(int channelNumber = -1, QAmqpClient *parent = 0); Q_DISABLE_COPY(QAmqpQueue) Q_DECLARE_PRIVATE(QAmqpQueue) - friend class QAmqpClient; + friend class QAmqpClientPrivate; + }; #endif // QAMQPQUEUE_H