204 lines
5.3 KiB
C++
204 lines
5.3 KiB
C++
|
|
/**
|
||
|
|
* ConnectionImpl.cpp
|
||
|
|
*
|
||
|
|
* Implementation of an AMQP connection
|
||
|
|
*
|
||
|
|
* @copyright 2014 Copernica BV
|
||
|
|
*/
|
||
|
|
#include "includes.h"
|
||
|
|
#include "protocolheaderframe.h"
|
||
|
|
#include "exception.h"
|
||
|
|
#include "protocolexception.h"
|
||
|
|
|
||
|
|
/**
|
||
|
|
* set namespace
|
||
|
|
*/
|
||
|
|
namespace AMQP {
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Destructor
|
||
|
|
*/
|
||
|
|
ConnectionImpl::~ConnectionImpl()
|
||
|
|
{
|
||
|
|
// still connected
|
||
|
|
if (_state == state_invalid) return;
|
||
|
|
|
||
|
|
// still in a connected state - should we send the close frame?
|
||
|
|
close();
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Initialize the connection
|
||
|
|
*/
|
||
|
|
void ConnectionImpl::initialize()
|
||
|
|
{
|
||
|
|
// we need a protocol header
|
||
|
|
ProtocolHeaderFrame header;
|
||
|
|
|
||
|
|
// send out the protocol header
|
||
|
|
send(header);
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Add a channel to the connection, and return the channel ID that it
|
||
|
|
* is allowed to use, or 0 when no more ID's are available
|
||
|
|
* @param channel
|
||
|
|
* @return uint16_t
|
||
|
|
*/
|
||
|
|
uint16_t ConnectionImpl::add(ChannelImpl *channel)
|
||
|
|
{
|
||
|
|
// check if we have exceeded the limit already
|
||
|
|
if (_maxChannels > 0 && _channels.size() >= _maxChannels) return 0;
|
||
|
|
|
||
|
|
// keep looping to find an id that is not in use
|
||
|
|
while (true)
|
||
|
|
{
|
||
|
|
// is this id in use?
|
||
|
|
if (_nextFreeChannel > 0 && _channels.find(_nextFreeChannel) == _channels.end()) break;
|
||
|
|
|
||
|
|
// id is in use, move on
|
||
|
|
_nextFreeChannel++;
|
||
|
|
}
|
||
|
|
|
||
|
|
// we have a new channel
|
||
|
|
_channels[_nextFreeChannel] = channel;
|
||
|
|
|
||
|
|
// done
|
||
|
|
return _nextFreeChannel++;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Remove a channel
|
||
|
|
* @param channel
|
||
|
|
*/
|
||
|
|
void ConnectionImpl::remove(ChannelImpl *channel)
|
||
|
|
{
|
||
|
|
// skip zero channel
|
||
|
|
if (channel->id() == 0) return;
|
||
|
|
|
||
|
|
// remove it
|
||
|
|
_channels.erase(channel->id());
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Parse the buffer into a recognized frame
|
||
|
|
*
|
||
|
|
* Every time that data comes in on the connection, you should call this method to parse
|
||
|
|
* the incoming data, and let it handle by the AMQP library. This method returns the number
|
||
|
|
* of bytes that were processed.
|
||
|
|
*
|
||
|
|
* If not all bytes could be processed because it only contained a partial frame, you should
|
||
|
|
* call this same method later on when more data is available. The AMQP library does not do
|
||
|
|
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
|
||
|
|
* later call.
|
||
|
|
*
|
||
|
|
* @param buffer buffer to decode
|
||
|
|
* @param size size of the buffer to decode
|
||
|
|
* @return number of bytes that were processed
|
||
|
|
*/
|
||
|
|
size_t ConnectionImpl::parse(char *buffer, size_t size)
|
||
|
|
{
|
||
|
|
// number of bytes processed
|
||
|
|
size_t processed = 0;
|
||
|
|
|
||
|
|
// create a monitor object that checks if the connection still exists
|
||
|
|
Monitor monitor(this);
|
||
|
|
|
||
|
|
// keep looping until we have processed all bytes, and the monitor still
|
||
|
|
// indicates that the connection is in a valid state
|
||
|
|
while (size > 0 && monitor.valid())
|
||
|
|
{
|
||
|
|
// prevent protocol exceptions
|
||
|
|
try
|
||
|
|
{
|
||
|
|
// try to recognize the frame
|
||
|
|
ReceivedFrame receivedFrame(buffer, size, _maxFrame);
|
||
|
|
if (!receivedFrame.complete()) return processed;
|
||
|
|
|
||
|
|
// process the frame
|
||
|
|
receivedFrame.process(this);
|
||
|
|
|
||
|
|
// number of bytes processed
|
||
|
|
size_t bytes = receivedFrame.totalSize();
|
||
|
|
|
||
|
|
// add bytes
|
||
|
|
processed += bytes; size -= bytes; buffer += bytes;
|
||
|
|
}
|
||
|
|
catch (const ProtocolException &exception)
|
||
|
|
{
|
||
|
|
// something terrible happened on the protocol (like data out of range)
|
||
|
|
reportConnectionError(exception.what());
|
||
|
|
|
||
|
|
// done
|
||
|
|
return processed;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// done
|
||
|
|
return processed;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Close the connection
|
||
|
|
* This will close all channels
|
||
|
|
* @return bool
|
||
|
|
*/
|
||
|
|
bool ConnectionImpl::close()
|
||
|
|
{
|
||
|
|
// leap out if not yet connected
|
||
|
|
if (_state != state_connected) return false;
|
||
|
|
|
||
|
|
// loop over all channels
|
||
|
|
for (auto iter = _channels.begin(); iter != _channels.end(); iter++)
|
||
|
|
{
|
||
|
|
// close the channel
|
||
|
|
iter->second->close();
|
||
|
|
}
|
||
|
|
|
||
|
|
// we're in a new state
|
||
|
|
_state = state_invalid;
|
||
|
|
|
||
|
|
// done
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Send a frame over the connection
|
||
|
|
* @param frame The frame to send
|
||
|
|
* @return size_t Number of bytes sent
|
||
|
|
*/
|
||
|
|
size_t ConnectionImpl::send(const Frame &frame)
|
||
|
|
{
|
||
|
|
std::cout << "send frame of " << frame.totalSize() << " bytes" << std::endl;
|
||
|
|
|
||
|
|
// we need an output buffer
|
||
|
|
OutBuffer buffer(frame.totalSize());
|
||
|
|
|
||
|
|
// fill the buffer
|
||
|
|
frame.fill(buffer);
|
||
|
|
|
||
|
|
// append an end of frame byte (but not when still negotiating the protocol)
|
||
|
|
if (_state != state_protocol) buffer.add((uint8_t)206);
|
||
|
|
|
||
|
|
// send the buffer
|
||
|
|
_handler->onData(_parent, buffer.data(), buffer.size());
|
||
|
|
|
||
|
|
// done
|
||
|
|
return buffer.size();
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Set the Quality of Service (QOS) of the entire connection
|
||
|
|
* @param prefetchSize maximum size (in octets) of messages to be prefetched
|
||
|
|
* @param prefetchCount maximum number of messages to prefetch
|
||
|
|
* @return bool whether the Qos frame is sent.
|
||
|
|
*/
|
||
|
|
//bool Connection::setQOS(uint32_t prefetchSize = 0, uint16_t prefetchCount = 0)
|
||
|
|
//{
|
||
|
|
// BasicQosFrame *frame = new BasicQosFrame(0, prefetchSize, prefetchCount, true);
|
||
|
|
// if(_connection->send(frame->buffer(), frame->totalSize()) != frame->totalSize()) return false;
|
||
|
|
//
|
||
|
|
// return true;
|
||
|
|
//}
|
||
|
|
|
||
|
|
}
|