basic.get is asynchronous until the entire body has been received, only then subsequent messages are sent + first work in progress on implementing smarter buffers
This commit is contained in:
parent
dca76db2f0
commit
37a51cdc7b
|
|
@ -37,6 +37,8 @@
|
|||
#include <amqpcpp/monitor.h>
|
||||
|
||||
// amqp types
|
||||
#include <amqpcpp/buffer.h>
|
||||
#include <amqpcpp/bytebuffer.h>
|
||||
#include <amqpcpp/field.h>
|
||||
#include <amqpcpp/numericfield.h>
|
||||
#include <amqpcpp/decimalfield.h>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Buffer.h
|
||||
*
|
||||
* Interface that can be implemented by client applications and that
|
||||
* is parsed to the Connection::parse() method.
|
||||
*
|
||||
* Normally, the Connection::parse() method is fed with a byte
|
||||
* array. However, if you're receiving big frames, it may be inconvenient
|
||||
* to copy these big frames into continguous byte arrays, and you
|
||||
* prefer using objects that internally use linked lists or other
|
||||
* ways to store the bytes. In such sitations, you can implement this
|
||||
* interface and pass that to the connection.
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2014 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class Buffer
|
||||
{
|
||||
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* ByteByffer.h
|
||||
*
|
||||
* Very simple implementation of the buffer class that simply wraps
|
||||
* around a buffer of bytes
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2014 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Open namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class ByteBuffer : public Buffer
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The actual byte buffer
|
||||
* @var const char *
|
||||
*/
|
||||
const char *_data;
|
||||
|
||||
/**
|
||||
* Size of the buffer
|
||||
* @var size_t
|
||||
*/
|
||||
size_t _size;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param data
|
||||
* @param size
|
||||
*/
|
||||
ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~ByteBuffer() {}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* End namespace
|
||||
*/
|
||||
}
|
||||
|
|
@ -320,6 +320,7 @@ public:
|
|||
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); }
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); }
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); }
|
||||
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); }
|
||||
|
||||
/**
|
||||
* Set the Quality of Service (QOS) for this channel
|
||||
|
|
|
|||
|
|
@ -77,8 +77,32 @@ public:
|
|||
*/
|
||||
size_t parse(const char *buffer, size_t size)
|
||||
{
|
||||
return _implementation.parse(buffer, size);
|
||||
//return _implementation.parse(ByteBuffer(buffer, size));
|
||||
return _implementation.parse(buffer, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse data that was recevied from RabbitMQ
|
||||
*
|
||||
* Every time that data comes in from RabbitMQ, you should call this method to parse
|
||||
* the incoming data, and let it handle by the AMQP library. This method returns the number
|
||||
* of bytes that were processed.
|
||||
*
|
||||
* If not all bytes could be processed because it only contained a partial frame, you should
|
||||
* call this same method later on when more data is available. The AMQP library does not do
|
||||
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
|
||||
* later call.
|
||||
*
|
||||
* This method accepts a buffer object. This is an interface that is defined by the AMQP
|
||||
* library, that can be implemented by you to allow faster access to a buffer.
|
||||
*
|
||||
* @param buffer buffer to decode
|
||||
* @return number of bytes that were processed
|
||||
*/
|
||||
//size_t parse(const Buffer &buffer)
|
||||
//{
|
||||
// return _implementation.parse(buffer);
|
||||
//}
|
||||
|
||||
/**
|
||||
* Close the connection
|
||||
|
|
|
|||
|
|
@ -177,8 +177,8 @@ public:
|
|||
// construct the message
|
||||
channel->message(*this);
|
||||
|
||||
// we're synchronized
|
||||
channel->synchronized();
|
||||
// notice that the channel is not yet synchronized here, because
|
||||
// we first have to receive the entire body
|
||||
|
||||
// done
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -703,6 +703,15 @@ void ChannelImpl::reportMessage()
|
|||
// skip if there is no message
|
||||
if (!_message) return;
|
||||
|
||||
// after the report the channel may be destructed, monitor that
|
||||
Monitor monitor(this);
|
||||
|
||||
// synchronize the channel if this comes from a basic.get frame
|
||||
if (_message->consumer().empty()) synchronized();
|
||||
|
||||
// syncing the channel may destruct the channel
|
||||
if (!monitor.valid()) return;
|
||||
|
||||
// look for the consumer
|
||||
auto iter = _consumers.find(_message->consumer());
|
||||
if (iter == _consumers.end()) return;
|
||||
|
|
@ -710,9 +719,6 @@ void ChannelImpl::reportMessage()
|
|||
// is this a valid callback method
|
||||
if (!iter->second) return;
|
||||
|
||||
// after the report the channel may be destructed, monitor that
|
||||
Monitor monitor(this);
|
||||
|
||||
// call the callback
|
||||
_message->report(iter->second);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue