// qamqp/src/amqp_client.cpp
#include "amqp_client.h"
#include "amqp_client_p.h"
2014-05-29 01:52:27 +08:00
#include "amqp_global.h"
#include "amqp_exchange.h"
#include "amqp_exchange_p.h"
#include "amqp_queue.h"
#include "amqp_queue_p.h"
#include "amqp_connection_p.h"
#include "amqp_authenticator.h"
2014-05-29 01:52:27 +08:00
#include <QTextStream>
using namespace QAMQP;
// Builds the private half of a Client. The port, host and virtual host start
// out at the compile-time defaults AMQPPORT, AMQPHOST and AMQPVHOST; q is the
// public object this private instance belongs to.
ClientPrivate::ClientPrivate(Client *q)
    : port(AMQPPORT)
    , host(QLatin1String(AMQPHOST))
    , virtualHost(QLatin1String(AMQPVHOST))
    , q_ptr(q)
{
}
// Nothing is released explicitly here; the destructor body is intentionally
// empty.
ClientPrivate::~ClientPrivate()
{
}
void ClientPrivate::init(const QUrl &connectionString)
{
2014-05-29 04:28:45 +08:00
Q_Q(Client);
if (!network_) {
2014-05-29 04:28:45 +08:00
network_ = new Network(q);
QObject::connect(network_.data(), SIGNAL(connected()), q, SIGNAL(connected()));
QObject::connect(network_.data(), SIGNAL(disconnected()), q, SIGNAL(disconnected()));
}
if (!connection_)
connection_ = new Connection(network_, q);
network_->setMethodHandlerConnection(connection_);
auth_ = QSharedPointer<Authenticator>(
new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
QObject::connect(connection_, SIGNAL(connected()), q, SIGNAL(connected()));
QObject::connect(connection_, SIGNAL(disconnected()), q, SIGNAL(disconnected()));
if (connectionString.isValid()) {
parseConnectionString(connectionString);
connect();
}
}
// Connects the network layer to the currently configured host and port.
// If the socket is in any state other than "unconnected", disconnect() is
// called first.
void ClientPrivate::connect()
{
    const bool socketInUse = (network_->state() != QAbstractSocket::UnconnectedState);
    if (socketInUse)
        disconnect();

    network_->connectTo(host, port);
}
void ClientPrivate::parseConnectionString(const QUrl &connectionString)
{
2014-05-29 04:28:45 +08:00
Q_Q(Client);
if (connectionString.scheme() != AMQPSCHEME &&
connectionString.scheme() != AMQPSSCHEME) {
qDebug() << Q_FUNC_INFO << "invalid scheme: " << connectionString.scheme();
return;
}
q->setSsl(connectionString.scheme() == AMQPSSCHEME);
q->setPassword(connectionString.password());
q->setUser(connectionString.userName());
q->setPort(connectionString.port(AMQPPORT));
q->setHost(connectionString.host());
q->setVirtualHost(connectionString.path());
}
// Disconnects the network layer and clears the Connection's "connected" flag.
// When the socket is already in the unconnected state, only a debug message
// is emitted.
void ClientPrivate::disconnect()
{
    if (network_->state() != QAbstractSocket::UnconnectedState) {
        network_->disconnect();
        connection_->d_func()->connected = false;
        return;
    }

    qDebug() << Q_FUNC_INFO << "already disconnected";
}
//////////////////////////////////////////////////////////////////////////
2014-05-29 04:28:45 +08:00
// Constructs a client with the given QObject parent and initialises its
// private implementation without a connection URL.
Client::Client(QObject *parent)
    : QObject(parent)
    , d_ptr(new ClientPrivate(this))
{
    d_func()->init();
}
// Constructs a client with the given QObject parent and hands
// connectionString to the private implementation's init().
Client::Client(const QUrl &connectionString, QObject *parent)
    : QObject(parent)
    , d_ptr(new ClientPrivate(this))
{
    d_func()->init(connectionString);
}
2014-05-29 04:28:45 +08:00
// The destructor body is intentionally empty.
Client::~Client()
{
}
2014-05-29 04:28:45 +08:00
// Returns the port currently stored for the broker connection.
quint16 Client::port() const
{
    return d_func()->port;
}
2014-05-29 04:28:45 +08:00
// Stores the port to use for the broker connection.
void Client::setPort(quint16 port)
{
    d_func()->port = port;
}
2014-05-29 04:28:45 +08:00
// Returns the host name currently stored for the broker connection.
QString Client::host() const
{
    return d_func()->host;
}
void Client::setHost(const QString &host)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
d->host = host;
}
2014-05-29 04:28:45 +08:00
// Returns the virtual host currently stored for the client.
QString Client::virtualHost() const
{
    return d_func()->virtualHost;
}
2014-05-29 04:28:45 +08:00
void Client::setVirtualHost(const QString &virtualHost)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
d->virtualHost = virtualHost;
}
2014-05-29 04:28:45 +08:00
// Returns the login of the installed authenticator when it is of type
// "AMQPLAIN"; returns a null QString when no authenticator is set or it is of
// another type.
QString Client::user() const
{
    Q_D(const Client);

    const Authenticator *current = d->auth_.data();
    if (!current || current->type() != QLatin1String("AMQPLAIN"))
        return QString();

    return static_cast<const AMQPlainAuthenticator *>(current)->login();
}
2014-05-29 04:28:45 +08:00
void Client::setUser(const QString &user)
{
2014-05-29 04:28:45 +08:00
Q_D(const Client);
Authenticator *auth = d->auth_.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth);
a->setLogin(user);
}
}
2014-05-29 04:28:45 +08:00
// Returns the password of the installed authenticator when it is of type
// "AMQPLAIN"; returns a null QString when no authenticator is set or it is of
// another type.
QString Client::password() const
{
    Q_D(const Client);

    const Authenticator *auth = d->auth_.data();
    // QLatin1String keeps this comparison consistent with user(), setUser()
    // and setPassword(), and avoids an implicit const char* -> QString cast.
    if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
        const AMQPlainAuthenticator *plain = static_cast<const AMQPlainAuthenticator *>(auth);
        return plain->password();
    }

    return QString();
}
2014-05-29 04:28:45 +08:00
void Client::setPassword(const QString &password)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
Authenticator *auth = d->auth_.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
AMQPlainAuthenticator *a = static_cast<AMQPlainAuthenticator*>(auth);
a->setPassword(password);
}
}
2014-05-29 04:28:45 +08:00
// Convenience overload: creates an exchange on channelNumber without
// assigning a name (delegates to the named overload with a null QString).
Exchange *Client::createExchange(int channelNumber)
{
    const QString noName;
    return createExchange(noName, channelNumber);
}
// Creates an Exchange on channelNumber, parented to this client, and wires it
// up:
//  * registers it with the network as method handler for its channel;
//  * runs its _q_open() slot whenever the Connection signals connected();
//  * calls open() on the exchange's private implementation right away;
//  * runs its _q_disconnected() slot when this client signals disconnected();
//  * applies name if it is not empty.
// Returns the new exchange.
Exchange *Client::createExchange(const QString &name, int channelNumber)
{
    Q_D(Client);

    Exchange *created = new Exchange(channelNumber, this);
    d->network_->addMethodHandlerForChannel(created->channelNumber(), created);

    connect(d->connection_, SIGNAL(connected()), created, SLOT(_q_open()));
    created->d_func()->open();
    connect(this, SIGNAL(disconnected()), created, SLOT(_q_disconnected()));

    if (!name.isEmpty())
        created->setName(name);

    return created;
}
2014-05-29 04:28:45 +08:00
// Convenience overload: creates a queue on channelNumber without assigning a
// name (delegates to the named overload with a null QString).
Queue *Client::createQueue(int channelNumber)
{
    const QString noName;
    return createQueue(noName, channelNumber);
}
// Creates a Queue on channelNumber, parented to this client, and wires it up:
//  * registers it with the network as method, content and content-body
//    handler for its channel;
//  * runs its _q_open() slot whenever the Connection signals connected();
//  * calls open() on the queue's private implementation right away;
//  * runs its _q_disconnected() slot when this client signals disconnected();
//  * applies name if it is not empty.
// Returns the new queue.
Queue *Client::createQueue(const QString &name, int channelNumber)
{
    Q_D(Client);

    Queue *created = new Queue(channelNumber, this);
    d->network_->addMethodHandlerForChannel(created->channelNumber(), created);
    d->network_->addContentHandlerForChannel(created->channelNumber(), created);
    d->network_->addContentBodyHandlerForChannel(created->channelNumber(), created);

    connect(d->connection_, SIGNAL(connected()), created, SLOT(_q_open()));
    created->d_func()->open();
    connect(this, SIGNAL(disconnected()), created, SLOT(_q_disconnected()));

    if (!name.isEmpty())
        created->setName(name);

    return created;
}
2014-05-29 04:28:45 +08:00
// Replaces the client's authenticator. The pointer is wrapped in a
// QSharedPointer, so the client takes ownership of auth.
void Client::setAuth(Authenticator *auth)
{
    Q_D(Client);

    // Passing back the pointer we already own (e.g. setAuth(auth())) must be
    // a no-op: wrapping it in a second, independent QSharedPointer would
    // delete the object when the old one is released and leave auth_ dangling.
    if (d->auth_.data() == auth)
        return;

    d->auth_ = QSharedPointer<Authenticator>(auth);
}
2014-05-29 04:28:45 +08:00
// Returns the raw pointer held by the client's authenticator shared pointer
// (null when none is set). Ownership stays with the client.
Authenticator *Client::auth() const
{
    return d_func()->auth_.data();
}
2014-05-29 04:28:45 +08:00
bool Client::isSsl() const
{
2014-05-29 04:28:45 +08:00
Q_D(const Client);
return d->network_->isSsl();
}
2014-05-29 04:28:45 +08:00
void Client::setSsl(bool value)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
d->network_->setSsl(value);
}
2014-05-29 04:28:45 +08:00
bool Client::autoReconnect() const
{
2014-05-29 04:28:45 +08:00
Q_D(const Client);
return d->network_->autoReconnect();
}
2014-05-29 04:28:45 +08:00
void Client::setAutoReconnect(bool value)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
d->network_->setAutoReconnect(value);
}
2014-05-29 04:28:45 +08:00
bool Client::isConnected() const
{
2014-05-29 04:28:45 +08:00
Q_D(const Client);
return d->connection_->isConnected();
}
2014-05-29 04:28:45 +08:00
void Client::addCustomProperty(const QString &name, const QString &value)
{
2014-05-29 04:28:45 +08:00
Q_D(Client);
return d->connection_->addCustomProperty(name, value);
}
2014-05-29 04:28:45 +08:00
// Returns the Connection object's custom property stored under name.
QString Client::customProperty(const QString &name) const
{
    return d_func()->connection_->customProperty(name);
}
// Connects to the broker. A non-empty connectionString is first converted
// with QUrl::fromUserInput() and applied to the client's settings; an empty
// one connects with the settings already stored.
void Client::connectToHost(const QString &connectionString)
{
    Q_D(Client);

    if (!connectionString.isEmpty())
        d->parseConnectionString(QUrl::fromUserInput(connectionString));

    d->connect();
}
void Client::connectToHost(const QHostAddress &address, quint16 port)
{
Q_D(Client);
d->host = address.toString();
d->port = port;
d->connect();
}
void Client::disconnectFromHost()
{
Q_D(Client);
d->disconnect();
}