implemented consuming, refactored a lot of code from cpp files to header files

This commit is contained in:
Emiel Bruijntjes 2014-01-05 09:50:41 -08:00
parent 5269f51a92
commit cf1cbfa551
66 changed files with 1028 additions and 886 deletions

View File

@ -5,6 +5,12 @@ LIBRARY_DIR = ${PREFIX}/lib
all:
$(MAKE) -C src all
static:
$(MAKE) -C src static
shared:
$(MAKE) -C src shared
clean:
$(MAKE) -C src clean

View File

@ -274,6 +274,13 @@ public:
/**
* Set the Quality of Service (QOS) for this channel
*
* When you consume messages, every single messages needs to be ack'ed to inform
* the RabbitMQ server that is has been received. The Qos setting specifies the
* number of unacked messages that may exist in the client application. The server
* stops delivering more messages if the number of unack'ed messages has reached
* the prefetchCount
*
* @param prefetchCount maximum number of messages to prefetch
* @return bool whether the Qos frame is sent.
*/
@ -282,6 +289,58 @@ public:
return _implementation.setQos(prefetchCount);
}
/**
* Tell the RabbitMQ server that we're ready to consume messages
*
* After this method is called, RabbitMQ starts delivering messages to the client
* application. The consume tag is a string identifier that will be passed to
* each received message, so that you can associate incoming messages with a
* consumer. If you do not specify a consumer tag, the server will assign one
* for you.
*
* The following flags are supported:
*
* - nolocal if set, messages published on this channel are not also consumed
* - noack if set, consumed messages do not have to be acked, this happens automatically
* - exclusive request exclusive access, only this consumer can access the queue
* - nowait the server does not have to send a response back that consuming is active
*
* The method ChannelHandler::onConsumerStarted() will be called when the
* consumer has started (unless the nowait option was set, in which case
* no confirmation method is called)
*
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*/
bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments) { return _implementation.consume(queue, tag, flags, arguments); }
bool consume(const std::string &queue, const std::string &tag, int flags = 0) { return _implementation.consume(queue, tag, flags, Table()); }
bool consume(const std::string &queue, const std::string &tag, const Table &arguments) { return _implementation.consume(queue, tag, 0, arguments); }
bool consume(const std::string &queue, int flags, const Table &arguments) { return _implementation.consume(queue, std::string(), flags, arguments); }
bool consume(const std::string &queue, int flags = 0) { return _implementation.consume(queue, std::string(), flags, Table()); }
bool consume(const std::string &queue, const Table &arguments) { return _implementation.consume(queue, std::string(), 0, arguments); }
/**
* Cancel a running consume call
*
* If you want to stop a running consumer, you can use this method with the consumer tag
*
* The following flags are supported:
*
* - nowait the server does not have to send a response back that the consumer has been cancelled
*
* The method ChannelHandler::onConsumerStopped() will be called when the consumer
* was succesfully stopped (unless the nowait option was used, in which case no
* confirmation method is called)
*
* @param tag the consumer tag
* @param flags optional additional flags
* @return bool
*/
bool cancel(const std::string &tag, int flags = 0) { return _implementation.cancel(tag, flags); }
/**
* Close the current channel
* @return bool

View File

@ -23,91 +23,92 @@ class ChannelHandler
public:
/**
* Method that is called when the channel was succesfully created.
* @param channel
* @param channel the channel that is ready
*/
virtual void onReady(Channel *channel) {}
/**
* An error has occured on the channel
* @param channel
* @param message
* The channel is no longer usable after an error has occured on it
* @param channel the channel on which the error occured
* @param message human readable error message
*/
virtual void onError(Channel *channel, const std::string &message) {}
/**
* Method that is called when the channel was paused
* This is the result of a call to Channel::pause()
* @param channel
* @param channel the channel that is now paused
*/
virtual void onPaused(Channel *channel) {}
/**
* Method that is called when the channel was resumed
* This is the result of a call to Channel::resume()
* @param channel
* @param channel the channel that is no longer paused
*/
virtual void onResumed(Channel *channel) {}
/**
* Method that is called when a channel is closed
* This is the result of a call to Channel::close()
* @param channel
* @param channel the channel that is closed
*/
virtual void onClosed(Channel *channel) {}
/**
* Method that is called when a transaction was started
* This is the result of a call to Channel::startTransaction()
* @param channel
* @param channel the channel on which the transaction was started
*/
virtual void onTransactionStarted(Channel *channel) {}
/**
* Method that is called when a transaction was committed
* This is the result of a call to Channel::commitTransaction()
* @param channel
* @param channel the channel on which the transaction was committed
*/
virtual void onTransactionCommitted(Channel *channel) {}
/**
* Method that is called when a transaction was rolled back
* This is the result of a call to Channel::rollbackTransaction()
* @param channel
* @param channel the channel on which the transaction was rolled back
*/
virtual void onTransactionRolledBack(Channel *channel) {}
/**
* Method that is called when an exchange is bound
* This is the result of a call to Channel::bindExchange()
* @param channel
* @param channel the channel on which the exchange was bound
*/
virtual void onExchangeBound(Channel *channel) {}
/**
* Method that is called when an exchange is unbound
* This is the result of a call to Channel::unbindExchange()
* @param channel
* @param channel the channel on which the exchange was unbound
*/
virtual void onExchangeUnbound(Channel *channel) {}
/**
* Method that is called when an exchange is deleted
* This is the result of a call to Channel::deleteExchange()
* @param channel
* @param channel the channel on which the exchange was deleted
*/
virtual void onExchangeDeleted(Channel *channel) {}
/**
* Mehod that is called when an exchange is declared
* This is the result of a call to Channel::declareExchange()
* @param channel
* @param channel the channel on which the exchange was declared
*/
virtual void onExchangeDeclared(Channel *channel) {}
/**
* Method that is called when a queue is declared
* This is the result of a call to Channel::declareQueue()
* @param channel
* @param channel the channel on which the queue was declared
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
@ -117,15 +118,14 @@ public:
/**
* Method that is called when a queue is bound
* This is the result of a call to Channel::bindQueue()
* @param channel
* @param
* @param channel the channel on which the queue was bound
*/
virtual void onQueueBound(Channel *channel) {}
/**
* Method that is called when a queue is deleted
* This is the result of a call to Channel::deleteQueue()
* @param channel
* @param channel the channel on which the queue was deleted
* @param messageCount number of messages deleted along with the queue
*/
virtual void onQueueDeleted(Channel *channel, uint32_t messageCount) {}
@ -133,22 +133,47 @@ public:
/**
* Method that is called when a queue is unbound
* This is the result of a call to Channel::unbindQueue()
* @param channel
* @param channel the channel on which the queue was unbound
*/
virtual void onQueueUnbound(Channel *channel) {}
/**
* Method that is called when a queue is purged
* This is the result of a call to Channel::purgeQueue()
* @param messageCount number of message purged
* @param channel the channel on which the queue was emptied
* @param messageCount number of message purged
*/
virtual void onQueuePurged(Channel *channel, uint32_t messageCount) {}
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
* @param channel the channel on which the qos was set
*/
virtual void onQosSet(Channel *channel) {}
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
virtual void onConsumerStarted(Channel *channel, const std::string &tag) {}
/**
* Method that is called when a message has been consumed
* @param channel the channel on which the consumer was started
* @param message the consumed message
*/
virtual void onConsumed(Channel *channel, const Message &message) {}
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
virtual void onConsumerStopped(Channel *channel, const std::string &tag) {}
};

