basic.get is asynchronous until the entire body has been received, only then subsequent messages are sent + first work in progress on implementing smarter buffers

This commit is contained in:
Emiel Bruijntjes 2014-08-01 11:55:07 +02:00
parent b97222c4db
commit e299aa5fac
7 changed files with 138 additions and 6 deletions

View File

@ -37,6 +37,8 @@
#include <amqpcpp/monitor.h>
// amqp types
#include <amqpcpp/buffer.h>
#include <amqpcpp/bytebuffer.h>
#include <amqpcpp/field.h>
#include <amqpcpp/numericfield.h>
#include <amqpcpp/decimalfield.h>

42
include/buffer.h Normal file
View File

@ -0,0 +1,42 @@
/**
* Buffer.h
*
* Interface that can be implemented by client applications and that
* is parsed to the Connection::parse() method.
*
* Normally, the Connection::parse() method is fed with a byte
* array. However, if you're receiving big frames, it may be inconvenient
* to copy these big frames into continguous byte arrays, and you
* prefer using objects that internally use linked lists or other
* ways to store the bytes. In such sitations, you can implement this
* interface and pass that to the connection.
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Namespace
*/
namespace AMQP {
/**
* Class definition
*/
class Buffer
{
};
/**
* End of namespace
*/
}

57
include/bytebuffer.h Normal file
View File

@ -0,0 +1,57 @@
/**
* ByteByffer.h
*
* Very simple implementation of the buffer class that simply wraps
* around a buffer of bytes
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2014 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Open namespace
*/
namespace AMQP {
/**
* Class definition
*/
class ByteBuffer : public Buffer
{
private:
/**
* The actual byte buffer
* @var const char *
*/
const char *_data;
/**
* Size of the buffer
* @var size_t
*/
size_t _size;
public:
/**
* Constructor
* @param data
* @param size
*/
ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {}
/**
* Destructor
*/
virtual ~ByteBuffer() {}
};
/**
* End namespace
*/
}

View File

@ -320,6 +320,7 @@ public:
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); }
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); }
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); }
/**
* Set the Quality of Service (QOS) for this channel

View File

@ -77,8 +77,32 @@ public:
*/
size_t parse(const char *buffer, size_t size)
{
return _implementation.parse(buffer, size);
//return _implementation.parse(ByteBuffer(buffer, size));
return _implementation.parse(buffer, size);
}
/**
* Parse data that was recevied from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data, and let it handle by the AMQP library. This method returns the number
* of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
* call this same method later on when more data is available. The AMQP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
*
* This method accepts a buffer object. This is an interface that is defined by the AMQP
* library, that can be implemented by you to allow faster access to a buffer.
*
* @param buffer buffer to decode
* @return number of bytes that were processed
*/
//size_t parse(const Buffer &buffer)
//{
// return _implementation.parse(buffer);
//}
/**
* Close the connection

View File

@ -177,8 +177,8 @@ public:
// construct the message
channel->message(*this);
// we're synchronized
channel->synchronized();
// notice that the channel is not yet synchronized here, because
// we first have to receive the entire body
// done
return true;

View File

@ -703,6 +703,15 @@ void ChannelImpl::reportMessage()
// skip if there is no message
if (!_message) return;
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// synchronize the channel if this comes from a basic.get frame
if (_message->consumer().empty()) synchronized();
// syncing the channel may destruct the channel
if (!monitor.valid()) return;
// look for the consumer
auto iter = _consumers.find(_message->consumer());
if (iter == _consumers.end()) return;
@ -710,9 +719,6 @@ void ChannelImpl::reportMessage()
// is this a valid callback method
if (!iter->second) return;
// after the report the channel may be destructed, monitor that
Monitor monitor(this);
// call the callback
_message->report(iter->second);