implemented TcpConnection, so that users of the library do not have to setup their own tcp connections
This commit is contained in:
parent
d18fd082d2
commit
f3955bcd51
|
|
@ -31,6 +31,8 @@ extern const int immediate;
|
|||
extern const int redelivered;
|
||||
extern const int multiple;
|
||||
extern const int requeue;
|
||||
extern const int readable;
|
||||
extern const int writable;
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
/**
|
||||
* TcpConnection.h
|
||||
*
|
||||
* Extended Connection object that creates a TCP connection for the
|
||||
* IO between the client application and the RabbitMQ server.
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Forward declarations
|
||||
*/
|
||||
class TcpState;
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpConnection : public Connection, private ConnectionHandler
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The state of the TCP connection - this state objecs changes based on
|
||||
* the state of the connection (resolving, connected or closed)
|
||||
*
|
||||
* @var TcpState
|
||||
*/
|
||||
TcpState *_state = nullptr;
|
||||
|
||||
|
||||
/**
|
||||
* Method that is called by the connection when data needs to be sent over the network
|
||||
* @param connection The connection that created this output
|
||||
* @param buffer Data to send
|
||||
* @param size Size of the buffer
|
||||
*/
|
||||
virtual void onData(Connection *connection, const char *buffer, size_t size) override;
|
||||
|
||||
/**
|
||||
* Method called when the connection ends up in an error state
|
||||
* @param connection The connection that entered the error state
|
||||
* @param message Error message
|
||||
*/
|
||||
virtual void onError(Connection *connection, const char *message) override;
|
||||
|
||||
/**
|
||||
* Method that is called when the connection is established
|
||||
* @param connection The connection that can now be used
|
||||
*/
|
||||
virtual void onConnected(Connection *connection) override;
|
||||
|
||||
/**
|
||||
* Method that is called when the connection was closed.
|
||||
* @param connection The connection that was closed and that is now unusable
|
||||
*/
|
||||
virtual void onClosed(Connection *connection) override;
|
||||
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param handler User implemented handler object
|
||||
* @param hostname The address to connect to
|
||||
*/
|
||||
TcpConnection(TcpHandler *handler, const Address &address);
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~TcpConnection();
|
||||
|
||||
/**
|
||||
* Process the TCP connection
|
||||
*
|
||||
* This method should be called when the filedescriptor that is registered
|
||||
* in the event loop becomes active. You should pass in a flag holding the
|
||||
* flags AMQP::readable or AMQP::writable to indicate whether the descriptor
|
||||
* was readable or writable, or bitwise-or if it was both
|
||||
*
|
||||
* @param fd The filedescriptor that became readable or writable
|
||||
* @param events What sort of events occured?
|
||||
*/
|
||||
void process(int fd, int flags);
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* TcpHandler.h
|
||||
*
|
||||
* Interface to be implemented by the caller of the AMQP library in case
|
||||
* the "Tcp" class is being used as alternative for the ConnectionHandler
|
||||
* class.
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Forward declarations
|
||||
*/
|
||||
class TcpConnection;
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpHandler
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Method that is called when the TCP connection ends up in a connected state
|
||||
* @param connection The TCP connection
|
||||
*/
|
||||
virtual void onConnected(TcpConnection *connection) {}
|
||||
|
||||
/**
|
||||
* Method that is called when the TCP connection ends up in an error state
|
||||
* @param connection The TCP connection
|
||||
* @param message Error message
|
||||
*/
|
||||
virtual void onError(TcpConnection *connection, const char *message) {}
|
||||
|
||||
/**
|
||||
* Method that is called when the TCP connection is closed
|
||||
* @param connection The TCP connection
|
||||
*/
|
||||
virtual void onClosed(TcpConnection *connection) {}
|
||||
|
||||
/**
|
||||
* Monitor a filedescriptor for readability or writability
|
||||
*
|
||||
* When a TCP connection is opened, it creates a non-blocking socket
|
||||
* connection. This method is called to inform you about this socket,
|
||||
* so that you can include it in the event loop. When the socket becomes
|
||||
* active, you should call the "process()" method in the Tcp class.
|
||||
*
|
||||
* The flags is AMQP::readable if the filedescriptor should be monitored
|
||||
* for readability, AMQP::writable if it is to be monitored for writability,
|
||||
* or AMQP::readable | AMQP::writable if it has to be checked for both.
|
||||
* If flags has value 0, the filedescriptor should be removed from the
|
||||
* event loop.
|
||||
*
|
||||
* @param connection The TCP connection object that is reporting
|
||||
* @param fd The filedescriptor to be monitored
|
||||
* @param flags Should the object be monitored for readability or writability?
|
||||
*/
|
||||
virtual void monitor(TcpConnection *connection, int fd, int flags) = 0;
|
||||
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* AddressInfo.h
|
||||
*
|
||||
* Utility wrapper arround "getAddressInfo()"
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class AddressInfo
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The addresses
|
||||
* @var struct AddressInfo
|
||||
*/
|
||||
struct addrinfo *_info = nullptr;
|
||||
|
||||
/**
|
||||
* Vector of addrinfo pointers
|
||||
* @var std::vector<struct addrinfo *>
|
||||
*/
|
||||
std::vector<struct addrinfo *> _v;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param hostname
|
||||
* @param port
|
||||
*/
|
||||
AddressInfo(const char *hostname, uint16_t port = 5672)
|
||||
{
|
||||
// store portnumber in buffer
|
||||
auto portnumber = std::to_string(port);
|
||||
|
||||
// info about the lookup
|
||||
struct addrinfo hints;
|
||||
|
||||
// set everything to zero
|
||||
memset(&hints, 0, sizeof(struct AddressInfo));
|
||||
|
||||
// set hints
|
||||
hints.ai_family = AF_UNSPEC; // allow IPv4 or IPv6
|
||||
hints.ai_socktype = SOCK_STREAM; // datagram socket/
|
||||
|
||||
// get address of the server
|
||||
auto code = getaddrinfo(hostname, portnumber.data(), &hints, &_info);
|
||||
|
||||
// was there an error
|
||||
if (code != 0) throw std::runtime_error(gai_strerror(code));
|
||||
|
||||
// keep looping
|
||||
for (auto *current = _info; current; current = current->ai_next)
|
||||
{
|
||||
// store in vector
|
||||
_v.push_back(current);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~AddressInfo()
|
||||
{
|
||||
// free address info
|
||||
freeaddrinfo(_info);
|
||||
}
|
||||
|
||||
/**
|
||||
* Size of the array
|
||||
* @return size_t
|
||||
*/
|
||||
size_t size() const
|
||||
{
|
||||
return _v.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get reference to struct
|
||||
* @param index
|
||||
* @return struct addrinfo*
|
||||
*/
|
||||
const struct addrinfo *operator[](int index) const
|
||||
{
|
||||
// expose vector
|
||||
return _v[index];
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -33,6 +33,12 @@ const int redelivered = 0x2000;
|
|||
const int multiple = 0x4000;
|
||||
const int requeue = 0x8000;
|
||||
|
||||
/**
|
||||
* Flags for event loops
|
||||
*/
|
||||
const int readable = 0x1;
|
||||
const int writable = 0x2;
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -21,6 +21,10 @@
|
|||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
|
||||
// forward declarations
|
||||
#include "../include/classes.h"
|
||||
|
|
@ -62,9 +66,12 @@
|
|||
#include "../include/channelimpl.h"
|
||||
#include "../include/channel.h"
|
||||
#include "../include/login.h"
|
||||
#include "../include/address.h"
|
||||
#include "../include/connectionhandler.h"
|
||||
#include "../include/connectionimpl.h"
|
||||
#include "../include/connection.h"
|
||||
#include "../include/tcphandler.h"
|
||||
#include "../include/tcpconnection.h"
|
||||
|
||||
// classes that are very commonly used
|
||||
#include "exception.h"
|
||||
|
|
@ -79,6 +86,6 @@
|
|||
#include "queueframe.h"
|
||||
#include "basicframe.h"
|
||||
#include "transactionframe.h"
|
||||
|
||||
#include "addressinfo.h"
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Pipe.h
|
||||
*
|
||||
* Pipe of two filedescriptors, used to communicate between master and child thread
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Dependencies
|
||||
*/
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class Pipe
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The two filedescriptors that make up the pipe
|
||||
* @var int[]
|
||||
*/
|
||||
int _fds[2];
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
Pipe()
|
||||
{
|
||||
// construct the pipe
|
||||
if (pipe2(_fds, O_CLOEXEC) == 0) return;
|
||||
|
||||
// something went wrong
|
||||
throw std::runtime_error(strerror(errno));
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~Pipe()
|
||||
{
|
||||
// close the two filedescriptors
|
||||
close(_fds[0]);
|
||||
close(_fds[1]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Expose the filedescriptors
|
||||
* @return int
|
||||
*/
|
||||
int in() const { return _fds[0]; }
|
||||
int out() const { return _fds[1]; }
|
||||
|
||||
/**
|
||||
* Notify the pipe, so that the other thread wakes up
|
||||
*/
|
||||
void notify()
|
||||
{
|
||||
// one byte to send
|
||||
char byte = 0;
|
||||
|
||||
// send one byte over the pipe - this will wake up the other thread
|
||||
write(_fds[1], &byte, 1);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,373 @@
|
|||
/**
|
||||
* TcpBuffer.h
|
||||
*
|
||||
* When data could not be sent out immediately, it is buffered in a temporary
|
||||
* output buffer. This is the implementation of that buffer
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Dependencies
|
||||
*/
|
||||
#include <sys/ioctl.h>
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpBuffer : public Buffer
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* All output buffers
|
||||
* @var std::deque
|
||||
*/
|
||||
mutable std::deque<std::vector<char>> _buffers;
|
||||
|
||||
/**
|
||||
* Number of bytes in first buffer that is no longer in use
|
||||
* @var size_t
|
||||
*/
|
||||
size_t _skip = 0;
|
||||
|
||||
/**
|
||||
* Total number of bytes in the buffer
|
||||
* @var size_t
|
||||
*/
|
||||
size_t _size = 0;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Regular constructor
|
||||
*/
|
||||
TcpBuffer() {}
|
||||
|
||||
/**
|
||||
* No copy'ing allowed
|
||||
* @param that
|
||||
*/
|
||||
TcpBuffer(const TcpBuffer &that) = delete;
|
||||
|
||||
/**
|
||||
* Move operator
|
||||
* @param that
|
||||
*/
|
||||
TcpBuffer(TcpBuffer &&that) :
|
||||
_buffers(std::move(that._buffers)),
|
||||
_skip(that._skip),
|
||||
_size(that._size)
|
||||
{
|
||||
// reset other object
|
||||
that._skip = 0;
|
||||
that._size = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the buffer exist (is it non-empty)
|
||||
* @return bool
|
||||
*/
|
||||
operator bool () const
|
||||
{
|
||||
// there must be a size
|
||||
return _size > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the buffer empty
|
||||
* @return bool
|
||||
*/
|
||||
bool operator!() const
|
||||
{
|
||||
// size should be zero
|
||||
return _size == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Total size of the buffer
|
||||
* @return size_t
|
||||
*/
|
||||
virtual size_t size() const override
|
||||
{
|
||||
// this simply is a member
|
||||
return _size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get access to a single byte
|
||||
*
|
||||
* No safety checks are necessary: this method will only be called
|
||||
* for bytes that actually exist
|
||||
*
|
||||
* @param pos position in the buffer
|
||||
* @return char value of the byte in the buffer
|
||||
*/
|
||||
virtual char byte(size_t pos) const override
|
||||
{
|
||||
// incorporate the skipped bytes
|
||||
pos += _skip;
|
||||
|
||||
// iterate over the parts
|
||||
for (const auto &buffer : _buffers)
|
||||
{
|
||||
// is the byte within this buffer?
|
||||
if (buffer.size() > pos) return buffer[pos];
|
||||
|
||||
// prepare for next iteration
|
||||
pos -= buffer.size();
|
||||
}
|
||||
|
||||
// normally unreachable
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get access to the raw data
|
||||
* @param pos position in the buffer
|
||||
* @param size number of continuous bytes
|
||||
* @return char*
|
||||
*/
|
||||
virtual const char *data(size_t pos, size_t size) const override
|
||||
{
|
||||
// incorporate the skipped bytes
|
||||
pos += _skip;
|
||||
|
||||
// the buffer into which all data is going to be merged
|
||||
std::vector<char> *result = nullptr;
|
||||
|
||||
// amount of data that we still have to process
|
||||
size_t togo = _size + _skip;
|
||||
|
||||
// number of trailing empty buffers
|
||||
size_t empty = 0;
|
||||
|
||||
// iterate over the parts
|
||||
for (auto &buffer : _buffers)
|
||||
{
|
||||
// are we already merging?
|
||||
if (result)
|
||||
{
|
||||
// merge buffer
|
||||
result->insert(result->end(), buffer.begin(), buffer.end());
|
||||
|
||||
// one more empty buffer
|
||||
++empty;
|
||||
}
|
||||
|
||||
// does the data start within this buffer?
|
||||
else if (buffer.size() > pos)
|
||||
{
|
||||
// remember that this is buffer into which all data is going to be merged
|
||||
result = &buffer;
|
||||
|
||||
// reserve enough space
|
||||
result->reserve(togo);
|
||||
}
|
||||
|
||||
// data does not start in this part
|
||||
else
|
||||
{
|
||||
// prepare for next iteration
|
||||
pos -= buffer.size();
|
||||
}
|
||||
}
|
||||
|
||||
// remove empty buffers
|
||||
if (empty > 0) _buffers.resize(_buffers.size() - empty);
|
||||
|
||||
// done
|
||||
return result->data();
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy bytes to a buffer
|
||||
*
|
||||
* No safety checks are necessary: this method will only be called
|
||||
* for bytes that actually exist
|
||||
*
|
||||
* @param pos position in the buffer
|
||||
* @param size number of bytes to copy
|
||||
* @param output buffer to copy into
|
||||
* @return void* pointer to buffer
|
||||
*/
|
||||
virtual void *copy(size_t pos, size_t size, void *output) const override
|
||||
{
|
||||
// incorporate the skipped bytes
|
||||
pos += _skip;
|
||||
|
||||
// number of bytes already copied
|
||||
size_t copied = 0;
|
||||
|
||||
// iterate over the parts
|
||||
for (const auto &buffer : _buffers)
|
||||
{
|
||||
// is the byte within this buffer?
|
||||
if (buffer.size() > pos)
|
||||
{
|
||||
// number of bytes to copy
|
||||
size_t tocopy = std::min(buffer.size() - pos, size);
|
||||
|
||||
// copy data to the buffer
|
||||
memcpy((char *)output + copied, buffer.data() + pos, tocopy);
|
||||
|
||||
// update counters
|
||||
copied += tocopy;
|
||||
|
||||
// for next iteration we can start on position zero of the next buffer
|
||||
pos = 0; size -= tocopy;
|
||||
|
||||
// are we alread done?
|
||||
if (size == 0) return output;
|
||||
}
|
||||
else
|
||||
{
|
||||
// prepare for next iteration
|
||||
pos -= buffer.size();
|
||||
}
|
||||
}
|
||||
|
||||
// normally unreachable
|
||||
return output;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add data to the buffer
|
||||
* @param buffer
|
||||
* @param size
|
||||
*/
|
||||
void add(const char *buffer, size_t size)
|
||||
{
|
||||
// add element
|
||||
_buffers.emplace_back(buffer, buffer + size);
|
||||
|
||||
// update total size
|
||||
_size += size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shrink the buffer with a number of bytes
|
||||
* @param toremove
|
||||
*/
|
||||
void shrink(size_t toremove)
|
||||
{
|
||||
// are we removing everything?
|
||||
if (toremove >= _size)
|
||||
{
|
||||
// reset all
|
||||
_buffers.clear();
|
||||
_skip = _size = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// keep looping
|
||||
while (toremove > 0)
|
||||
{
|
||||
// access to the first buffer
|
||||
const auto &first = _buffers.front();
|
||||
|
||||
// actual used bytes in first buffer
|
||||
size_t bytes = first.size() - _skip;
|
||||
|
||||
// can we remove the first buffer completely?
|
||||
if (toremove >= bytes)
|
||||
{
|
||||
// we're going to remove the first item, update sizes
|
||||
_size -= bytes;
|
||||
_skip = 0;
|
||||
|
||||
// number of bytes that still have to be removed
|
||||
toremove -= bytes;
|
||||
|
||||
// remove first buffer
|
||||
_buffers.pop_front();
|
||||
}
|
||||
else
|
||||
{
|
||||
// we should remove the first buffer partially
|
||||
_skip += toremove;
|
||||
_size -= toremove;
|
||||
|
||||
// done
|
||||
toremove = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the buffer to a socket
|
||||
* @param socket
|
||||
*/
|
||||
void sendto(int socket)
|
||||
{
|
||||
// keep looping
|
||||
while (_size > 0)
|
||||
{
|
||||
// we're going to fill a lot of buffers (64 should normally be enough)
|
||||
struct iovec buffer[64];
|
||||
|
||||
// index counter
|
||||
size_t index = 0;
|
||||
|
||||
// iterate over the buffers
|
||||
for (const auto &str : _buffers)
|
||||
{
|
||||
// fill buffer
|
||||
buffer[index].iov_base = (void *)(index == 0 ? str.data() + _skip : str.data());
|
||||
buffer[index].iov_len = index == 0 ? str.size() - _skip : str.size();
|
||||
|
||||
// update counter for next iteration
|
||||
if (++index >= 64) break;
|
||||
}
|
||||
|
||||
// send the data
|
||||
auto result = writev(socket, (const struct iovec *)&buffer, index);
|
||||
|
||||
// skip on error
|
||||
if (result <= 0) return;
|
||||
|
||||
// shrink the buffer
|
||||
if (result > 0) shrink(result);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive data from a socket
|
||||
* @param socket
|
||||
* @return ssize_t
|
||||
*/
|
||||
ssize_t receivefrom(int socket)
|
||||
{
|
||||
// find out how many bytes are available
|
||||
int available = 0;
|
||||
|
||||
// check the number of bytes that are available - in case of an error or
|
||||
// when the buffer is very small, we use a lower limit of 512 bytes
|
||||
if (ioctl(socket, FIONREAD, &available) != 0 || available < 512) return available = 512;
|
||||
|
||||
// add a new buffer
|
||||
_buffers.emplace_back(available);
|
||||
|
||||
// read the actual buffer
|
||||
auto &buffer = _buffers.back();
|
||||
|
||||
// read data into the buffer
|
||||
return read(socket, buffer.data(), available);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* TcpClosed.h
|
||||
*
|
||||
* Class that is used when the TCP connection ends up in a closed state
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpClosed : public TcpState
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param connection The parent TcpConnection object
|
||||
* @param handler User supplied handler
|
||||
*/
|
||||
TcpClosed(TcpConnection *connection, TcpHandler *handler) :
|
||||
TcpState(connection, handler) {}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param state The to-be-copied state
|
||||
*/
|
||||
TcpClosed(const TcpState *state) :
|
||||
TcpState(state) {}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~TcpClosed() = default;
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* TcpConnected.h
|
||||
*
|
||||
* The actual tcp connection - this is the "_impl" of a tcp-connection after
|
||||
* the hostname was resolved into an IP address
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Dependencies
|
||||
*/
|
||||
#include "tcpbuffer.h"
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpConnected : public TcpState
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The socket file descriptor
|
||||
* @var int
|
||||
*/
|
||||
int _socket;
|
||||
|
||||
/**
|
||||
* The outgoing buffer
|
||||
* @var TcpBuffer
|
||||
*/
|
||||
TcpBuffer _out;
|
||||
|
||||
/**
|
||||
* An incoming buffer
|
||||
* @var TcpBuffer
|
||||
*/
|
||||
TcpBuffer _in;
|
||||
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param connection Parent TCP connection object
|
||||
* @param socket The socket filedescriptor
|
||||
* @param buffer The buffer that was already built
|
||||
* @param handler User-supplied handler object
|
||||
*/
|
||||
TcpConnected(TcpConnection *connection, int socket, TcpBuffer &&buffer, TcpHandler *handler) :
|
||||
TcpState(connection, handler),
|
||||
_socket(socket),
|
||||
_out(std::move(buffer))
|
||||
{
|
||||
// if there is already an output buffer, we have to send out that first
|
||||
if (_out) _out.sendto(_socket);
|
||||
|
||||
// tell the handler to monitor the socket, if there is an out
|
||||
_handler->monitor(_connection, _socket, _out ? readable | writable : readable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~TcpConnected()
|
||||
{
|
||||
// we no longer have to monitor the socket
|
||||
_handler->monitor(_connection, _socket, 0);
|
||||
|
||||
// close the socket
|
||||
close(_socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the filedescriptor in the object
|
||||
* @param fd Filedescriptor that is active
|
||||
* @param flags AMQP::readable and/or AMQP::writable
|
||||
* @return New state object
|
||||
*/
|
||||
virtual TcpState *process(int fd, int flags) override
|
||||
{
|
||||
// must be the socket
|
||||
if (fd != _socket) return this;
|
||||
|
||||
// can we write more data to the socket?
|
||||
if (flags & writable)
|
||||
{
|
||||
// send out the buffered data
|
||||
_out.sendto(_socket);
|
||||
|
||||
// if buffer is empty by now, we no longer have to check for
|
||||
// writability, but only for readability
|
||||
if (!_out) _handler->monitor(_connection, _socket, readable);
|
||||
}
|
||||
|
||||
// should we check for readability too?
|
||||
if (flags & readable)
|
||||
{
|
||||
// read data from buffer
|
||||
auto result = _in.receivefrom(_socket);
|
||||
|
||||
// is the connection in an error state?
|
||||
if (result < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
|
||||
{
|
||||
// we have an error - report this to the user
|
||||
_handler->onError(_connection, strerror(errno));
|
||||
|
||||
// @todo object might no longer exist after this error
|
||||
|
||||
// we have a new state
|
||||
return new TcpClosed(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
// parse the buffer
|
||||
auto processed = _connection->parse(_in);
|
||||
|
||||
// shrink buffer
|
||||
_in.shrink(processed);
|
||||
}
|
||||
}
|
||||
|
||||
// keep same object
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data over the connection
|
||||
* @param buffer buffer to send
|
||||
* @param size size of the buffer
|
||||
*/
|
||||
virtual void send(const char *buffer, size_t size)
|
||||
{
|
||||
// is there already a buffer of data that can not be sent?
|
||||
if (_out) return _out.add(buffer, size);
|
||||
|
||||
// there is no buffer, send the data right away
|
||||
auto result = write(_socket, buffer, size);
|
||||
|
||||
// number of bytes sent
|
||||
size_t bytes = result < 0 ? 0 : result;
|
||||
|
||||
// ok if all data was sent
|
||||
if (bytes >= size) return;
|
||||
|
||||
// add the data to the buffer
|
||||
_out.add(buffer + bytes, size - bytes);
|
||||
|
||||
// start monitoring the socket to find out when it is writable
|
||||
_handler->monitor(_connection, _socket, readable | writable);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,122 @@
|
|||
/**
|
||||
* TcpConnection.cpp
|
||||
*
|
||||
* Implementation file for the TCP connection
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Dependencies
|
||||
*/
|
||||
#include "includes.h"
|
||||
#include "tcpresolver.h"
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param handler User implemented handler object
|
||||
* @param address AMQP address object
|
||||
*/
|
||||
TcpConnection::TcpConnection(TcpHandler *handler, const Address &address) :
|
||||
Connection(this, address.login(), address.vhost()),
|
||||
_state(new TcpResolver(this, address.hostname(), address.port(), handler)) {}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
TcpConnection::~TcpConnection()
|
||||
{
|
||||
// remove the state object
|
||||
delete _state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the TCP connection
|
||||
* This method should be called when the filedescriptor that is registered
|
||||
* in the event loop becomes active. You should pass in a flag holding the
|
||||
* flags AMQP::readable or AMQP::writable to indicate whether the descriptor
|
||||
* was readable or writable, or bitwise-or if it was both
|
||||
* @param fd The filedescriptor that became readable or writable
|
||||
* @param events What sort of events occured?
|
||||
*/
|
||||
void TcpConnection::process(int fd, int flags)
|
||||
{
|
||||
// pass on the the state, that returns a new impl
|
||||
auto *result = _state->process(fd, flags);
|
||||
|
||||
// skip if the same state is continued to be used
|
||||
if (result == _state) return;
|
||||
|
||||
// remove old state
|
||||
delete _state;
|
||||
|
||||
// replace it with the new implementation
|
||||
_state = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called by the connection when data needs to be sent over the network
|
||||
* @param connection The connection that created this output
|
||||
* @param buffer Data to send
|
||||
* @param size Size of the buffer
|
||||
*/
|
||||
void TcpConnection::onData(Connection *connection, const char *buffer, size_t size)
|
||||
{
|
||||
// send the data over the connecction
|
||||
_state->send(buffer, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method called when the connection ends up in an error state
|
||||
* @param connection The connection that entered the error state
|
||||
* @param message Error message
|
||||
*/
|
||||
void TcpConnection::onError(Connection *connection, const char *message)
|
||||
{
|
||||
// current object is going to be removed, wrap it in a unique pointer to enforce that
|
||||
std::unique_ptr<TcpState> ptr(_state);
|
||||
|
||||
// object is now in a closed state
|
||||
_state = new TcpClosed(_state);
|
||||
|
||||
// tell the implementation to report the error
|
||||
ptr->reportError(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when the connection is established
|
||||
* @param connection The connection that can now be used
|
||||
*/
|
||||
void TcpConnection::onConnected(Connection *connection)
|
||||
{
|
||||
// tell the implementation to report the status
|
||||
_state->reportConnected();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that is called when the connection was closed.
|
||||
* @param connection The connection that was closed and that is now unusable
|
||||
*/
|
||||
void TcpConnection::onClosed(Connection *connection)
|
||||
{
|
||||
// current object is going to be removed, wrap it in a unique pointer to enforce that
|
||||
std::unique_ptr<TcpState> ptr(_state);
|
||||
|
||||
// object is now in a closed state
|
||||
_state = new TcpClosed(_state);
|
||||
|
||||
// tell the implementation to report the error
|
||||
ptr->reportClosed();
|
||||
}
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,197 @@
|
|||
/**
|
||||
* TcpResolver.h
|
||||
*
|
||||
* Class that is used for the DNS lookup of the hostname of the RabbitMQ
|
||||
* server, and to make the initial connection
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Dependencies
|
||||
*/
|
||||
#include "pipe.h"
|
||||
#include "tcpstate.h"
|
||||
#include "tcpclosed.h"
|
||||
#include "tcpconnected.h"
|
||||
#include <thread>
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpResolver : public TcpState
|
||||
{
|
||||
private:
|
||||
/**
|
||||
* The hostname that we're trying to resolve
|
||||
* @var std::string
|
||||
*/
|
||||
std::string _hostname;
|
||||
|
||||
/**
|
||||
* The portnumber to connect to
|
||||
* @var uint16_t
|
||||
*/
|
||||
uint16_t _port;
|
||||
|
||||
/**
|
||||
* A pipe that is used to send back the socket that is connected to RabbitMQ
|
||||
* @var Pipe
|
||||
*/
|
||||
Pipe _pipe;
|
||||
|
||||
/**
|
||||
* Non-blocking socket that is connected to RabbitMQ
|
||||
* @var int
|
||||
*/
|
||||
int _socket = -1;
|
||||
|
||||
/**
|
||||
* Possible error that occured
|
||||
* @var std::string
|
||||
*/
|
||||
std::string _error;
|
||||
|
||||
/**
|
||||
* Data that was sent to the connection, while busy resolving the hostname
|
||||
* @var TcpBuffer
|
||||
*/
|
||||
TcpBuffer _buffer;
|
||||
|
||||
/**
|
||||
* Thread in which the DNS lookup occurs
|
||||
* @var std::thread
|
||||
*/
|
||||
std::thread _thread;
|
||||
|
||||
|
||||
/**
|
||||
* Run the thread
|
||||
*/
|
||||
void run()
|
||||
{
|
||||
// prevent exceptions
|
||||
try
|
||||
{
|
||||
// get address info
|
||||
AddressInfo addresses(_hostname.data(), _port);
|
||||
|
||||
// iterate over the addresses
|
||||
for (size_t i = 0; i < addresses.size(); ++i)
|
||||
{
|
||||
// create the socket
|
||||
_socket = socket(addresses[i]->ai_family, addresses[i]->ai_socktype, addresses[i]->ai_protocol);
|
||||
|
||||
// move on on failure
|
||||
if (_socket < -1) continue;
|
||||
|
||||
// connect to the socket
|
||||
if (connect(_socket, addresses[i]->ai_addr, addresses[i]->ai_addrlen) == 0) break;
|
||||
|
||||
// log the error for the time being
|
||||
_error = strerror(errno);
|
||||
|
||||
// close socket because connect failed
|
||||
close(_socket);
|
||||
|
||||
// socket no longer is valid
|
||||
_socket = -1;
|
||||
}
|
||||
|
||||
// connection succeeded, mark socket as non-blocking
|
||||
if (_socket >= 0) fcntl(_socket, F_SETFL, O_NONBLOCK);
|
||||
}
|
||||
catch (const std::runtime_error &error)
|
||||
{
|
||||
// address could not be resolved, we ignore this for now, but store the error
|
||||
_error = error.what();
|
||||
}
|
||||
|
||||
// notify the master thread by sending a byte over the pipe
|
||||
_pipe.notify();
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
* @param connection Parent connection object
|
||||
* @param hostname The hostname for the lookup
|
||||
* @param portnumber The portnumber for the lookup
|
||||
* @param handler User implemented handler object
|
||||
*/
|
||||
TcpResolver(TcpConnection *connection, const std::string &hostname, uint16_t port, TcpHandler *handler) :
|
||||
TcpState(connection, handler),
|
||||
_hostname(hostname),
|
||||
_port(port)
|
||||
{
|
||||
// tell the event loop to monitor the filedescriptor of the pipe
|
||||
handler->monitor(connection, _pipe.in(), readable);
|
||||
|
||||
// we can now start the thread (must be started after filedescriptor is monitored!)
|
||||
std::thread thread(std::bind(&TcpResolver::run, this));
|
||||
|
||||
// store thread in member
|
||||
_thread.swap(thread);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~TcpResolver()
|
||||
{
|
||||
// stop monitoring the pipe filedescriptor
|
||||
_handler->monitor(_connection, _pipe.in(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the resolver to be ready
|
||||
* @param fd The filedescriptor that is active
|
||||
* @param flags Flags to indicate that fd is readable and/or writable
|
||||
* @return New implementation object
|
||||
*/
|
||||
virtual TcpState *process(int fd, int flags) override
|
||||
{
|
||||
// only works if the incoming pipe is readable
|
||||
if (fd != _pipe.in() || !(flags & readable)) return this;
|
||||
|
||||
// wait for the thread to be ready
|
||||
_thread.join();
|
||||
|
||||
// do we have a valid socket?
|
||||
if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);
|
||||
|
||||
// report error
|
||||
_handler->onError(_connection, _error.data());
|
||||
|
||||
// create dummy implementation
|
||||
return new TcpClosed(_connection, _handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data over the connection
|
||||
* @param buffer buffer to send
|
||||
* @param size size of the buffer
|
||||
*/
|
||||
virtual void send(const char *buffer, size_t size)
|
||||
{
|
||||
// add data to buffer
|
||||
_buffer.add(buffer, size);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* TcpState.h
|
||||
*
|
||||
* Base class / interface of the various states of the TCP connection
|
||||
*
|
||||
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
|
||||
* @copyright 2015 Copernica BV
|
||||
*/
|
||||
|
||||
/**
|
||||
* Include guard
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
/**
|
||||
* Set up namespace
|
||||
*/
|
||||
namespace AMQP {
|
||||
|
||||
/**
|
||||
* Class definition
|
||||
*/
|
||||
class TcpState
|
||||
{
|
||||
protected:
|
||||
/**
|
||||
* Parent TcpConnection object as is seen by the user
|
||||
* @var TcpConnection
|
||||
*/
|
||||
TcpConnection *_connection;
|
||||
|
||||
/**
|
||||
* User-supplied handler
|
||||
* @var TcpHandler
|
||||
*/
|
||||
TcpHandler *_handler;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Protected constructor
|
||||
* @param connection Original TCP connection object
|
||||
* @param handler User-supplied handler class
|
||||
*/
|
||||
TcpState(TcpConnection *connection, TcpHandler *handler) :
|
||||
_connection(connection), _handler(handler) {}
|
||||
|
||||
/**
|
||||
* Protected "copy" constructor
|
||||
* @param state Original TcpState object
|
||||
*/
|
||||
TcpState(const TcpState *state) :
|
||||
_connection(state->_connection), _handler(state->_handler) {}
|
||||
|
||||
public:
|
||||
/**
|
||||
* Virtual destructor
|
||||
*/
|
||||
virtual ~TcpState() = default;
|
||||
|
||||
/**
|
||||
* Process the filedescriptor in the object
|
||||
* @param fd The filedescriptor that is active
|
||||
* @param flags AMQP::readable and/or AMQP::writable
|
||||
* @return New implementation object
|
||||
*/
|
||||
virtual TcpState *process(int fd, int flags)
|
||||
{
|
||||
// default implementation does nothing and preserves same implementation
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data over the connection
|
||||
* @param buffer buffer to send
|
||||
* @param size size of the buffer
|
||||
*/
|
||||
virtual void send(const char *buffer, size_t size)
|
||||
{
|
||||
// default does nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Report to the handler that the object is in an error state
|
||||
* @param error
|
||||
*/
|
||||
void reportError(const char *error)
|
||||
{
|
||||
// pass to handler
|
||||
_handler->onError(_connection, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Report to the handler that the connection is ready for use
|
||||
*/
|
||||
void reportConnected()
|
||||
{
|
||||
// pass to handler
|
||||
_handler->onConnected(_connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Report to the handler that the connection was nicely closed
|
||||
*/
|
||||
void reportClosed()
|
||||
{
|
||||
// pass to handler
|
||||
_handler->onClosed(_connection);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* End of namespace
|
||||
*/
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue