Merge pull request #134 from mgibula/master

Add Publisher Confirms support
This commit is contained in:
Emiel Bruijntjes 2018-05-14 22:42:56 +02:00 committed by GitHub
commit 025fdb5530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 578 additions and 1 deletions

View File

@ -974,6 +974,42 @@ knows in the database world. It is not possible to wrap all sort of
operations in a transaction, they are only meaningful for publishing
and consuming.
PUBLISHER CONFIRMS
===================
RabbitMQ supports lightweight method of confirming that broker received and processed
a message. For this method to work, the channel needs to be put in so-called _confirm mode_.
This is done using confirmSelect() method. When channel is successfully put in
confirm mode, the server and client count messages (starting from 1) and server sends
acknowledgments for every message it processed (it can also acknowledge multiple message at
once).
If server is unable to process a message, it will send send negative acknowledgments. Both
positive and negative acknowledgments handling are implemented as callbacks for Channel object.
````c++
// setup confirm mode and ack/nack callbacks
channel.confirmSelect().onSuccess([&]() {
// from this moment onwards ack/nack confirmations are comming in
channel.publish("my-exchange", "my-key", "my first message");
// message counter is now 1, will call onAck/onNack with deliverTag=1
channel.publish("my-exchange", "my-key", "my second message");
// message counter is now 2, will call onAck/onNack with deliverTag=2
}).onAck([&](uint64_t deliverTag, bool multiple) {
// deliverTag is message number
// multiple is set to true, if all messages UP TO deliverTag have been processed
}).onNack([&](uint64 deliveryTag, bool multiple, bool requeue) {
// deliverTag is message number
// multiple is set to true, if all messages UP TO deliverTag have not been processed
// requeue is to be ignored
});
````
For more information, please see http://www.rabbitmq.com/confirms.html.
CONSUMING MESSAGES
==================
@ -1107,7 +1143,6 @@ need additional attention:
- ability to set up secure connections (or is this fully done on the IO level)
- login with other protocols than login/password
- publish confirms
We also need to add more safety checks so that strange or invalid data from
RabbitMQ does not break the library (although in reality RabbitMQ only sends

View File

@ -70,6 +70,7 @@
#include "amqpcpp/deferredqueue.h"
#include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h"
#include "amqpcpp/deferredconfirm.h"
#include "amqpcpp/deferredget.h"
#include "amqpcpp/deferredpublisher.h"
#include "amqpcpp/channelimpl.h"

View File

@ -77,6 +77,13 @@ using ReturnedCallback = std::function<void()>;
using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;
using BounceCallback = std::function<void(const Message &message, int16_t code, const std::string &description)>;
/**
* When using publisher confirms, AckCallback is called when server confirms that message is received
* and processed. NackCallback is called otherwise.
*/
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
using NackCallback = std::function<void(uint64_t deliveryTag, bool multiple, bool requeue)>;
/**
* End namespace
*/

View File

@ -127,6 +127,17 @@ public:
return _implementation->connected();
}
/**
* Put channel in a confirm mode (RabbitMQ specific)
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
DeferredConfirm &confirmSelect()
{
return _implementation->confirmSelect();
}
/**
* Start a transaction
*

View File

@ -42,6 +42,7 @@ class ConsumedMessage;
class ConnectionImpl;
class DeferredDelete;
class DeferredCancel;
class DeferredConfirm;
class DeferredQueue;
class DeferredGet;
class DeferredPublisher;
@ -80,6 +81,12 @@ private:
*/
std::shared_ptr<DeferredPublisher> _publisher;
/**
* Handler that deals with publisher confirms frames
* @var std::shared_ptr<DeferredConfirm>
*/
std::shared_ptr<DeferredConfirm> _confirm;
/**
* Handlers for all consumers that are active
* @var std::map<std::string,std::shared_ptr<DeferredConsumer>
@ -252,6 +259,11 @@ public:
return _state == state_connected || _state == state_ready;
}
/**
* Put channel in a confirm mode (RabbitMQ specific)
*/
DeferredConfirm &confirmSelect();
/**
* Start a transaction
*/
@ -715,6 +727,12 @@ public:
*/
DeferredPublisher *publisher() const { return _publisher.get(); }
/**
* Retrieve the deferred confirm that handles publisher confirms
* @return The deferred confirm object
*/
DeferredConfirm *confirm() const { return _confirm.get(); }
/**
* The channel class is its friend, thus can it instantiate this object
*/

View File

@ -24,6 +24,8 @@ class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BasicReturnFrame;
class BasicAckFrame;
class BasicNackFrame;
class BodyFrame;
class Channel;
class Connection;

View File