View File

@ -57,6 +57,12 @@ private:
* @var bool
*/
bool _transaction = false;
/**
* The message that is now being received
* @var MessageImpl
*/
MessageImpl *_message = nullptr;
/**
* Construct a channel object
@ -236,6 +242,23 @@ public:
*/
bool setQos(uint16_t prefetchCount);
/**
* Tell the RabbitMQ server that we're ready to consume messages
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*/
bool consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments);
/**
* Cancel a running consumer
* @param tag the consumer tag
* @param flags optional flags
*/
bool cancel(const std::string &tag, int flags);
/**
* Close the current channel
* @return bool
@ -395,6 +418,45 @@ public:
if (_handler) _handler->onQosSet(_parent);
}
/**
* Report that a consumer has started
* @param tag the consumer tag
*/
void reportConsumerStarted(const std::string &tag)
{
if (_handler) _handler->onConsumerStarted(_parent, tag);
}
/**
* Report that a consumer has stopped
* @param tag the consumer tag
*/
void reportConsumerStopped(const std::string &tag)
{
if (_handler) _handler->onConsumerStopped(_parent, tag);
}
/**
* Report the consumed message
*/
void reportDelivery();
/**
* Create an incoming message
* @param frame
* @return MessageImpl
*/
MessageImpl *message(const BasicDeliverFrame &frame);
/**
* Retrieve the current incoming message
* @return MessageImpl
*/
MessageImpl *message()
{
return _message;
}
/**
* The channel class is its friend, thus can it instantiate this object
*/

View File

