Some bugfixes and performance enhancements
This commit is contained in:
parent
45deeaa754
commit
fcc9522e16
|
|
@ -149,7 +149,7 @@ public:
|
||||||
* Get the byte value
|
* Get the byte value
|
||||||
* @return value
|
* @return value
|
||||||
*/
|
*/
|
||||||
uint8_t value()
|
uint8_t value() const
|
||||||
{
|
{
|
||||||
return _byte;
|
return _byte;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -146,7 +146,7 @@ public:
|
||||||
* What is the state of the connection - is the protocol handshake completed?
|
* What is the state of the connection - is the protocol handshake completed?
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool protocolOk()
|
bool protocolOk() const
|
||||||
{
|
{
|
||||||
// must be busy doing the connection handshake, or already connected
|
// must be busy doing the connection handshake, or already connected
|
||||||
return _state == state_handshake || _state == state_connected;
|
return _state == state_handshake || _state == state_connected;
|
||||||
|
|
@ -209,7 +209,7 @@ public:
|
||||||
* The max frame size
|
* The max frame size
|
||||||
* @return uint32_t
|
* @return uint32_t
|
||||||
*/
|
*/
|
||||||
uint32_t maxFrame()
|
uint32_t maxFrame() const
|
||||||
{
|
{
|
||||||
return _maxFrame;
|
return _maxFrame;
|
||||||
}
|
}
|
||||||
|
|
@ -218,7 +218,7 @@ public:
|
||||||
* The max payload size for body frames
|
* The max payload size for body frames
|
||||||
* @return uint32_t
|
* @return uint32_t
|
||||||
*/
|
*/
|
||||||
uint32_t maxPayload()
|
uint32_t maxPayload() const
|
||||||
{
|
{
|
||||||
// 8 bytes for header and end-of-frame byte
|
// 8 bytes for header and end-of-frame byte
|
||||||
return _maxFrame - 8;
|
return _maxFrame - 8;
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,7 @@ protected:
|
||||||
// this is the same as a regular success message
|
// this is the same as a regular success message
|
||||||
return reportSuccess();
|
return reportSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indicate failure
|
* Indicate failure
|
||||||
* @param error Description of the error that occured
|
* @param error Description of the error that occured
|
||||||
|
|
@ -148,7 +148,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Protected constructor that can only be called
|
* Protected constructor that can only be called
|
||||||
* from within the channel implementation
|
* from within the channel implementation
|
||||||
*
|
*
|
||||||
* Note: this constructor _should_ be protected, but because make_shared
|
* Note: this constructor _should_ be protected, but because make_shared
|
||||||
* will then not work, we have decided to make it public after all,
|
* will then not work, we have decided to make it public after all,
|
||||||
* because the work-around would result in not-so-easy-to-read code.
|
* because the work-around would result in not-so-easy-to-read code.
|
||||||
|
|
@ -156,7 +156,7 @@ public:
|
||||||
* @param failed are we already failed?
|
* @param failed are we already failed?
|
||||||
*/
|
*/
|
||||||
Deferred(bool failed = false) : _failed(failed) {}
|
Deferred(bool failed = false) : _failed(failed) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Deleted copy and move constructors
|
* Deleted copy and move constructors
|
||||||
|
|
@ -167,7 +167,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Destructor
|
* Destructor
|
||||||
*/
|
*/
|
||||||
virtual ~Deferred()
|
virtual ~Deferred()
|
||||||
{
|
{
|
||||||
// report to the finalize callback
|
// report to the finalize callback
|
||||||
if (_finalizeCallback) _finalizeCallback();
|
if (_finalizeCallback) _finalizeCallback();
|
||||||
|
|
@ -176,7 +176,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Cast to a boolean
|
* Cast to a boolean
|
||||||
*/
|
*/
|
||||||
operator bool ()
|
operator bool () const
|
||||||
{
|
{
|
||||||
return !_failed;
|
return !_failed;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -162,7 +162,7 @@ public:
|
||||||
* @param value
|
* @param value
|
||||||
* @return FieldProxy
|
* @return FieldProxy
|
||||||
*/
|
*/
|
||||||
FieldProxy& operator=(const DecimalField value)
|
FieldProxy& operator=(const DecimalField &value)
|
||||||
{
|
{
|
||||||
// assign value and allow chaining
|
// assign value and allow chaining
|
||||||
_source->set(_index, DecimalField(value));
|
_source->set(_index, DecimalField(value));
|
||||||
|
|
|
||||||
|
|
@ -258,7 +258,7 @@ public:
|
||||||
* This is an alias for retrieving the delivery mode and checking if it is set to 2
|
* This is an alias for retrieving the delivery mode and checking if it is set to 2
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool persistent()
|
bool persistent() const
|
||||||
{
|
{
|
||||||
return hasDeliveryMode() && deliveryMode() == 2;
|
return hasDeliveryMode() && deliveryMode() == 2;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Monitor.h
|
* Monitor.h
|
||||||
*
|
*
|
||||||
* A monitor object monitors if the connection is still valid. When the
|
* A monitor object monitors if the connection is still valid. When the
|
||||||
* connection is parsing incoming data, it calls the user handler for each
|
* connection is parsing incoming data, it calls the user handler for each
|
||||||
* incoming frame. However, it is unknown what this handler is going to do,
|
* incoming frame. However, it is unknown what this handler is going to do,
|
||||||
* it could for example decide to destruct the connection object. In that
|
* it could for example decide to destruct the connection object. In that
|
||||||
|
|
@ -46,7 +46,7 @@ public:
|
||||||
// register with the watchable
|
// register with the watchable
|
||||||
_watchable->add(this);
|
_watchable->add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor
|
* Destructor
|
||||||
*/
|
*/
|
||||||
|
|
@ -55,22 +55,22 @@ public:
|
||||||
// remove from watchable
|
// remove from watchable
|
||||||
if (_watchable) _watchable->remove(this);
|
if (_watchable) _watchable->remove(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the object is valid
|
* Check if the object is valid
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool valid()
|
bool valid() const
|
||||||
{
|
{
|
||||||
return _watchable != nullptr;
|
return _watchable != nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The watchable can access private data
|
* The watchable can access private data
|
||||||
*/
|
*/
|
||||||
friend class Watchable;
|
friend class Watchable;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -79,4 +79,4 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -103,10 +103,10 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return whether to acknowledgement multiple messages
|
* Return whether to acknowledge multiple messages
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool multiple()
|
bool multiple() const
|
||||||
{
|
{
|
||||||
return _multiple.get(0);
|
return _multiple.get(0);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
/**
|
/**
|
||||||
* Class describing a basic negative-acknowledgement frame
|
* Class describing a basic negative-acknowledgement frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
@ -37,10 +37,10 @@ protected:
|
||||||
{
|
{
|
||||||
// call base
|
// call base
|
||||||
BasicFrame::fill(buffer);
|
BasicFrame::fill(buffer);
|
||||||
|
|
||||||
// add the delivery tag
|
// add the delivery tag
|
||||||
buffer.add(_deliveryTag);
|
buffer.add(_deliveryTag);
|
||||||
|
|
||||||
// add the booleans
|
// add the booleans
|
||||||
_bits.fill(buffer);
|
_bits.fill(buffer);
|
||||||
}
|
}
|
||||||
|
|
@ -54,20 +54,20 @@ public:
|
||||||
* @param multiple nack mutiple messages
|
* @param multiple nack mutiple messages
|
||||||
* @param requeue requeue the message
|
* @param requeue requeue the message
|
||||||
*/
|
*/
|
||||||
BasicNackFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false, bool requeue = false) :
|
BasicNackFrame(uint16_t channel, uint64_t deliveryTag, bool multiple = false, bool requeue = false) :
|
||||||
BasicFrame(channel, 9),
|
BasicFrame(channel, 9),
|
||||||
_deliveryTag(deliveryTag),
|
_deliveryTag(deliveryTag),
|
||||||
_bits(multiple, requeue) {}
|
_bits(multiple, requeue) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct based on received frame
|
* Construct based on received frame
|
||||||
* @param frame
|
* @param frame
|
||||||
*/
|
*/
|
||||||
BasicNackFrame(ReceivedFrame &frame) :
|
BasicNackFrame(ReceivedFrame &frame) :
|
||||||
BasicFrame(frame),
|
BasicFrame(frame),
|
||||||
_deliveryTag(frame.nextUint64()),
|
_deliveryTag(frame.nextUint64()),
|
||||||
_bits(frame) {}
|
_bits(frame) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor
|
* Destructor
|
||||||
*/
|
*/
|
||||||
|
|
@ -95,7 +95,7 @@ public:
|
||||||
* Return whether to acknowledgement multiple messages
|
* Return whether to acknowledgement multiple messages
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool multiple()
|
bool multiple() const
|
||||||
{
|
{
|
||||||
return _bits.get(0);
|
return _bits.get(0);
|
||||||
}
|
}
|
||||||
|
|
@ -104,7 +104,7 @@ public:
|
||||||
* Should the message be put back in the queue?
|
* Should the message be put back in the queue?
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
bool requeue()
|
bool requeue() const
|
||||||
{
|
{
|
||||||
return _bits.get(1);
|
return _bits.get(1);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
/**
|
/**
|
||||||
* Class describing an AMQP Body Frame
|
* Class describing an AMQP Body Frame
|
||||||
*
|
*
|
||||||
* @copyright 2014 Copernica BV
|
* @copyright 2014 Copernica BV
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
@ -11,7 +11,7 @@ namespace AMQP {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class implementation
|
* Class implementation
|
||||||
*/
|
*/
|
||||||
class BodyFrame : public ExtFrame
|
class BodyFrame : public ExtFrame
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
|
|
@ -21,12 +21,6 @@ private:
|
||||||
* @var const char *
|
* @var const char *
|
||||||
*/
|
*/
|
||||||
const char *_payload;
|
const char *_payload;
|
||||||
|
|
||||||
/**
|
|
||||||
* Size of the payload
|
|
||||||
* @var uint64_t
|
|
||||||
*/
|
|
||||||
uint32_t _size;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/**
|
/**
|
||||||
|
|
@ -53,8 +47,7 @@ public:
|
||||||
*/
|
*/
|
||||||
BodyFrame(uint16_t channel, const char *payload, uint32_t size) :
|
BodyFrame(uint16_t channel, const char *payload, uint32_t size) :
|
||||||
ExtFrame(channel, size),
|
ExtFrame(channel, size),
|
||||||
_payload(payload),
|
_payload(payload)
|
||||||
_size(size)
|
|
||||||
{}
|
{}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -63,10 +56,9 @@ public:
|
||||||
* @param frame received frame to decode
|
* @param frame received frame to decode
|
||||||
* @return shared pointer to newly created frame
|
* @return shared pointer to newly created frame
|
||||||
*/
|
*/
|
||||||
BodyFrame(ReceivedFrame& frame) :
|
BodyFrame(ReceivedFrame& frame) :
|
||||||
ExtFrame(frame),
|
ExtFrame(frame),
|
||||||
_payload(frame.nextData(frame.payloadSize())),
|
_payload(frame.nextData(frame.payloadSize()))
|
||||||
_size(frame.payloadSize())
|
|
||||||
{}
|
{}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -77,7 +69,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Return the type of frame
|
* Return the type of frame
|
||||||
* @return uint8_t
|
* @return uint8_t
|
||||||
*/
|
*/
|
||||||
uint8_t type() const
|
uint8_t type() const
|
||||||
{
|
{
|
||||||
return 3;
|
return 3;
|
||||||
|
|
@ -91,7 +83,7 @@ public:
|
||||||
{
|
{
|
||||||
return _payload;
|
return _payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the frame
|
* Process the frame
|
||||||
* @param connection The connection over which it was received
|
* @param connection The connection over which it was received
|
||||||
|
|
@ -101,25 +93,25 @@ public:
|
||||||
{
|
{
|
||||||
// we need the appropriate channel
|
// we need the appropriate channel
|
||||||
auto channel = connection->channel(this->channel());
|
auto channel = connection->channel(this->channel());
|
||||||
|
|
||||||
// channel does not exist
|
// channel does not exist
|
||||||
if (!channel) return false;
|
if (!channel) return false;
|
||||||
|
|
||||||
// is there a current message?
|
// is there a current message?
|
||||||
MessageImpl *message = channel->message();
|
MessageImpl *message = channel->message();
|
||||||
if (!message) return false;
|
if (!message) return false;
|
||||||
|
|
||||||
// store size
|
// store size
|
||||||
if (!message->append(_payload, _size)) return true;
|
if (!message->append(_payload, _size)) return true;
|
||||||
|
|
||||||
// the message is complete
|
// the message is complete
|
||||||
channel->reportMessage();
|
channel->reportMessage();
|
||||||
|
|
||||||
// done
|
// done
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ public:
|
||||||
* @param failingClass id of the failing class if applicable
|
* @param failingClass id of the failing class if applicable
|
||||||
* @param failingMethod id of the failing method if applicable
|
* @param failingMethod id of the failing method if applicable
|
||||||
*/
|
*/
|
||||||
ConnectionCloseFrame(uint16_t code, const std::string text, uint16_t failingClass = 0, uint16_t failingMethod = 0) :
|
ConnectionCloseFrame(uint16_t code, const std::string &text, uint16_t failingClass = 0, uint16_t failingMethod = 0) :
|
||||||
ConnectionFrame(text.length() + 7), // 1 for extra string byte, 2 for each uint16
|
ConnectionFrame(text.length() + 7), // 1 for extra string byte, 2 for each uint16
|
||||||
_code(code),
|
_code(code),
|
||||||
_text(text),
|
_text(text),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue