Make sure single gets are processed correctly as well

This commit is contained in:
David van Erkelens 2016-07-05 16:39:12 +02:00
parent 28d440449d
commit dddd16b74d
5 changed files with 39 additions and 4 deletions

View File

@ -663,11 +663,15 @@ public:
*
* @param consumertag The consumer tag
* @param consumer The consumer handler
* @param active Is this the new active consumer
*/
void install(std::string consumertag, const std::shared_ptr<DeferredConsumerBase> &consumer)
void install(std::string consumertag, const std::shared_ptr<DeferredConsumerBase> &consumer, bool active = false)
{
// install the consumer handler
_consumers[consumertag] = consumer;
// should we become the current consumer?
if (active) _consumer = consumer;
}
/**

View File

@ -28,6 +28,7 @@ namespace AMQP {
* Forward declarations
*/
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BodyFrame;
@ -52,6 +53,13 @@ private:
*/
void process(BasicDeliverFrame &frame);
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void process(BasicGetOKFrame &frame);
/**
* Process the message headers
*
@ -85,6 +93,7 @@ private:
*/
friend class ChannelImpl;
friend class BasicDeliverFrame;
friend class BasicGetOKFrame;
friend class BasicHeaderFrame;
friend class BodyFrame;
protected:

View File

@ -173,8 +173,11 @@ public:
// report success for the get operation
channel->reportSuccess(messageCount(), deliveryTag(), redelivered());
// notice that the channel is not yet synchronized here, because
// we first have to receive the entire body
// check if we have a valid consumer
if (!channel->consumer()) return false;
// pass on to consumer
channel->consumer()->process(*this);
// done
return true;

View File

@ -12,6 +12,7 @@
*/
#include "../include/deferredconsumerbase.h"
#include "basicdeliverframe.h"
#include "basicgetokframe.h"
#include "basicheaderframe.h"
#include "bodyframe.h"
@ -38,6 +39,24 @@ void DeferredConsumerBase::process(BasicDeliverFrame &frame)
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process a delivery frame from a get request
*
* @param frame The frame to process
*/
void DeferredConsumerBase::process(BasicGetOKFrame &frame)
{
// retrieve the delivery tag and whether we were redelivered
_deliveryTag = frame.deliveryTag();
_redelivered = frame.redelivered();
// anybody interested in the new message?
if (_beginCallback) _beginCallback();
// do we have anybody interested in messages?
if (_messageCallback) _message.construct(frame.exchange(), frame.routingKey());
}
/**
* Process the message headers
*

View File

@ -31,7 +31,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess(uint32_t messagecoun
_redelivered = redelivered;
// install ourselves in the channel
_channel->install("", shared_from_this());
_channel->install("", shared_from_this(), true);
// report the size (note that this is the size _minus_ the message that is retrieved
// (and for which the callback will be called later), so it could be zero)