refactor(QAmqpChannel): reset channel state on disconnect

We now track channels in QAmqpChannelHash for use across reconnects
however, the internal state of these channels was not being reset
when the connection was reestablished. This provides a way to do
that for not only channels, but both of its subclasses
This commit is contained in:
Matt Broadstone 2015-11-19 17:43:31 -05:00
parent c025333204
commit 0c96cfca52
10 changed files with 72 additions and 11 deletions

View File

@ -348,4 +348,11 @@ void QAmqpChannel::resume()
d->flow(true);
}
void QAmqpChannel::resetInternalState()
{
Q_D(QAmqpChannel);
d->opened = false;
d->needOpen = true;
}
#include "moc_qamqpchannel.cpp"

View File

@ -63,6 +63,7 @@ Q_SIGNALS:
protected:
virtual void channelOpened() = 0;
virtual void channelClosed() = 0;
virtual void resetInternalState();
protected:
explicit QAmqpChannel(QAmqpChannelPrivate *dd, QAmqpClient *client);

View File

@ -31,10 +31,14 @@
QAmqpChannel* QAmqpChannelHash::get(const QString& name) const
{
if (name.isEmpty())
return channels.value(QString());
return channels.value(name);
return m_channels.value(QString());
return m_channels.value(name);
}
QStringList QAmqpChannelHash::channels() const
{
return m_channels.keys();
}
/*!
* Return true if the named channel exists.
@ -42,8 +46,8 @@ QAmqpChannel* QAmqpChannelHash::get(const QString& name) const
bool QAmqpChannelHash::contains(const QString& name) const
{
if (name.isEmpty())
return channels.contains(QString());
return channels.contains(name);
return m_channels.contains(QString());
return m_channels.contains(name);
}
/*!
@ -76,11 +80,11 @@ void QAmqpChannelHash::put(QAmqpQueue* queue)
*/
void QAmqpChannelHash::channelDestroyed(QObject* object)
{
QList<QString> names(channels.keys());
QList<QString> names(m_channels.keys());
QList<QString>::iterator it;
for (it = names.begin(); it != names.end(); it++) {
if (channels.value(*it) == object)
channels.remove(*it);
if (m_channels.value(*it) == object)
m_channels.remove(*it);
}
}
@ -103,7 +107,7 @@ void QAmqpChannelHash::queueDeclared()
void QAmqpChannelHash::put(const QString& name, QAmqpChannel* channel)
{
connect(channel, SIGNAL(destroyed(QObject*)), this, SLOT(channelDestroyed(QObject*)));
channels[name] = channel;
m_channels[name] = channel;
}
#include "moc_qamqpchannelhash_p.cpp"

View File

@ -52,6 +52,11 @@ public:
*/
bool contains(const QString& name) const;
/**
* Returns a list of channels tracked by this hash
*/
QStringList channels() const;
/*!
* Store an exchange in the hash. The nameless exchange is stored under
* the name "".
@ -85,7 +90,7 @@ private:
void put(const QString& name, QAmqpChannel* channel);
/*! A collection of channels. Key is the channel's "name". */
QHash<QString, QAmqpChannel*> channels;
QHash<QString, QAmqpChannel*> m_channels;
};
/* vim: set ts=4 sw=4 et */

View File

@ -55,7 +55,6 @@ void QAmqpClientPrivate::initSocket()
socket = new QSslSocket(q);
socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1);
QObject::connect(socket, SIGNAL(connected()), q, SLOT(_q_socketConnected()));
QObject::connect(socket, SIGNAL(disconnected()), q, SLOT(_q_socketDisconnected()));
QObject::connect(socket, SIGNAL(readyRead()), q, SLOT(_q_readyRead()));
@ -69,6 +68,23 @@ void QAmqpClientPrivate::initSocket()
q, SIGNAL(sslErrors(QList<QSslError>)));
}
void QAmqpClientPrivate::resetChannelState()
{
foreach (QString exchangeName, exchanges.channels()) {
QAmqpExchange *exchange =
qobject_cast<QAmqpExchange*>(exchanges.get(exchangeName));
if (exchange) exchange->resetInternalState();
else qDebug() << "INVALID EXCHANGE: " << exchangeName;
}
foreach (QString queueName, queues.channels()) {
QAmqpQueue *queue =
qobject_cast<QAmqpQueue*>(queues.get(queueName));
if (queue) queue->resetInternalState();
else qDebug() << "INVALID QUEUE: " << queueName;
}
}
void QAmqpClientPrivate::setUsername(const QString &username)
{
QAmqpAuthenticator *auth = authenticator.data();
@ -156,6 +172,7 @@ void QAmqpClientPrivate::_q_socketDisconnected()
{
Q_Q(QAmqpClient);
buffer.clear();
resetChannelState();
if (connected) {
connected = false;
Q_EMIT q->disconnected();

View File

@ -38,6 +38,7 @@ public:
virtual void init();
virtual void initSocket();
void resetChannelState();
void setUsername(const QString &username);
void setPassword(const QString &password);
void parseConnectionString(const QString &uri);

View File

@ -351,3 +351,13 @@ bool QAmqpExchange::waitForConfirms(int msecs)
return (d->unconfirmedDeliveryTags.isEmpty());
}
void QAmqpExchange::resetInternalState()
{
Q_D(QAmqpExchange);
QAmqpChannel::resetInternalState();
d->delayedDeclare = false;
d->declared = false;
d->nextDeliveryTag = 0;
}

View File

@ -106,6 +106,7 @@ public Q_SLOTS:
protected:
virtual void channelOpened();
virtual void channelClosed();
virtual void resetInternalState();
private:
explicit QAmqpExchange(int channelNumber = -1, QAmqpClient *parent = 0);
@ -113,6 +114,7 @@ private:
Q_DISABLE_COPY(QAmqpExchange)
Q_DECLARE_PRIVATE(QAmqpExchange)
friend class QAmqpClient;
friend class QAmqpClientPrivate;
};

View File

@ -603,4 +603,16 @@ bool QAmqpQueue::cancel(bool noWait)
return true;
}
void QAmqpQueue::resetInternalState()
{
Q_D(QAmqpQueue);
QAmqpChannel::resetInternalState();
d->delayedDeclare = false;
d->declared = false;
d->recievingMessage = false;
d->consuming = false;
d->consumeRequested = false;
}
#include "moc_qamqpqueue.cpp"

View File

@ -109,14 +109,16 @@ protected:
// reimp Channel
virtual void channelOpened();
virtual void channelClosed();
virtual void resetInternalState();
private:
explicit QAmqpQueue(int channelNumber = -1, QAmqpClient *parent = 0);
Q_DISABLE_COPY(QAmqpQueue)
Q_DECLARE_PRIVATE(QAmqpQueue)
friend class QAmqpClient;
friend class QAmqpClientPrivate;
};
#endif // QAMQPQUEUE_H