@ -14,6 +14,9 @@ namespace AMQP {
* All classes defined by this library
*/
class Array;
class BasicDeliverFrame;
class BasicHeaderFrame;
class BodyFrame;
class Channel;
class Connection;
class ConnectionHandler;
@ -21,6 +24,7 @@ class ConnectionImpl;
class Exchange;
class Frame;
class Login;
class MessageImpl;
class Monitor;
class OutBuffer;
class ReceivedFrame;

View File

@ -17,7 +17,7 @@ namespace AMQP {
*/
class Envelope : public MetaData
{
private:
protected:
/**
* Pointer to the body data (the memory buffer is not managed by the AMQP
* library!)
@ -29,7 +29,7 @@ private:
* Size of the data
* @var uint64_t
*/
uint64_t _bodysize;
uint64_t _bodySize;
public:
/**
@ -41,13 +41,13 @@ public:
* @param body
* @param size
*/
Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodysize(size) {}
Envelope(const char *body, uint64_t size) : MetaData(), _body(body), _bodySize(size) {}
/**
* Constructor based on a string
* @param body
*/
Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodysize(body.size()) {}
Envelope(const std::string &body) : MetaData(), _body(body.data()), _bodySize(body.size()) {}
/**
* Destructor
@ -69,7 +69,16 @@ public:
*/
uint64_t bodySize() const
{
return _bodysize;
return _bodySize;
}
/**
* Body as a string
* @return string
*/
std::string message() const
{
return std::string(_body, _bodySize);
}
};

124
include/message.h Normal file
View File

@ -0,0 +1,124 @@
/**
* Message.h
*
* An incoming message has the same sort of information as an outgoing
* message, plus some additional information.
*
* Message objects can not be constructed by end users, they are only constructed
* by the AMQP library, and passed to the ChannelHandler::onDelivered() method
*
* @copyright 2014 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class Message : public Envelope
{
protected:
/**
* The consumer tag over which it was delivered
* @var string
*/
std::string _consumerTag;
/**
* Unique delivery tag to identify and ack the mesage
* @var uint64_t
*/
uint64_t _deliveryTag;
/**
* Is this a redelivered message / has it been delivered before?
* @var bool
*/
bool _redelivered;
/**
* The exchange to which it was originally published
* @var string
*/
std::string _exchange;
/**
* The routing key that was originally used
* @var string
*/
std::string _routingKey;
protected:
/**
* The constructor is protected to ensure that endusers can not
* instantiate a message
* @param consumerTag
* @param deliveryTag
* @param redelivered
* @param exchange
* @param routingKey
*/
Message(const std::string &consumerTag, uint64_t deliveryTag, bool redelivered, const std::string &exchange, const std::string &routingKey) :
Envelope(nullptr, 0), _consumerTag(consumerTag), _deliveryTag(deliveryTag), _redelivered(redelivered), _exchange(exchange), _routingKey(routingKey)
{}
public:
/**
* Destructor
*/
virtual ~Message() {}
/**
* The consumer tag over which it was delivered
* @return string
*/
std::string &consumerTag()
{
return _consumerTag;
}
/**
* Unique delivery tag to identify and ack the mesage
* @return uint64_t
*/
uint64_t deliveryTag()
{
return _deliveryTag;
}
/**
* Is this a redelivered message / has it been delivered before?
* @var bool
*/
bool redelivered()
{
return _redelivered;
}
/**
* The exchange to which it was originally published
* @var string
*/
std::string &exchange()
{
return _exchange;
}
/**
* The routing key that was originally used
* @var string
*/
std::string &routingKey()
{
return _routingKey;
}
};
/**
* End of namespace
*/
}

View File

