diff --git a/QAMQP.vcproj b/QAMQP.vcproj
index 22a7fb7..efe4726 100644
--- a/QAMQP.vcproj
+++ b/QAMQP.vcproj
@@ -166,6 +166,10 @@
RelativePath=".\src\qamqp\amqp.cpp"
>
+
+
@@ -230,6 +234,10 @@
/>
+
+
diff --git a/src/qamqp/amqp.cpp b/src/qamqp/amqp.cpp
index 4e6c1a1..c5dc36e 100644
--- a/src/qamqp/amqp.cpp
+++ b/src/qamqp/amqp.cpp
@@ -6,6 +6,7 @@
#include "qamqp_global.h"
#include "amqp_exchange.h"
#include "amqp_queue.h"
+#include "amqp_authenticator.h"
using namespace QAMQP;
@@ -31,8 +32,6 @@ ClientPrivate::ClientPrivate(int version )
, port(AMQPPORT)
, host(QString::fromLatin1(AMQPHOST))
, virtualHost(QString::fromLatin1(AMQPVHOST))
- , user(QString::fromLatin1(AMQPLOGIN))
- , password(QString::fromLatin1(AMQPPSWD))
{
}
@@ -55,6 +54,8 @@ void ClientPrivate::init(QObject * parent)
connection_ = new QAMQP::Connection(q_func());
}
+ setAuth(new AMQPlainAuthenticator(QString::fromLatin1(AMQPLOGIN), QString::fromLatin1(AMQPPSWD)));
+
QObject::connect(network_, SIGNAL(method(const QAMQP::Frame::Method &)),
connection_, SLOT(_q_method(const QAMQP::Frame::Method &)));
}
@@ -66,14 +67,27 @@ void ClientPrivate::init(QObject * parent, const QUrl & con)
ClientPrivate::connect();
}
+
+void ClientPrivate::setAuth( Authenticator* auth )
+{
+ auth_ = QSharedPointer(auth);
+}
+
+
void ClientPrivate::printConnect() const
{
QTextStream stream(stdout);
stream << "port = " << port << endl;
stream << "host = " << host << endl;
stream << "vhost = " << virtualHost << endl;
- stream << "user = " << user << endl;
- stream << "passw = " << password << endl;
+
+ if(auth_ && auth_->type() == "AMQPLAIN")
+ {
+ QSharedPointer a = auth_.staticCast();
+ stream << "user = " << a->login() << endl;
+ stream << "passw = " << a->password() << endl;
+ }
+
}
void ClientPrivate::connect()
@@ -221,22 +235,48 @@ void QAMQP::Client::setVirtualHost( const QString & virtualHost )
QString QAMQP::Client::user() const
{
- return d_func()->user;
+ const Authenticator * auth = d_func()->auth_.data();
+
+ if(auth && auth->type() == "AMQPLAIN")
+ {
+ const AMQPlainAuthenticator * a = static_cast(auth);
+ return a->login();
+ }
+ return QString();
}
void QAMQP::Client::setUser( const QString & user )
{
- d_func()->user = user;
+ Authenticator * auth = d_func()->auth_.data();
+
+ if(auth && auth->type() == "AMQPLAIN")
+ {
+ AMQPlainAuthenticator * a = static_cast(auth);
+ a->setLogin(user);
+ }
}
QString QAMQP::Client::password() const
{
- return d_func()->password;
+ const Authenticator * auth = d_func()->auth_.data();
+
+ if(auth && auth->type() == "AMQPLAIN")
+ {
+ const AMQPlainAuthenticator * a = static_cast(auth);
+ return a->password();
+ }
+ return QString();
}
void QAMQP::Client::setPassword( const QString & password )
{
- d_func()->password = password;
+ Authenticator * auth = d_func()->auth_.data();
+
+ if(auth && auth->type() == "AMQPLAIN")
+ {
+ AMQPlainAuthenticator * a = static_cast(auth);
+ a->setPassword(password);
+ }
}
void QAMQP::Client::printConnect() const
@@ -291,4 +331,14 @@ void QAMQP::Client::reopen()
{
return d_func()->connect();
return d_func()->disconnect();
+}
+
+void QAMQP::Client::setAuth( Authenticator * auth )
+{
+ d_func()->setAuth(auth);
+}
+
+Authenticator * QAMQP::Client::auth() const
+{
+ return d_func()->auth_.data();
}
\ No newline at end of file
diff --git a/src/qamqp/amqp.h b/src/qamqp/amqp.h
index a2e9d37..2ccf50b 100644
--- a/src/qamqp/amqp.h
+++ b/src/qamqp/amqp.h
@@ -9,6 +9,7 @@ namespace QAMQP
class Exchange;
class Queue;
class ClientPrivate;
+ class Authenticator;
class ConnectionPrivate;
class Client : public QObject
{
@@ -53,7 +54,9 @@ namespace QAMQP
QString password() const;
void setPassword(const QString & password);
-
+
+ void setAuth(Authenticator * auth);
+ Authenticator * auth() const;
void open();
void open(const QUrl & connectionString);
void close();
diff --git a/src/qamqp/amqp_authenticator.cpp b/src/qamqp/amqp_authenticator.cpp
new file mode 100644
index 0000000..4cc6b52
--- /dev/null
+++ b/src/qamqp/amqp_authenticator.cpp
@@ -0,0 +1,47 @@
+#include "amqp_authenticator.h"
+#include "amqp_frame.h"
+
+QString QAMQP::AMQPlainAuthenticator::login() const
+{
+ return login_;
+}
+
+QString QAMQP::AMQPlainAuthenticator::password() const
+{
+ return password_;
+}
+
+QAMQP::AMQPlainAuthenticator::AMQPlainAuthenticator( const QString & l /*= QString()*/, const QString & p /*= QString()*/ )
+{
+ login_ = l;
+ password_ = p;
+}
+
+QAMQP::AMQPlainAuthenticator::~AMQPlainAuthenticator()
+{
+
+}
+
+QString QAMQP::AMQPlainAuthenticator::type() const
+{
+ return "AMQPLAIN";
+}
+
+void QAMQP::AMQPlainAuthenticator::setLogin( const QString& l )
+{
+ login_ = l;
+}
+
+void QAMQP::AMQPlainAuthenticator::setPassword( const QString &p )
+{
+ password_ = p;
+}
+
+void QAMQP::AMQPlainAuthenticator::write( QDataStream & out )
+{
+ QAMQP::Frame::writeField('s', out, type());
+ QAMQP::Frame::TableField response;
+ response["LOGIN"] = login_;
+ response["PASSWORD"] = password_;
+ QAMQP::Frame::serialize(out, response);
+}
\ No newline at end of file
diff --git a/src/qamqp/amqp_authenticator.h b/src/qamqp/amqp_authenticator.h
new file mode 100644
index 0000000..4d46f9d
--- /dev/null
+++ b/src/qamqp/amqp_authenticator.h
@@ -0,0 +1,33 @@
+#ifndef amqp_authenticator_h__
+#define amqp_authenticator_h__
+
+#include "qamqp_global.h"
+#include
+#include
+
+namespace QAMQP
+{
+ class Authenticator
+ {
+ public:
+ virtual ~Authenticator(){};
+ virtual QString type() const = 0;
+ virtual void write(QDataStream & out) = 0;
+ };
+
+ class AMQPlainAuthenticator : public Authenticator
+ {
+ QString login_, password_;
+ public:
+ AMQPlainAuthenticator(const QString & login = QString(), const QString & password = QString());
+ virtual ~AMQPlainAuthenticator();
+ QString login() const;
+ void setLogin(const QString& l);
+ QString password() const;
+ void setPassword(const QString &p);
+ virtual QString type() const;
+ virtual void write(QDataStream & out);
+ };
+
+}
+#endif // amqp_authenticator_h__
\ No newline at end of file
diff --git a/src/qamqp/amqp_channel.cpp b/src/qamqp/amqp_channel.cpp
index 175794b..9dee78e 100644
--- a/src/qamqp/amqp_channel.cpp
+++ b/src/qamqp/amqp_channel.cpp
@@ -159,11 +159,13 @@ void ChannelPrivate::init(int channelNumber, Client * parent)
}
-void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
+bool ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
- if(frame.methodClass() != QAMQP::Frame::fcChannel
- || frame.channel() != number )
- return;
+ if(frame.channel() != number )
+ return true;
+
+ if(frame.methodClass() != QAMQP::Frame::fcChannel)
+ return false;
qDebug("Channel#%d:", number);
@@ -185,6 +187,7 @@ void ChannelPrivate::_q_method( const QAMQP::Frame::Method & frame )
closeOk(frame);
break;
}
+ return true;
}
void ChannelPrivate::_q_open()
diff --git a/src/qamqp/amqp_channel_p.h b/src/qamqp/amqp_channel_p.h
index 95317e2..c1f899e 100644
--- a/src/qamqp/amqp_channel_p.h
+++ b/src/qamqp/amqp_channel_p.h
@@ -44,7 +44,7 @@ namespace QAMQP
};
ChannelPrivate(int version = QObjectPrivateVersion);
- ~ChannelPrivate();
+ virtual ~ChannelPrivate();
void init(int channelNumber, Client * parent);
@@ -62,7 +62,7 @@ namespace QAMQP
void close(const QAMQP::Frame::Method & frame);
void closeOk(const QAMQP::Frame::Method & frame);
- virtual void _q_method(const QAMQP::Frame::Method & frame);
+ virtual bool _q_method(const QAMQP::Frame::Method & frame);
void _q_open();
void setQOS(qint32 prefetchSize, quint16 prefetchCount);
diff --git a/src/qamqp/amqp_connection.cpp b/src/qamqp/amqp_connection.cpp
index 69ece86..141a8ae 100644
--- a/src/qamqp/amqp_connection.cpp
+++ b/src/qamqp/amqp_connection.cpp
@@ -57,16 +57,13 @@ void ConnectionPrivate::startOk()
QDataStream stream(&arguments_, QIODevice::WriteOnly);
QAMQP::Frame::TableField clientProperties;
- clientProperties["version"] = "0.0.1";
+ clientProperties["version"] = "0.0.3";
clientProperties["platform"] = QString("Qt %1").arg(qVersion());
clientProperties["product"] = "QAMQP";
QAMQP::Frame::serialize(stream, clientProperties);
- QAMQP::Frame::writeField('s', stream, "AMQPLAIN");
- QAMQP::Frame::TableField response;
- response["LOGIN"] = client_->user();
- response["PASSWORD"] = client_->password();
- QAMQP::Frame::serialize(stream, response);
+ client_->d_func()->auth_->write(stream);
+
QAMQP::Frame::writeField('s', stream, "en_US");
frame.setArguments(arguments_);
@@ -234,10 +231,10 @@ void ConnectionPrivate::setQOS( qint32 prefetchSize, quint16 prefetchCount, int
}
-void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
+bool ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
if(frame.methodClass() != QAMQP::Frame::fcConnection)
- return;
+ return true;
qDebug() << "Connection:";
@@ -245,7 +242,7 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
{
if( frame.id() == miCloseOk)
closeOk(frame);
- return;
+ return true;
}
switch(MethodId(frame.id()))
@@ -270,8 +267,9 @@ void ConnectionPrivate::_q_method( const QAMQP::Frame::Method & frame )
break;
default:
qWarning("Unknown method-id %d", frame.id());
+ return false;
}
-
+ return true;
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/qamqp/amqp_connection_p.h b/src/qamqp/amqp_connection_p.h
index d93c219..3ceecb1 100644
--- a/src/qamqp/amqp_connection_p.h
+++ b/src/qamqp/amqp_connection_p.h
@@ -38,7 +38,7 @@ namespace QAMQP
void openOk(const QAMQP::Frame::Method & frame);
void close(const QAMQP::Frame::Method & frame);
void closeOk(const QAMQP::Frame::Method & frame);
- void _q_method(const QAMQP::Frame::Method & frame);
+ bool _q_method(const QAMQP::Frame::Method & frame);
void setQOS(qint32 prefetchSize, quint16 prefetchCount, int channel, bool global);
diff --git a/src/qamqp/amqp_exchange.cpp b/src/qamqp/amqp_exchange.cpp
index e0e713e..ac36121 100644
--- a/src/qamqp/amqp_exchange.cpp
+++ b/src/qamqp/amqp_exchange.cpp
@@ -128,12 +128,13 @@ ExchangePrivate::~ExchangePrivate()
}
-void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
+bool ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
{
- ChannelPrivate::_q_method(frame);
- if(frame.methodClass() != QAMQP::Frame::fcExchange
- || frame.channel() != number )
- return;
+ if(ChannelPrivate::_q_method(frame))
+ return true;
+
+ if(frame.methodClass() != QAMQP::Frame::fcExchange)
+ return false;
switch(frame.id())
{
@@ -146,6 +147,7 @@ void ExchangePrivate::_q_method( const QAMQP::Frame::Method & frame )
default:
break;
}
+ return true;
}
void ExchangePrivate::declareOk( const QAMQP::Frame::Method & frame )
diff --git a/src/qamqp/amqp_exchange_p.h b/src/qamqp/amqp_exchange_p.h
index f91f429..7a28a43 100644
--- a/src/qamqp/amqp_exchange_p.h
+++ b/src/qamqp/amqp_exchange_p.h
@@ -30,7 +30,7 @@ namespace QAMQP
Exchange::ExchangeOptions options;
TableField arguments;
- void _q_method(const QAMQP::Frame::Method & frame);
+ bool _q_method(const QAMQP::Frame::Method & frame);
bool deleyedDeclare;
bool declared;
diff --git a/src/qamqp/amqp_message.h b/src/qamqp/amqp_message.h
index 640078a..5266723 100644
--- a/src/qamqp/amqp_message.h
+++ b/src/qamqp/amqp_message.h
@@ -9,8 +9,13 @@ namespace QAMQP
{
Message()
{
+ qDebug("Message create");
leftSize = 0;
}
+ ~Message()
+ {
+ qDebug("Message release");
+ }
typedef QAMQP::Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
diff --git a/src/qamqp/amqp_p.h b/src/qamqp/amqp_p.h
index b41b3c6..5edd73b 100644
--- a/src/qamqp/amqp_p.h
+++ b/src/qamqp/amqp_p.h
@@ -2,9 +2,11 @@
#define qamqp_amqp_p_h__
#include
+#include
#include "amqp_network.h"
#include "amqp_connection.h"
+#include "amqp_authenticator.h"
namespace QAMQP
{
@@ -23,17 +25,20 @@ namespace QAMQP
void parseCnnString( const QUrl & connectionString);
void sockConnect();
void login();
-
+ void setAuth(Authenticator* auth);
Exchange * createExchange(int channelNumber, const QString &name);
Queue * createQueue(int channelNumber, const QString &name);
quint32 port;
QString host;
QString virtualHost;
+/*
QString user;
- QString password;
+ QString password;*/
+
QPointer network_;
QPointer connection_;
+ QSharedPointer auth_;
};
}
#endif // amqp_p_h__
diff --git a/src/qamqp/amqp_queue.cpp b/src/qamqp/amqp_queue.cpp
index ef0aaf9..be1f6b3 100644
--- a/src/qamqp/amqp_queue.cpp
+++ b/src/qamqp/amqp_queue.cpp
@@ -171,11 +171,10 @@ QueuePrivate::~QueuePrivate()
}
-void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
+bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
{
- ChannelPrivate::_q_method(frame);
- if(frame.channel() != number)
- return;
+ if(ChannelPrivate::_q_method(frame))
+ return true;
if(frame.methodClass() == QAMQP::Frame::fcQueue)
{
@@ -199,6 +198,7 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
default:
break;
}
+ return true;
}
if(frame.methodClass() == QAMQP::Frame::fcBasic)
@@ -214,13 +214,17 @@ void QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
default:
break;
}
+ return true;
}
+
+ return false;
}
void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
{
+
qDebug() << "Declared queue: " << name;
declared = true;
@@ -237,7 +241,7 @@ void QueuePrivate::declareOk( const QAMQP::Frame::Method & frame )
void QueuePrivate::deleteOk( const QAMQP::Frame::Method & frame )
{
- qDebug() << "Deleted or purged queue: " << name;
+ qDebug() << "Deleted or purged queue: " << name;
declared = false;
QByteArray data = frame.arguments();
@@ -257,6 +261,7 @@ void QueuePrivate::bindOk( const QAMQP::Frame::Method & frame )
void QueuePrivate::unbindOk( const QAMQP::Frame::Method & frame )
{
+
qDebug() << "Unbinded queue: " << name;
QMetaObject::invokeMethod(q_func(), "binded", Q_ARG(bool, false));
}
@@ -281,6 +286,7 @@ void QueuePrivate::declare()
frame.setArguments(arguments_);
sendFrame(frame);
deleyedDeclare = false;
+
}
@@ -307,6 +313,7 @@ void QueuePrivate::remove( bool ifUnused /*= true*/, bool ifEmpty /*= true*/, bo
frame.setArguments(arguments_);
sendFrame(frame);
+
}
void QueuePrivate::purge()
@@ -349,6 +356,7 @@ void QueuePrivate::bind( const QString & exchangeName, const QString & key )
frame.setArguments(arguments_);
sendFrame(frame);
+
}
void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
@@ -394,11 +402,13 @@ void QueuePrivate::consume( Queue::ConsumeOptions options )
frame.setArguments(arguments_);
sendFrame(frame);
+
}
void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
{
+
qDebug() << "Consume ok: " << name;
declared = false;
@@ -411,7 +421,6 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
{
- qDebug() << "* Receive message: ";
declared = false;
QByteArray data = frame.arguments();
@@ -427,12 +436,6 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
QString exchangeName = readField('s',in).toString();
QString routingKey = readField('s',in).toString();
- qDebug() << "| Delivery-tag: " << deliveryTag;
- qDebug() << "| Redelivered: " << redelivered;
- qDebug("| Exchange-name: %s", qPrintable(exchangeName));
- qDebug("| Routing-key: %s", qPrintable(routingKey));
-
-
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
@@ -444,9 +447,6 @@ void QueuePrivate::_q_content( const QAMQP::Frame::Content & frame )
{
if(frame.channel() != number)
return;
- QFile::remove("dump.jpg");
- qDebug() << "Content-type: " << qPrintable(frame.property(Content::cpContentType).toString());
- qDebug() << "Encoding-type: " << qPrintable(frame.property(Content::cpContentEncoding).toString());
if(messages_.isEmpty())
{
qErrnoWarning("Received content-header without method frame before");
diff --git a/src/qamqp/amqp_queue_p.h b/src/qamqp/amqp_queue_p.h
index 6648e4b..d1f1a84 100644
--- a/src/qamqp/amqp_queue_p.h
+++ b/src/qamqp/amqp_queue_p.h
@@ -48,7 +48,7 @@ namespace QAMQP
QString type;
Queue::QueueOptions options;
- void _q_method(const QAMQP::Frame::Method & frame);
+ bool _q_method(const QAMQP::Frame::Method & frame);
bool deleyedDeclare;
bool declared;
diff --git a/src/test.cpp b/src/test.cpp
index 36936d5..1c83a8e 100644
--- a/src/test.cpp
+++ b/src/test.cpp
@@ -1,19 +1,28 @@
#include "test.h"
+#include
+#include
Test::Test()
{
QUrl con(QString("amqp://guest:guest@localhost:5672/"));
client_ = new QAMQP::Client(this);
client_->open(con);
- exchange_ = client_->createExchange("test.test");
- exchange_->declare("direct");
+ exchange_ = client_->createExchange("test.test2");
+ exchange_->declare("fanout");
queue_ = client_->createQueue("test.my_queue", exchange_->channelNumber());
queue_->declare();
+ queue2_ = client_->createQueue("test.my_queue2");
+ queue2_->declare();
+
exchange_->bind(queue_);
+ exchange_->bind(queue2_);
connect(queue_, SIGNAL(declared()), this, SLOT(declared()));
+ connect(queue_, SIGNAL(messageRecieved()), this, SLOT(newMessage()));
+
+ connect(queue2_, SIGNAL(messageRecieved()), this, SLOT(newMessage()));
}
Test::~Test()
@@ -24,5 +33,32 @@ Test::~Test()
void Test::declared()
{
qDebug("\t-= Ready =-");
- exchange_->publish("test 3432 432 24 23 423 32 23 4324 32 423 423 423", exchange_->name());
-}
\ No newline at end of file
+ //queue_->purge();
+ QFile f("D:/geoip.eap");
+ f.open(QIODevice::ReadOnly);
+ exchange_->publish(f.readAll(), exchange_->name(), "image/jpg");
+ //queue_->remove(true, false, false);
+ queue_->setQOS(0,10);
+ queue_->setConsumerTag("qamqp-consumer");
+ queue_->consume(QAMQP::Queue::coNoAck);
+
+ queue2_->setQOS(0,10);
+ queue2_->setConsumerTag("qamqp-consumer2");
+ queue2_->consume(QAMQP::Queue::coNoAck);
+ //exchange_->remove(false, false);
+}
+
+void Test::newMessage()
+{
+ QAMQP::Queue * q = qobject_cast(sender());
+ while (q->hasMessage())
+ {
+ QAMQP::MessagePtr message = q->getMessage();
+ qDebug("+ RECEIVE MESSAGE");
+ qDebug("| Exchange-name: %s", qPrintable(message->exchangeName));
+ qDebug("| Routing-key: %s", qPrintable(message->routeKey));
+ qDebug("| Content-type: %s", qPrintable(message->property[QAMQP::Frame::Content::cpContentType].toString()));
+
+ }
+
+}
diff --git a/src/test.h b/src/test.h
index 667d388..27e768c 100644
--- a/src/test.h
+++ b/src/test.h
@@ -15,8 +15,10 @@ public:
private slots:
void declared();
+ void newMessage();
+
private:
QPointer client_;
QPointer exchange_;
- QPointer queue_;
+ QPointer queue_, queue2_;
};
\ No newline at end of file