@ -0,0 +1,118 @@
/**
* DeferredConfirm.h
*
* Deferred callback for RabbitMQ-specific publisher confirms mechanism.
*
* @author Marcin Gibula <m.gibula@gmail.com>
* @copyright 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredConfirm : public Deferred
{
private:
/**
* Callback to execute when server confirms that message is processed
* @var AckCallback
*/
AckCallback _ackCallback;
/**
* Callback to execute when server sends negative acknowledgement
* @var NackCallback
*/
NackCallback _nackCallback;
/**
* Process an ACK frame
*
* @param frame The frame to process
*/
void process(BasicAckFrame &frame);
/**
* Process an ACK frame
*
* @param frame The frame to process
*/
void process(BasicNackFrame &frame);
/**
* The channel implementation may call our
* private members and construct us
*/
friend class ChannelImpl;
friend class BasicAckFrame;
friend class BasicNackFrame;
public:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
*
* @param boolean are we already failed?
*/
DeferredConfirm(bool failed = false) : Deferred(failed) {}
public:
/**
* Register the function that is called when channel is put in publisher
* confirmed mode
* @param callback
*/
DeferredConfirm &onSuccess(const SuccessCallback &callback)
{
// call base
Deferred::onSuccess(callback);
// allow chaining
return *this;
}
/**
* Callback that is called when the broker confirmed message publication
* @param callback the callback to execute
*/
DeferredConfirm &onAck(const AckCallback &callback)
{
// store callback
_ackCallback = callback;
// allow chaining
return *this;
}
/**
* Callback that is called when the broker denied message publication
* @param callback the callback to execute
*/
DeferredConfirm &onNack(const NackCallback &callback)
{
// store callback
_nackCallback = callback;
// allow chaining
return *this;
}
};
/**
* End namespace
*/
}

View File

@ -111,6 +111,13 @@ private:
*/
bool processBasicFrame(ConnectionImpl *connection);
/**
* Process a confirm frame
* @param connection
* @return bool
*/
bool processConfirmFrame(ConnectionImpl *connection);
/**
* Process a transaction frame
* @param connection

View File

@ -29,6 +29,8 @@ add_sources(
channelimpl.cpp
channelopenframe.h
channelopenokframe.h
confirmselectframe.h
confirmselectokframe.h
connectioncloseframe.h
connectioncloseokframe.h
connectionframe.h
@ -43,6 +45,7 @@ add_sources(
connectiontuneokframe.h
consumedmessage.h
deferredcancel.cpp
deferredconfirm.cpp
deferredconsumer.cpp
deferredreceiver.cpp
deferredextreceiver.cpp

View File

@ -110,6 +110,32 @@ public:
{
return _multiple.get(0);
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// get the current confirm
auto confirm = channel->confirm();
// if there is no deferred confirm, we can just as well stop
if (confirm == nullptr) return false;
// process the frame
confirm->process(*this);
// done
return true;
}
};
/**

View File

@ -108,6 +108,32 @@ public:
{
return _bits.get(1);
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// get the current confirm
auto confirm = channel->confirm();
// if there is no deferred confirm, we can just as well stop
if (confirm == nullptr) return false;
// process the frame
confirm->process(*this);
// done
return true;
}
};
/**

View File

@ -14,6 +14,7 @@
#include "channelflowframe.h"
#include "channelcloseokframe.h"
#include "channelcloseframe.h"
#include "confirmselectframe.h"
#include "transactionselectframe.h"
#include "transactioncommitframe.h"
#include "transactionrollbackframe.h"
@ -180,6 +181,27 @@ Deferred &ChannelImpl::resume()
return push(ChannelFlowFrame(_id, true));
}
/**
* Put channel in a confirm mode
*
* This function returns a deferred handler. Callbacks can be installed
* using onSuccess(), onError() and onFinalize() methods.
*/
DeferredConfirm &ChannelImpl::confirmSelect()
{
// the frame to send
ConfirmSelectFrame frame(_id);
// send the frame, and create deferred object
_confirm = std::make_shared<DeferredConfirm>(!send(frame));
// push to list
push(_confirm);
// done
return *_confirm;
}
/**
* Start a transaction
*

56
src/confirmframe.h Normal file
View File

@ -0,0 +1,56 @@
/**
* Class describing an AMQP confirm frame
*
* @author Marcin Gibula <m.gibula@gmail.com>
* @copyright 2017 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class implementation
*/
class ConfirmFrame : public MethodFrame
{
protected:
/**
* Constructor
* @param channel channel identifier
* @param size frame size
*/
ConfirmFrame(uint16_t channel, uint32_t size) :
MethodFrame(channel, size)
{}
/**
* Constructor based on incoming frame
* @param frame
*/
ConfirmFrame(ReceivedFrame &frame) :
MethodFrame(frame)
{}
public:
/**
* Destructor
*/
virtual ~ConfirmFrame() {}
/**
* Class id
* @return uint16_t
*/
virtual uint16_t classID() const override
{
return 85;
}
};
/**
* end namespace
*/
}

88
src/confirmselectframe.h Normal file
View File