@ -153,6 +153,31 @@ public:
*/
virtual ~MetaData() {}
/**
* Set all meta data
* @param data
*/
void set(const MetaData &data)
{
// simply copy all fields
_bools1 = data._bools1;
_bools2 = data._bools2;
_contentType = data._contentType;
_contentEncoding = data._contentEncoding;
_headers = data._headers;
_deliveryMode = data._deliveryMode;
_priority = data._priority;
_correlationID = data._correlationID;
_replyTo = data._replyTo;
_expiration = data._expiration;
_messageID = data._messageID;
_timestamp = data._timestamp;
_typeName = data._typeName;
_userID = data._userID;
_appID = data._appID;
_clusterID = data._clusterID;
}
/**
* Check if a certain field is set
* @return bool

View File

@ -44,6 +44,7 @@
// envelope for publishing and consuming
#include <libamqp/metadata.h>
#include <libamqp/envelope.h>
#include <libamqp/message.h>
// mid level includes
#include <libamqp/exchangetype.h>

View File

@ -3,27 +3,30 @@ RM = rm -f
CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g
LD = g++
LD_FLAGS = -Wall -shared -O2
RESULT = liblibamqp.so
STATIC = $(RESULT:%.so=%.a)
SHARED_LIB = liblibamqp.so
STATIC_LIB = $(SHARED_LIB:%.so=%.a)
SOURCES = $(wildcard *.cpp)
OBJECTS = $(SOURCES:%.cpp=%.o)
OBJECTS_STATIC = $(SOURCES:%.cpp=%.s.o)
SHARED_OBJECTS = $(SOURCES:%.cpp=%.o)
STATIC_OBJECTS = $(SOURCES:%.cpp=%.s.o)
all: ${OBJECTS} ${OBJECTS_STATIC} ${RESULT}
$(MAKE) STATIC_LIB
all: shared static
${RESULT}: ${OBJECTS}
${LD} ${LD_FLAGS} -o $@ ${OBJECTS}
shared: ${SHARED_OBJECTS} ${SHARED_LIB}
static: ${STATIC_OBJECTS} ${STATIC_LIB}
${SHARED_LIB}: ${SHARED_OBJECTS}
${LD} ${LD_FLAGS} -o $@ ${SHARED_OBJECTS}
${STATIC_LIB}: ${STATIC_OBJECTS}
ar rcs ${STATIC_LIB} ${STATIC_OBJECTS}
clean:
${RM} *.obj *~* ${OBJECTS} ${OBJECTS_STATIC} ${RESULT} ${STATIC}
${RM} *.obj *~* ${SHARED_OBJECTS} ${STATIC_OBJECTS} ${SHARED_LIB} ${STATIC_LIB}
${OBJECTS}:
${SHARED_OBJECTS}:
${CPP} ${CPPFLAGS} -fpic -o $@ ${@:%.o=%.cpp}
${OBJECTS_STATIC}:
${STATIC_OBJECTS}:
${CPP} ${CPPFLAGS} -o $@ ${@:%.s.o=%.cpp}
STATIC_LIB:
ar rcs ${STATIC} ${OBJECTS_STATIC}

View File

@ -81,6 +81,26 @@ public:
{
return 31;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportConsumerStopped(consumerTag());
// done
return true;
}
};
/**

View File

@ -79,6 +79,26 @@ public:
{
return _consumerTag;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportConsumerStarted(consumerTag());
// done
return true;
}
};
/**

View File

@ -155,6 +155,26 @@ public:
{
return _redelivered.get(0);
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// construct the message
channel->message(*this);
// done
return true;
}
};
/**

View File

@ -101,6 +101,33 @@ public:
{
return 60;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// is there a current message?
MessageImpl *message = channel->message();
if (!message) return false;
// store size
message->setBodySize(bodySize());
// and copy the meta data
message->set(_metadata);
// done
return true;
}
};
/**

View File

@ -1,40 +0,0 @@
/**
* BasicQosOkFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "basicqosokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool BasicQosOKFrame::process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportQosSet();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -58,8 +58,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
virtual bool process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// report
channel->reportQosSet();
// done
return true;
}
};
/**

View File

@ -91,6 +91,35 @@ public:
{
return _payload;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// is there a current message?
MessageImpl *message = channel->message();
if (!message) return false;
// store size
if (!message->append(_payload, _size)) return true;
// the message is complete
channel->reportDelivery();
// done
return true;
}
};
/**

View File

@ -1,42 +0,0 @@
/**
* ChannelCloseFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "channelcloseframe.h"
#include "channelcloseokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ChannelCloseFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// send back an ok frame
connection->send(ChannelCloseOKFrame(this->channel()));
// what if channel doesn't exist?
if (!channel) return false;
// report to the handler
channel->reportError(text());
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -144,8 +144,23 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// send back an ok frame
connection->send(ChannelCloseOKFrame(this->channel()));
// what if channel doesn't exist?
if (!channel) return false;
// report to the handler
channel->reportError(text());
// done
return true;
}
};
/**

View File

@ -1,38 +0,0 @@
/**
* ChannelCloseOkFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "channelcloseokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ChannelCloseOKFrame::process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is closed
channel->reportClosed();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -62,7 +62,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is closed
channel->reportClosed();
// done
return true;
}
};
/**

View File

@ -1,39 +0,0 @@
/**
* ChannelFlowOkFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "channelflowokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ChannelFlowOKFrame::process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// is the flow active?
if (active()) channel->reportResumed();
else channel->reportPaused();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -84,7 +84,21 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if (!channel) return false;
// is the flow active?
if (active()) channel->reportResumed();
else channel->reportPaused();
// done
return true;
}
};
/**

View File

@ -6,8 +6,11 @@
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "basicdeliverframe.h"
#include "messageimpl.h"
#include "channelopenframe.h"
#include "channelflowframe.h"
#include "channelcloseokframe.h"
#include "channelcloseframe.h"
#include "transactionselectframe.h"
#include "transactioncommitframe.h"
@ -25,8 +28,8 @@
#include "basicheaderframe.h"
#include "bodyframe.h"
#include "basicqosframe.h"
#include <iostream>
#include "basicconsumeframe.h"
#include "basiccancelframe.h"
/**
* Set up namespace
@ -71,6 +74,10 @@ ChannelImpl::ChannelImpl(Channel *parent, Connection *connection, ChannelHandler
*/
ChannelImpl::~ChannelImpl()
{
// remove incoming message
if (_message) delete _message;
_message = nullptr;
// remove this channel from the connection
_connection->_implementation.remove(this);
@ -78,7 +85,10 @@ ChannelImpl::~ChannelImpl()
if (!connected()) return;
// close the channel now
// @todo is this ok?
close();
// do we have
}
/**
@ -390,6 +400,8 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
{
// @todo prevent crash when connection is destructed
// @todo do not copy the entire buffer to individual frames
// send the publish frame
send(BasicPublishFrame(_id, exchange, routingKey, flags & mandatory, flags & immediate));
@ -429,13 +441,44 @@ bool ChannelImpl::publish(const std::string &exchange, const std::string &routin
*/
bool ChannelImpl::setQos(uint16_t prefetchCount)
{
// set for the entire connection
// send a qos frame
send(BasicQosFrame(_id, prefetchCount, false));
// done
return true;
}
/**
* Tell the RabbitMQ server that we're ready to consume messages
* @param queue the queue from which you want to consume
* @param tag a consumer tag that will be associated with this consume operation
* @param flags additional flags
* @param arguments additional arguments
* @return bool
*/
bool ChannelImpl::consume(const std::string &queue, const std::string &tag, int flags, const Table &arguments)
{
// send a consume frame
send(BasicConsumeFrame(_id, queue, tag, flags & nolocal, flags & noack, flags & exclusive, flags & nowait, arguments));
// done
return true;
}
/**
* Cancel a running consumer
* @param tag the consumer tag
* @param flags optional flags
*/
bool ChannelImpl::cancel(const std::string &tag, int flags)
{
// send a cancel frame
send(BasicCancelFrame(_id, tag, flags & nowait));
// done
return true;
}
/**
* Send a frame over the channel
* @param frame frame to send
@ -447,6 +490,28 @@ size_t ChannelImpl::send(const Frame &frame)
return _connection->_implementation.send(frame);
}
/**
* Report the consumed message
*/
void ChannelImpl::reportDelivery()
{
if (_handler) _handler->onConsumed(_parent, *_message);
}
/**
* Create an incoming message
* @param frame
* @return MessageImpl
*/
MessageImpl *ChannelImpl::message(const BasicDeliverFrame &frame)
{
// it should not be possible that a message already exists, but lets check it anyhow
if (_message) delete _message;
// construct a message
return _message = new MessageImpl(frame);
}
/**
* End of namespace
*/

