From 0c96cfca52c255f7ec4d3069153e66fc67e60a09 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 19 Nov 2015 17:43:31 -0500 Subject: [PATCH] refactor(QAmqpChannel): reset channel state on disconnect We now track channels in QAmqpChannelHash for use across reconnects however, the internal state of these channels was not being reset when the connection was reestablished. This provides a way to do that for not only channels, but both of its subclasses --- src/qamqpchannel.cpp | 7 +++++++ src/qamqpchannel.h | 1 + src/qamqpchannelhash.cpp | 20 ++++++++++++-------- src/qamqpchannelhash_p.h | 7 ++++++- src/qamqpclient.cpp | 19 ++++++++++++++++++- src/qamqpclient_p.h | 1 + src/qamqpexchange.cpp | 10 ++++++++++ src/qamqpexchange.h | 2 ++ src/qamqpqueue.cpp | 12 ++++++++++++ src/qamqpqueue.h | 4 +++- 10 files changed, 72 insertions(+), 11 deletions(-) diff --git a/src/qamqpchannel.cpp b/src/qamqpchannel.cpp index 5cd5254..4e10cbb 100644 --- a/src/qamqpchannel.cpp +++ b/src/qamqpchannel.cpp @@ -348,4 +348,11 @@ void QAmqpChannel::resume() d->flow(true); } +void QAmqpChannel::resetInternalState() +{ + Q_D(QAmqpChannel); + d->opened = false; + d->needOpen = true; +} + #include "moc_qamqpchannel.cpp" diff --git a/src/qamqpchannel.h b/src/qamqpchannel.h index 2e29b37..f706b31 100644 --- a/src/qamqpchannel.h +++ b/src/qamqpchannel.h @@ -63,6 +63,7 @@ Q_SIGNALS: protected: virtual void channelOpened() = 0; virtual void channelClosed() = 0; + virtual void resetInternalState(); protected: explicit QAmqpChannel(QAmqpChannelPrivate *dd, QAmqpClient *client); diff --git a/src/qamqpchannelhash.cpp b/src/qamqpchannelhash.cpp index 1bc93ab..58d7a1a 100644 --- a/src/qamqpchannelhash.cpp +++ b/src/qamqpchannelhash.cpp @@ -31,10 +31,14 @@ QAmqpChannel* QAmqpChannelHash::get(const QString& name) const { if (name.isEmpty()) - return channels.value(QString()); - return channels.value(name); + return m_channels.value(QString()); + return m_channels.value(name); } +QStringList QAmqpChannelHash::channels() const +{ + return m_channels.keys(); +} /*! * Return true if the named channel exists. @@ -42,8 +46,8 @@ QAmqpChannel* QAmqpChannelHash::get(const QString& name) const bool QAmqpChannelHash::contains(const QString& name) const { if (name.isEmpty()) - return channels.contains(QString()); - return channels.contains(name); + return m_channels.contains(QString()); + return m_channels.contains(name); } /*! @@ -76,11 +80,11 @@ void QAmqpChannelHash::put(QAmqpQueue* queue) */ void QAmqpChannelHash::channelDestroyed(QObject* object) { - QList names(channels.keys()); + QList names(m_channels.keys()); QList::iterator it; for (it = names.begin(); it != names.end(); it++) { - if (channels.value(*it) == object) - channels.remove(*it); + if (m_channels.value(*it) == object) + m_channels.remove(*it); } } @@ -103,7 +107,7 @@ void QAmqpChannelHash::queueDeclared() void QAmqpChannelHash::put(const QString& name, QAmqpChannel* channel) { connect(channel, SIGNAL(destroyed(QObject*)), this, SLOT(channelDestroyed(QObject*))); - channels[name] = channel; + m_channels[name] = channel; } #include "moc_qamqpchannelhash_p.cpp" diff --git a/src/qamqpchannelhash_p.h b/src/qamqpchannelhash_p.h index 29d3486..452f821 100644 --- a/src/qamqpchannelhash_p.h +++ b/src/qamqpchannelhash_p.h @@ -52,6 +52,11 @@ public: */ bool contains(const QString& name) const; + /** + * Returns a list of channels tracked by this hash + */ + QStringList channels() const; + /*! * Store an exchange in the hash. The nameless exchange is stored under * the name "". @@ -85,7 +90,7 @@ private: void put(const QString& name, QAmqpChannel* channel); /*! A collection of channels. Key is the channel's "name". */ - QHash channels; + QHash m_channels; }; /* vim: set ts=4 sw=4 et */ diff --git a/src/qamqpclient.cpp b/src/qamqpclient.cpp index 5806917..ae2f311 100644 --- a/src/qamqpclient.cpp +++ b/src/qamqpclient.cpp @@ -55,7 +55,6 @@ void QAmqpClientPrivate::initSocket() socket = new QSslSocket(q); socket->setSocketOption(QAbstractSocket::LowDelayOption, 1); socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1); - QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected())); QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected())); QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead())); @@ -69,6 +68,23 @@ void QAmqpClientPrivate::initSocket() q, SIGNAL(sslErrors(QList))); } +void QAmqpClientPrivate::resetChannelState() +{ + foreach (QString exchangeName, exchanges.channels()) { + QAmqpExchange *exchange = + qobject_cast(exchanges.get(exchangeName)); + if (exchange) exchange->resetInternalState(); + else qDebug() << "INVALID EXCHANGE: " << exchangeName; + } + + foreach (QString queueName, queues.channels()) { + QAmqpQueue *queue = + qobject_cast(queues.get(queueName)); + if (queue) queue->resetInternalState(); + else qDebug() << "INVALID QUEUE: " << queueName; + } +} + void QAmqpClientPrivate::setUsername(const QString &username) { QAmqpAuthenticator *auth = authenticator.data(); @@ -156,6 +172,7 @@ void QAmqpClientPrivate::_q_socketDisconnected() { Q_Q(QAmqpClient); buffer.clear(); + resetChannelState(); if (connected) { connected = false; Q_EMIT q->disconnected(); diff --git a/src/qamqpclient_p.h b/src/qamqpclient_p.h index cd00742..37856d6 100644 --- a/src/qamqpclient_p.h +++ b/src/qamqpclient_p.h @@ -38,6 +38,7 @@ public: virtual void init(); virtual void initSocket(); + void resetChannelState(); void setUsername(const QString &username); void setPassword(const QString &password); void parseConnectionString(const QString &uri); diff --git a/src/qamqpexchange.cpp b/src/qamqpexchange.cpp index a18c71a..59c5a22 100644 --- a/src/qamqpexchange.cpp +++ b/src/qamqpexchange.cpp @@ -351,3 +351,13 @@ bool QAmqpExchange::waitForConfirms(int msecs) return (d->unconfirmedDeliveryTags.isEmpty()); } + +void QAmqpExchange::resetInternalState() +{ + Q_D(QAmqpExchange); + QAmqpChannel::resetInternalState(); + + d->delayedDeclare = false; + d->declared = false; + d->nextDeliveryTag = 0; +} diff --git a/src/qamqpexchange.h b/src/qamqpexchange.h index 92606a9..6de3846 100644 --- a/src/qamqpexchange.h +++ b/src/qamqpexchange.h @@ -106,6 +106,7 @@ public Q_SLOTS: protected: virtual void channelOpened(); virtual void channelClosed(); + virtual void resetInternalState(); private: explicit QAmqpExchange(int channelNumber = -1, QAmqpClient *parent = 0); @@ -113,6 +114,7 @@ private: Q_DISABLE_COPY(QAmqpExchange) Q_DECLARE_PRIVATE(QAmqpExchange) friend class QAmqpClient; + friend class QAmqpClientPrivate; }; diff --git a/src/qamqpqueue.cpp b/src/qamqpqueue.cpp index 0eadc41..3e4e724 100644 --- a/src/qamqpqueue.cpp +++ b/src/qamqpqueue.cpp @@ -603,4 +603,16 @@ bool QAmqpQueue::cancel(bool noWait) return true; } +void QAmqpQueue::resetInternalState() +{ + Q_D(QAmqpQueue); + QAmqpChannel::resetInternalState(); + + d->delayedDeclare = false; + d->declared = false; + d->recievingMessage = false; + d->consuming = false; + d->consumeRequested = false; +} + #include "moc_qamqpqueue.cpp" diff --git a/src/qamqpqueue.h b/src/qamqpqueue.h index 14efc66..cbcdbc6 100644 --- a/src/qamqpqueue.h +++ b/src/qamqpqueue.h @@ -109,14 +109,16 @@ protected: // reimp Channel virtual void channelOpened(); virtual void channelClosed(); + virtual void resetInternalState(); private: explicit QAmqpQueue(int channelNumber = -1, QAmqpClient *parent = 0); Q_DISABLE_COPY(QAmqpQueue) Q_DECLARE_PRIVATE(QAmqpQueue) - friend class QAmqpClient; + friend class QAmqpClientPrivate; + }; #endif // QAMQPQUEUE_H