This commit is contained in:
fuCtor 2012-02-26 01:21:57 -08:00
parent 267c9801f0
commit 551f9a13a5
7 changed files with 161 additions and 4 deletions

61
README.md Normal file
View File

@ -0,0 +1,61 @@
QAMQP
=============
Qt4 implementation of AMQP 0.9.1.
Implement
------------
### Connection
work with socket connections
* start - start connection negotiation
* startok - select security mechanism and locale
* tune - propose connection tuning parameters
* tuneok - negotiate connection tuning parameters
* open - open connection to virtual host
* openok - signal that connection is ready
* close - request a connection close
* closeok - confirm a connection close
### Channel
work with channels
* open - open a channel for use
* openok - signal that the channel is ready
* close - request a channel close
* closeok - confirm a channel close
### Exchange
work with exchanges
* declare - verify exchange exists, create if needed
* declareok - confirm exchange declaration
* delete - delete an exchange
* deleteok - confirm deletion of an exchange
### Queue
work with queues
* declare - declare queue, create if needed
* declareok - confirms a queue definition
* bind - bind queue to an exchange
* bindok - confirm bind successful
* unbind - unbind a queue from an exchange
* unbindok - confirm unbind successful
* purge - purge a queue
* purgeok - confirms a queue purge
* delete - delete a queue
* deleteok - confirm deletion of a queue
### Basic
work with basic content
* qos - specify quality of service
* qosok - confirm the requested qos
* consume - start a queue consumer
* consumeok - confirm a new consumer
* publish - publish a message
* deliver - notify the client of a consumer message
* get - direct access to a queue
* getok - provide client with a message
* getempty - indicate no messages available
* ack - acknowledge one or more messages

View File

@ -36,7 +36,7 @@ namespace QAMQP
bmReturn = 50,
bmDeliver = 60,
METHOD_ID_ENUM(bmGet, 70),
bmgetEmpty = 72,
bmGetEmpty = 72,
bmAck = 80,
bmReject = 90,
bmRecoverAsync = 100,

View File

@ -54,6 +54,7 @@ namespace QAMQP
Q_SIGNALS:
void declared();
void binded();
void removed();
};
}

View File

@ -11,6 +11,7 @@ namespace QAMQP
{
qDebug("Message create");
leftSize = 0;
deliveryTag = 0;
}
~Message()
{
@ -19,6 +20,7 @@ namespace QAMQP
typedef QAMQP::Frame::Content::Property MessageProperty;
Q_DECLARE_FLAGS(MessageProperties, MessageProperty);
qlonglong deliveryTag;
QByteArray payload;
QHash<MessageProperty, QVariant> property;
QAMQP::Frame::TableField headers;

View File

@ -73,7 +73,15 @@ Queue::QueueOptions Queue::option() const
return d_func()->options;
}
void Queue::setNoAck( bool noAck )
{
d_func()->noAck = noAck;
}
bool Queue::noAck() const
{
return d_func()->noAck;
}
void Queue::declare()
{
@ -153,6 +161,18 @@ QString Queue::consumerTag() const
return d_func()->consumerTag;
}
void Queue::get()
{
d_func()->get();
}
void Queue::ack( const MessagePtr & message )
{
d_func()->ack(message);
}
//////////////////////////////////////////////////////////////////////////
@ -161,6 +181,7 @@ QueuePrivate::QueuePrivate()
, deleyedDeclare(false)
, declared(false)
, recievingMessage(false)
, noAck(true)
{
}
@ -211,6 +232,12 @@ bool QueuePrivate::_q_method( const QAMQP::Frame::Method & frame )
case bmDeliver:
deliver(frame);
break;
case bmGetOk:
getOk(frame);
break;
case bmGetEmpty:
QMetaObject::invokeMethod(q_func(), "empty");
break;
default:
break;
}
@ -382,6 +409,61 @@ void QueuePrivate::unbind( const QString & exchangeName, const QString & key )
}
void QueuePrivate::get()
{
if(!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmGet);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << qint16(0); //reserver 1
writeField('s', out, name);
out << qint8(noAck ? 1 : 0); // noAck
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::getOk( const QAMQP::Frame::Method & frame )
{
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
qlonglong deliveryTag = readField('L',in).toLongLong();
bool redelivered = readField('t',in).toBool();
QString exchangeName = readField('s',in).toString();
QString routingKey = readField('s',in).toString();
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage);
}
void QueuePrivate::ack( const MessagePtr & Message )
{
if(!opened)
{
return;
}
QAMQP::Frame::Method frame(QAMQP::Frame::fcBasic, bmAck);
frame.setChannel(number);
QByteArray arguments_;
QDataStream out(&arguments_, QIODevice::WriteOnly);
out << Message->deliveryTag; //reserver 1
out << qint8(0); // noAck
frame.setArguments(arguments_);
sendFrame(frame);
}
void QueuePrivate::consume( Queue::ConsumeOptions options )
{
@ -421,7 +503,6 @@ void QueuePrivate::consumeOk( const QAMQP::Frame::Method & frame )
void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
{
declared = false;
QByteArray data = frame.arguments();
QDataStream in(&data, QIODevice::ReadOnly);
@ -439,6 +520,7 @@ void QueuePrivate::deliver( const QAMQP::Frame::Method & frame )
MessagePtr newMessage = MessagePtr(new Message);
newMessage->routeKey = routingKey;
newMessage->exchangeName = exchangeName;
newMessage->deliveryTag = deliveryTag;
messages_.enqueue(newMessage);
}

View File

@ -17,6 +17,7 @@ namespace QAMQP
Q_PROPERTY(QueueOptions option READ option );
Q_PROPERTY(QString consumerTag READ consumerTag WRITE setConsumerTag)
Q_PROPERTY(bool noAck READ noAck WRITE setNoAck)
Q_DECLARE_PRIVATE(QAMQP::Queue)
Q_DISABLE_COPY(Queue);
@ -62,16 +63,22 @@ namespace QAMQP
void unbind(Exchange * exchange, const QString & key);
MessagePtr getMessage();
void get();
void ack(const MessagePtr & message);
bool hasMessage() const;
void consume(ConsumeOptions options = ConsumeOptions(NoOptions));
void setConsumerTag(const QString &consumerTag);
QString consumerTag() const;
void setNoAck(bool noAck);
bool noAck() const;
Q_SIGNALS:
void declared();
void binded(bool);
void removed();
void messageRecieved();
void empty();
private:
Q_PRIVATE_SLOT(d_func(), void _q_content(const QAMQP::Frame::Content & frame))

View File

@ -41,9 +41,12 @@ namespace QAMQP
/************************************************************************/
void consume(Queue::ConsumeOptions options);
void consumeOk(const QAMQP::Frame::Method & frame);
void consumeOk(const QAMQP::Frame::Method & frame);
void deliver(const QAMQP::Frame::Method & frame);
void get();
void getOk(const QAMQP::Frame::Method & frame);
void ack(const MessagePtr & Message);
QString type;
Queue::QueueOptions options;
@ -52,6 +55,7 @@ namespace QAMQP
bool deleyedDeclare;
bool declared;
bool noAck;
QString consumerTag;
QMap<QString, QString> delayedBindings;