View File

@ -1,38 +0,0 @@
/**
* ChannelOpenOkFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "channelopenokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ChannelOpenOKFrame::process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
channel->reportReady();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -68,7 +68,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
channel->reportReady();
// done
return true;
}
};
/**

View File

@ -1,39 +0,0 @@
/**
* ConnectionCloseFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "connectioncloseframe.h"
#include "connectioncloseokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ConnectionCloseFrame::process(ConnectionImpl *connection)
{
// @todo connection could be destructed after frame was sent
// send back the ok frame
connection->send(ConnectionCloseOKFrame());
// no need to check for a channel, the error is connection wide
// report the error on the connection
connection->reportError(text());
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -144,8 +144,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// @todo connection could be destructed after frame was sent
// send back the ok frame
connection->send(ConnectionCloseOKFrame());
// no need to check for a channel, the error is connection wide
// report the error on the connection
connection->reportError(text());
// done
return true;
}
};
/**

View File

@ -7,8 +7,6 @@
*/
#include "includes.h"
#include "protocolheaderframe.h"
#include "exception.h"
#include "protocolexception.h"
/**
* set namespace
@ -109,6 +107,8 @@ void ConnectionImpl::remove(ChannelImpl *channel)
*/
size_t ConnectionImpl::parse(char *buffer, size_t size)
{
// @todo do not parse if already in an error state
// number of bytes processed
size_t processed = 0;

View File

@ -1,32 +0,0 @@
/**
* ConnectionOpenOKFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "connectionopenokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ConnectionOpenOKFrame::process(ConnectionImpl *connection)
{
// all is ok, mark the connection as connected
connection->setConnected();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -77,7 +77,14 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// all is ok, mark the connection as connected
connection->setConnected();
// done
return true;
}
};
/**

View File

@ -1,50 +0,0 @@
/**
* ConnectionStartFrame.h
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "connectionstartframe.h"
#include "connectionstartokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the connection start frame
* @param connection
* @return bool
* @internal
*/
bool ConnectionStartFrame::process(ConnectionImpl *connection)
{
// @todo we must still be in protocol handshake mode
// the peer properties
Table properties;
// fill the peer properties
properties["product"] = "Copernica AMQP library";
properties["version"] = "0.1";
properties["platform"] = "Ubuntu";
properties["copyright"] = "Copyright 2014 Copernica BV";
properties["information"] = "";
// move connection to handshake mode
connection->setProtocolOk();
// send back a connection start ok frame
connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US"));
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -171,8 +171,30 @@ public:
* @return bool
* @internal
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// @todo we must still be in protocol handshake mode
// the peer properties
Table properties;
// fill the peer properties
properties["product"] = "Copernica AMQP library";
properties["version"] = "0.1";
properties["platform"] = "Ubuntu";
properties["copyright"] = "Copyright 2014 Copernica BV";
properties["information"] = "";
// move connection to handshake mode
connection->setProtocolOk();
// send back a connection start ok frame
connection->send(ConnectionStartOKFrame(properties, "PLAIN", connection->login().saslPlain(), "en_US"));
// done
return true;
}
};
/**

View File

@ -1,48 +0,0 @@
/**
* ConnectionTuneFrame.cpp
*
* @copyright 2014
*/
#include "includes.h"
#include "connectiontuneframe.h"
#include "connectiontuneokframe.h"
#include "connectionopenframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ConnectionTuneFrame::process(ConnectionImpl *connection)
{
// @todo this is only allowed when the connection is set up
// remember this in the connection
connection->setCapacity(channelMax(), frameMax());
// theoretically it is possible that the connection object gets destructed between sending the messages
Monitor monitor(connection);
// send it back
connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat()));
// check if the connection object still exists
if (!monitor.valid()) return true;
// and finally we start to open the frame
connection->send(ConnectionOpenFrame(connection->vhost()));
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -129,7 +129,29 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// @todo this is only allowed when the connection is set up
// remember this in the connection
connection->setCapacity(channelMax(), frameMax());
// theoretically it is possible that the connection object gets destructed between sending the messages
Monitor monitor(connection);
// send it back
connection->send(ConnectionTuneOKFrame(channelMax(), frameMax(), heartbeat()));
// check if the connection object still exists
if (!monitor.valid()) return true;
// and finally we start to open the frame
connection->send(ConnectionOpenFrame(connection->vhost()));
// done
return true;
}
};
/**

View File

@ -1,32 +0,0 @@
/**
* Exchangebindokframe.cpp
*/
#include "includes.h"
#include "exchangebindokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExchangeBindOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeBound();
// done
return true;
}
// end namespace
}

