cleanup frame processing and payload alignment

- remove a level of indentation making frame processing easier to read
- use qFromBigEndian(const uchar *src) to ensure alignment of payload size
This commit is contained in:
Adam Majer 2014-09-12 12:58:15 -05:00 committed by Matt Broadstone
parent 4ed42b74d5
commit 1eea37cb6f
1 changed files with 74 additions and 77 deletions

View File

@ -189,90 +189,87 @@ void ClientPrivate::_q_socketError(QAbstractSocket::SocketError error)
void ClientPrivate::_q_readyRead() void ClientPrivate::_q_readyRead()
{ {
while (socket->bytesAvailable() >= Frame::HEADER_SIZE) { while (socket->bytesAvailable() >= Frame::HEADER_SIZE) {
char headerData[Frame::HEADER_SIZE]; unsigned char headerData[Frame::HEADER_SIZE];
socket->peek(headerData, Frame::HEADER_SIZE); socket->peek((char*)headerData, Frame::HEADER_SIZE);
// FIXME: This is not the correct way of reading payloadSize const quint32 payloadSize = qFromBigEndian<quint32>(headerData + 3);
// gcc: dereferencing type-punned pointer will break strict-aliasing rules [-Wstrict-aliasing]
const quint32 payloadSize = qFromBigEndian<quint32>(*(quint32*)&headerData[3]);
const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE; const qint64 readSize = Frame::HEADER_SIZE + payloadSize + Frame::FRAME_END_SIZE;
if (socket->bytesAvailable() >= readSize) { if (socket->bytesAvailable() < readSize)
buffer.resize(readSize); return;
socket->read(buffer.data(), readSize);
const char *bufferData = buffer.constData(); buffer.resize(readSize);
const quint8 type = *(quint8*)&bufferData[0]; socket->read(buffer.data(), readSize);
const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize]; const char *bufferData = buffer.constData();
if (magic != Frame::FRAME_END) { const quint8 type = *(quint8*)&bufferData[0];
close(UnexpectedFrameError, "wrong end of frame"); const quint8 magic = *(quint8*)&bufferData[Frame::HEADER_SIZE + payloadSize];
if (magic != Frame::FRAME_END) {
close(UnexpectedFrameError, "wrong end of frame");
return;
}
QDataStream streamB(&buffer, QIODevice::ReadOnly);
switch(Frame::Type(type)) {
case Frame::ftMethod:
{
Frame::Method frame(streamB);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return; return;
} }
QDataStream streamB(&buffer, QIODevice::ReadOnly); if (frame.methodClass() == Frame::fcConnection) {
switch(Frame::Type(type)) { _q_method(frame);
case Frame::ftMethod: } else {
{ foreach (Frame::MethodHandler *methodHandler, methodHandlersByChannel[frame.channel()])
Frame::Method frame(streamB); methodHandler->_q_method(frame);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return;
}
if (frame.methodClass() == Frame::fcConnection) {
_q_method(frame);
} else {
foreach (Frame::MethodHandler *methodHandler, methodHandlersByChannel[frame.channel()])
methodHandler->_q_method(frame);
}
} }
break; }
case Frame::ftHeader:
{
Frame::Content frame(streamB);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return;
} else if (frame.channel() <= 0) {
close(ChannelError, "channel number must be greater than zero");
return;
}
foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()])
methodHandler->_q_content(frame);
}
break;
case Frame::ftBody:
{
Frame::ContentBody frame(streamB);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return;
} else if (frame.channel() <= 0) {
close(ChannelError, "channel number must be greater than zero");
return;
}
foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()])
methodHandler->_q_body(frame);
}
break;
case Frame::ftHeartbeat:
{
Frame::Method frame(streamB);
if (frame.channel() != 0) {
close(FrameError, "heartbeat must have channel id zero");
return;
}
qAmqpDebug("AMQP: Heartbeat");
}
break;
default:
qAmqpDebug() << "AMQP: Unknown frame type: " << type;
close(FrameError, "invalid frame type");
return;
}
} else {
break; break;
case Frame::ftHeader:
{
Frame::Content frame(streamB);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return;
} else if (frame.channel() <= 0) {
close(ChannelError, "channel number must be greater than zero");
return;
}
foreach (Frame::ContentHandler *methodHandler, contentHandlerByChannel[frame.channel()])
methodHandler->_q_content(frame);
}
break;
case Frame::ftBody:
{
Frame::ContentBody frame(streamB);
if (frame.size() > frameMax) {
close(FrameError, "frame size too large");
return;
} else if (frame.channel() <= 0) {
close(ChannelError, "channel number must be greater than zero");
return;
}
foreach (Frame::ContentBodyHandler *methodHandler, bodyHandlersByChannel[frame.channel()])
methodHandler->_q_body(frame);
}
break;
case Frame::ftHeartbeat:
{
Frame::Method frame(streamB);
if (frame.channel() != 0) {
close(FrameError, "heartbeat must have channel id zero");
return;
}
qAmqpDebug("AMQP: Heartbeat");
}
break;
default:
qAmqpDebug() << "AMQP: Unknown frame type: " << type;
close(FrameError, "invalid frame type");
return;
} }
} }
} }