@ -0,0 +1,88 @@
/**
* Class describing an AMQP confirm select frame
*
* @author Marcin Gibula <m.gibula@gmail.com>
* @copyright 2017 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class implementation
*/
class ConfirmSelectFrame : public ConfirmFrame
{
private:
/**
* whether to wait for a response
* @var BooleanSet
*/
BooleanSet _noWait;
protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConfirmFrame::fill(buffer);
// add boolean
_noWait.fill(buffer);
}
public:
/**
* Decode a confirm select frame from a received frame
*
* @param frame received frame to decode
*/
ConfirmSelectFrame(ReceivedFrame& frame) : ConfirmFrame(frame), _noWait(frame) {}
/**
* Construct a confirm select frame
*
* @param channel channel identifier
* @return newly created confirm select frame
*/
ConfirmSelectFrame(uint16_t channel, bool noWait = false) :
ConfirmFrame(channel, 1), //sizeof bool
_noWait(noWait)
{}
/**
* Destructor
*/
virtual ~ConfirmSelectFrame() {}
/**
* return the method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 10;
}
/**
* Return whether to wait for a response
* @return boolean
*/
bool noWait() const
{
return _noWait.get(0);
}
};
/**
* end namespace
*/
}

View File

@ -0,0 +1,89 @@
/**
* Class describing an AMQP confirm select ok frame
*
* @author Marcin Gibula <m.gibula@gmail.com>
* @copyright 2017 Copernica BV
*/
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class implementation
*/
class ConfirmSelectOKFrame : public ConfirmFrame
{
protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConfirmFrame::fill(buffer);
}
public:
/**
* Constructor for an incoming frame
*
* @param frame received frame to decode
*/
ConfirmSelectOKFrame(ReceivedFrame& frame) :
ConfirmFrame(frame)
{}
/**
* Construct a confirm select ok frame
*
* @param channel channel identifier
* @return newly created confirm select ok frame
*/
ConfirmSelectOKFrame(uint16_t channel) :
ConfirmFrame(channel, 0)
{}
/**
* Destructor
*/
virtual ~ConfirmSelectOKFrame() {}
/**
* return the method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 11;
}
/**
* Process the frame
* @param connection The connection over which it was received
* @return bool Was it succesfully processed?
*/
virtual bool process(ConnectionImpl *connection) override
{
// we need the appropriate channel
auto channel = connection->channel(this->channel());
// channel does not exist
if(!channel) return false;
// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
// done
return true;
}
};
/**
* end namespace
*/
}

42
src/deferredconfirm.cpp Normal file
View File

@ -0,0 +1,42 @@
/**
* DeferredConfirm.cpp
*
* Implementation file for the DeferredConfirm class
*
* @author Marcin Gibula <m.gibula@gmail.com>
* @copyright 2018 Copernica BV
*/
#include "includes.h"
#include "basicackframe.h"
#include "basicnackframe.h"
/**
* Namespace
*/
namespace AMQP {
/**
* Process an ACK frame
*
* @param frame The frame to process
*/
void DeferredConfirm::process(BasicAckFrame &frame)
{
if (_ackCallback) _ackCallback(frame.deliveryTag(), frame.multiple());
}
/**
* Process a NACK frame
*
* @param frame The frame to process
*/
void DeferredConfirm::process(BasicNackFrame &frame)
{
if (_nackCallback) _nackCallback(frame.deliveryTag(), frame.multiple(), frame.requeue());
}
/**
* End namespace
*/
}

View File

@ -72,6 +72,7 @@
#include "amqpcpp/deferredqueue.h"
#include "amqpcpp/deferreddelete.h"
#include "amqpcpp/deferredcancel.h"
#include "amqpcpp/deferredconfirm.h"
#include "amqpcpp/deferredget.h"
#include "amqpcpp/channelimpl.h"
#include "amqpcpp/channel.h"
@ -93,6 +94,7 @@
#include "exchangeframe.h"
#include "queueframe.h"
#include "basicframe.h"
#include "confirmframe.h"
#include "transactionframe.h"

View File

@ -7,6 +7,8 @@
*/
#include "includes.h"
#include "heartbeatframe.h"
#include "confirmselectframe.h"
#include "confirmselectokframe.h"
#include "connectionstartokframe.h"
#include "connectionstartframe.h"
#include "connectionsecureframe.h"
@ -336,6 +338,7 @@ bool ReceivedFrame::processMethodFrame(ConnectionImpl *connection)
case 40: return processExchangeFrame(connection);
case 50: return processQueueFrame(connection);
case 60: return processBasicFrame(connection);
case 85: return processConfirmFrame(connection);
case 90: return processTransactionFrame(connection);
}
@ -493,6 +496,27 @@ bool ReceivedFrame::processBasicFrame(ConnectionImpl *connection)
throw ProtocolException("unrecognized basic frame method " + std::to_string(methodID));
}
/**
* Process a confirm frame
* @param connection
* @return bool
*/
bool ReceivedFrame::processConfirmFrame(ConnectionImpl *connection)
{
// read the method id
uint16_t methodID = nextUint16();
// construct frame based on method id
switch (methodID)
{
case 10: return ConfirmSelectFrame(*this).process(connection);
case 11: return ConfirmSelectOKFrame(*this).process(connection);
}
// this is a problem
throw ProtocolException("unrecognized confirm frame method " + std::to_string(methodID));
}
/**
* Process a transaction frame
* @param connection