View File

@ -58,8 +58,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeBound();
// done
return true;
}
};
// end namespace

View File

@ -1,34 +0,0 @@
/**
* ExchangeDeclareOKFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exchangedeclareokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExchangeDeclareOKFrame::process(ConnectionImpl *connection)
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report exchange declare ok
channel->reportExchangeDeclared();
// done
return true;
}
// end namespace
}

View File

@ -61,8 +61,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report exchange declare ok
channel->reportExchangeDeclared();
// done
return true;
}
};
/**

View File

@ -1,38 +0,0 @@
/**
* ExchangeDeleteOKFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exchangedeleteokframe.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExchangeDeleteOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeDeleted();
// done
return true;
}
/**
* End of namespace
*/
}

View File

@ -62,7 +62,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeDeleted();
// done
return true;
}
};
/**

View File

@ -1,32 +0,0 @@
/**
* QueuePurgeOKFrame.cpp
*/
#include "includes.h"
#include "exchangeunbindokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExchangeUnbindOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeUnbound();
// done
return true;
}
// end namespace
}

View File

@ -59,7 +59,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportExchangeUnbound();
// done
return true;
}
};
// end namespace

View File

@ -1,33 +0,0 @@
/**
* ExtFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool ExtFrame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()));
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -137,8 +137,14 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
virtual bool process(ConnectionImpl *connection) override
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()));
// unreachable
return false;
}
};
/**

View File

@ -1,33 +0,0 @@
/**
* Frame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool Frame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame");
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -59,7 +59,14 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
virtual bool process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame");
// unreachable
return false;
}
};
/**

View File

@ -1,32 +0,0 @@
/**
* HeartbeatFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "heartbeatframe.h"
/**
* Namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool HeartbeatFrame::process(ConnectionImpl *connection)
{
// send back the same frame
connection->send(*this);
// done
return true;
}
/**
* End namespace
*/
}

View File

@ -55,8 +55,14 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// send back the same frame
connection->send(*this);
// done
return true;
}
};
/**

View File

@ -11,6 +11,8 @@
#include "../libamqp.h"
// classes that are very commonly used
#include "exception.h"
#include "protocolexception.h"
#include "frame.h"
#include "extframe.h"
#include "methodframe.h"

106
src/messageimpl.h Normal file
View File

@ -0,0 +1,106 @@
/**
* MessageImpl.h
*
* Implementation of the message object that is only accessible for the
* AMQP library internals
*
* @copyright 2014 Copernica BV
*/
/**
* Namespace
*/
namespace AMQP {
/**
* Class definition
*/
class MessageImpl : public Message
{
private:
/**
* How many bytes have been received?
* @var uint64_t
*/
uint64_t _received;
/**
* Was the buffer allocated by us?
* @var bool
*/
bool _selfAllocated;
public:
/**
* Constructor
* @param frame
*/
MessageImpl(const BasicDeliverFrame &frame) :
Message(frame.consumerTag(), frame.deliveryTag(), frame.redelivered(), frame.exchange(), frame.routingKey()),
_received(0), _selfAllocated(false)
{}
/**
* Destructor
*/
virtual ~MessageImpl()
{
// clear up memory if it was self allocated
if (_selfAllocated) delete[] _body;
}
/**
* Set the body size
* This field is set when the header is received
* @param uint64_t
*/
void setBodySize(uint64_t size)
{
_bodySize = size;
}
/**
* Append data
* @param buffer incoming data
* @param size size of the data
* @return bool true if the message is now complete
*/
bool append(const char *buffer, uint64_t size)
{
// is this the only data, and also direct complete?
if (_received == 0 && size >= _bodySize)
{
// we have everything
_body = buffer;
_received = _bodySize;
// done
return true;
}
else
{
// it does not yet fit, do we have to allocate?
if (!_body) _body = new char[_bodySize];
_selfAllocated = true;
// prevent that size is too big
if (size > _bodySize - _received) size = _bodySize - _received;
// append data
memcpy((char *)(_body + _received), buffer, size);
// we have more data now
_received += size;
// done
return _received >= _bodySize;
}
}
};
/**
* End of namespace
*/
}

View File

@ -1,33 +0,0 @@
/**
* MethodFrame.cpp
*
* @copyright 2014 Copernica BV
*/
#include "includes.h"
#include "exception.h"
#include "protocolexception.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool MethodFrame::process(ConnectionImpl *connection)
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID()));
// unreachable
return false;
}
/**
* End of namespace
*/
}

View File

@ -75,7 +75,14 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection);
virtual bool process(ConnectionImpl *connection) override
{
// this is an exception
throw ProtocolException("unimplemented frame type " + std::to_string(type()) + " class " + std::to_string(classID()) + " method " + std::to_string(methodID()));
// unreachable
return false;
}
};
/**

View File

@ -1,31 +0,0 @@
/**
* QueueDeclareOKFrame.cpp
*/
#include "includes.h"
#include "queuebindokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool QueueBindOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportQueueBound();
// done
return true;
}
}

View File

@ -60,8 +60,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report to handler
channel->reportQueueBound();
// done
return true;
}
};
/**

View File

@ -1,31 +0,0 @@
/**
* QueueDeclareOKFrame.cpp
*/
#include "includes.h"
#include "queuedeclareokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool QueueDeclareOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// what if channel doesn't exist?
if (!channel) return false;
// report to the handler
channel->reportQueueDeclared(this->name(), this->messageCount(), this->consumerCount());
// done
return true;
}
}

View File

@ -124,7 +124,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// what if channel doesn't exist?
if (!channel) return false;
// report to the handler
channel->reportQueueDeclared(this->name(), this->messageCount(), this->consumerCount());
// done
return true;
}
};
/**

View File

@ -1,33 +0,0 @@
/**
* QueueDeleteOKFrame.cpp
*/
#include "includes.h"
#include "queuedeleteokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool QueueDeleteOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue deletion success
channel->reportQueueDeleted(this->messageCount());
// done
return true;
}
// end namespace
}

View File

@ -85,8 +85,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue deletion success
channel->reportQueueDeleted(this->messageCount());
// done
return true;
}
};
/**

View File

@ -1,32 +0,0 @@
/**
* QueuePurgeOKFrame.cpp
*/
#include "includes.h"
#include "queuepurgeokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool QueuePurgeOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue purge success
channel->reportQueuePurged(this->messageCount());
// done
return true;
}
// end namespace
}

View File

@ -85,7 +85,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue purge success
channel->reportQueuePurged(this->messageCount());
// done
return true;
}
};
/**

View File

@ -1,32 +0,0 @@
/**
* QueueUnbindOKFrame.cpp
*/
#include "includes.h"
#include "queueunbindokframe.h"
// setup namespace
namespace AMQP {
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
bool QueueUnbindOKFrame::process(ConnectionImpl *connection)
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue unbind success
channel->reportQueueUnbound();
// done
return true;
}
// end namespace
}

View File

@ -64,7 +64,20 @@ public:
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override;
virtual bool process(ConnectionImpl *connection) override
{
// check if we have a channel
ChannelImpl *channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report queue unbind success
channel->reportQueueUnbound();
// done
return true;
}
};
/**

View File

@ -6,24 +6,23 @@
* @documentation private
*/
#include "includes.h"
#include "bodyframe.h"
#include "heartbeatframe.h"
#include "connectionstartframe.h"
#include "connectionstartokframe.h"
#include "connectionstartframe.h"
#include "connectionsecureframe.h"
#include "connectionsecureokframe.h"
#include "connectiontuneframe.h"
#include "connectiontuneokframe.h"
#include "connectionopenframe.h"
#include "connectionopenokframe.h"
#include "connectioncloseframe.h"
#include "connectionopenframe.h"
#include "connectiontuneokframe.h"
#include "connectiontuneframe.h"
#include "connectioncloseokframe.h"
#include "connectioncloseframe.h"
#include "channelopenframe.h"
#include "channelopenokframe.h"
#include "channelflowframe.h"
#include "channelflowokframe.h"
#include "channelcloseframe.h"
#include "channelcloseokframe.h"
#include "channelcloseframe.h"
#include "exchangedeclareframe.h"
#include "exchangedeclareokframe.h"
#include "exchangedeleteframe.h"
@ -65,9 +64,9 @@
#include "transactioncommitokframe.h"
#include "transactionrollbackframe.h"
#include "transactionrollbackokframe.h"
#include "messageimpl.h"
#include "bodyframe.h"
#include "basicheaderframe.h"
#include "exception.h"
#include "protocolexception.h"
#include "framecheck.h"
#define TYPE_INVALID 0

View File

@ -355,11 +355,12 @@ void MyConnection::onQueueBound(AMQP::Channel *channel)
// show
std::cout << "AMQP Queue bound" << std::endl;
_connection->setQos(10);
// _channel->setQos(10);
// _connection->setQos(10);
_channel->setQos(1);
_channel->publish("my_exchange", "key", "this is the message");
_channel->consume("my_queue");
}
/**
@ -403,3 +404,38 @@ void MyConnection::onQosSet(AMQP::Channel *channel)
std::cout << "AMQP Qos set" << std::endl;
}
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer started" << std::endl;
}
/**
* Method that is called when a message has been consumed
* @param channel the channel on which the consumer was started
* @param message the consumed message
*/
void MyConnection::onConsumed(AMQP::Channel *channel, const AMQP::Message &message)
{
// show
std::cout << "AMQP consumed: " << message.message() << std::endl;
}
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer stopped" << std::endl;
}

View File

@ -157,19 +157,19 @@ private:
* Method that is called when an exchange is bound
* @param channel
*/
virtual void onExchangeBound(AMQP::Channel *channel);
virtual void onExchangeBound(AMQP::Channel *channel) override;
/**
* Method that is called when an exchange is unbound
* @param channel
*/
virtual void onExchangeUnbound(AMQP::Channel *channel);
virtual void onExchangeUnbound(AMQP::Channel *channel) override;
/**
* Method that is called when an exchange is deleted
* @param channel
*/
virtual void onExchangeDeleted(AMQP::Channel *channel);
virtual void onExchangeDeleted(AMQP::Channel *channel) override;
/**
* Mehod that is called when an exchange is declared
@ -184,39 +184,62 @@ private:
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount);
virtual void onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) override;
/**
* Method that is called when a queue is bound
* @param channel
* @param
*/
virtual void onQueueBound(AMQP::Channel *channel);
virtual void onQueueBound(AMQP::Channel *channel) override;
/**
* Method that is called when a queue is deleted
* @param channel
* @param messageCount number of messages deleted along with the queue
*/
virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount);
virtual void onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) override;
/**
* Method that is called when a queue is unbound
* @param channel
*/
virtual void onQueueUnbound(AMQP::Channel *channel);
virtual void onQueueUnbound(AMQP::Channel *channel) override;
/**
* Method that is called when a queue is purged
* @param messageCount number of message purged
*/
virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount);
virtual void onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) override;
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
*/
virtual void onQosSet(AMQP::Channel *channel);
virtual void onQosSet(AMQP::Channel *channel) override;
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
virtual void onConsumerStarted(AMQP::Channel *channel, const std::string &tag) override;
/**
* Method that is called when a message has been consumed
* @param channel the channel on which the consumer was started
* @param message the consumed message
*/
virtual void onConsumed(AMQP::Channel *channel, const AMQP::Message &message) override;
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
virtual void onConsumerStopped(AMQP::Channel *channel, const std::string &tag) override;
public: