174 lines
5.1 KiB
C++
174 lines
5.1 KiB
C++
|
|
#include "alarmEventDataService.h"
|
|||
|
|
#include <QJsonObject>
|
|||
|
|
#include <QJsonDocument>
|
|||
|
|
#include <QJsonArray>
|
|||
|
|
#include <QJsonParseError>
|
|||
|
|
|
|||
|
|
AlarmEventDataService* AlarmEventDataService::instance()
|
|||
|
|
{
|
|||
|
|
static AlarmEventDataService instance;
|
|||
|
|
return &instance;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
AlarmEventDataService::AlarmEventDataService()
|
|||
|
|
: m_serviceStatus(ServiceStatus::Uninitialized)
|
|||
|
|
//, m_realTimeConnectionStatus(ConnectionStatus::Disconnected)
|
|||
|
|
//, m_historicalConnectionStatus(ConnectionStatus::Disconnected)
|
|||
|
|
, m_amqpClient(nullptr)
|
|||
|
|
, m_amqpQueue(nullptr)
|
|||
|
|
, m_amqpExchange(nullptr)
|
|||
|
|
{}
|
|||
|
|
|
|||
|
|
AlarmEventDataService::~AlarmEventDataService()
|
|||
|
|
{}
|
|||
|
|
|
|||
|
|
bool AlarmEventDataService::initialize(const ServiceConfig& config)
|
|||
|
|
{
|
|||
|
|
if(m_serviceStatus != ServiceStatus::Uninitialized)
|
|||
|
|
return false;
|
|||
|
|
|
|||
|
|
if(!config.isValid())
|
|||
|
|
return false;
|
|||
|
|
|
|||
|
|
m_config = config;
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::start()
|
|||
|
|
{
|
|||
|
|
if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Connecting)
|
|||
|
|
return;
|
|||
|
|
|
|||
|
|
m_serviceStatus = ServiceStatus::Connecting;
|
|||
|
|
startRealTimeDataService();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::stop()
|
|||
|
|
{}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::startRealTimeDataService()
|
|||
|
|
{
|
|||
|
|
cleanupRabbitMQConnection();
|
|||
|
|
|
|||
|
|
m_amqpClient = new QAmqpClient(this);
|
|||
|
|
m_amqpClient->setHost(m_config.rabbitMQConfig.host);
|
|||
|
|
m_amqpClient->setPort(m_config.rabbitMQConfig.port);
|
|||
|
|
m_amqpClient->setUsername(m_config.rabbitMQConfig.username);
|
|||
|
|
m_amqpClient->setPassword(m_config.rabbitMQConfig.password);
|
|||
|
|
m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost);
|
|||
|
|
|
|||
|
|
connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected);
|
|||
|
|
connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected);
|
|||
|
|
connect(m_amqpClient, QOverload<QAMQP::Error>::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::cleanupRabbitMQConnection()
|
|||
|
|
{
|
|||
|
|
if(m_amqpQueue)
|
|||
|
|
{
|
|||
|
|
m_amqpQueue->deleteLater();
|
|||
|
|
m_amqpQueue = nullptr;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if(m_amqpExchange)
|
|||
|
|
{
|
|||
|
|
m_amqpExchange->deleteLater();
|
|||
|
|
m_amqpExchange = nullptr;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if(m_amqpClient)
|
|||
|
|
{
|
|||
|
|
m_amqpClient->disconnectFromHost();
|
|||
|
|
m_amqpClient->deleteLater();
|
|||
|
|
m_amqpClient = nullptr;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message)
|
|||
|
|
{
|
|||
|
|
QString errorString;
|
|||
|
|
EventData event = parseEventFromMessage(message.payload(), errorString);
|
|||
|
|
|
|||
|
|
if(!errorString.isEmpty())
|
|||
|
|
return MessageHandleResult::ParseError;
|
|||
|
|
|
|||
|
|
if(!validateEvent(event))
|
|||
|
|
return MessageHandleResult::ValidationError;
|
|||
|
|
|
|||
|
|
emit realTimeEventReceived(event);
|
|||
|
|
return MessageHandleResult::Success;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
EventData AlarmEventDataService::parseEventFromMessage(const QByteArray& data, QString& errorString)
|
|||
|
|
{
|
|||
|
|
QJsonParseError parseError;
|
|||
|
|
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
|
|||
|
|
|
|||
|
|
if(parseError.error != QJsonParseError::NoError)
|
|||
|
|
{
|
|||
|
|
errorString = QString("JSON parse error: %1 at offset %2")
|
|||
|
|
.arg(parseError.errorString())
|
|||
|
|
.arg(parseError.offset);
|
|||
|
|
return EventData();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if(!doc.isObject())
|
|||
|
|
{
|
|||
|
|
errorString = "JSON document is not an object";
|
|||
|
|
return EventData();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
QJsonObject obj = doc.object();
|
|||
|
|
EventData event = EventData::fromJson(obj);
|
|||
|
|
return event;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bool AlarmEventDataService::validateEvent(const EventData& event)
|
|||
|
|
{
|
|||
|
|
if(event.id.isEmpty())
|
|||
|
|
return false;
|
|||
|
|
QDateTime eventTime = QDateTime::fromMSecsSinceEpoch(event.timestamp);
|
|||
|
|
if(!eventTime.isValid())
|
|||
|
|
return false;
|
|||
|
|
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ==================== 槽函数 ====================
|
|||
|
|
void AlarmEventDataService::onRabbitMQConnected()
|
|||
|
|
{
|
|||
|
|
m_serviceStatus = ServiceStatus::Connected;
|
|||
|
|
//Exchange
|
|||
|
|
m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName);
|
|||
|
|
m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable);
|
|||
|
|
//Queue
|
|||
|
|
m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName);
|
|||
|
|
m_amqpQueue->declare(QAmqpQueue::AutoDelete);
|
|||
|
|
m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机,消息会发送给所有与之绑定的队列,所以此时routingKey无所谓
|
|||
|
|
connect(m_amqpQueue, &QAmqpQueue::messageReceived, this, &AlarmEventDataService::onMessageReceived);
|
|||
|
|
if(m_config.rabbitMQConfig.autoAck)
|
|||
|
|
m_amqpQueue->consume(QAmqpQueue::coNoAck);
|
|||
|
|
else
|
|||
|
|
m_amqpQueue->consume(0); //0表示不适用任何特殊选项,即手动确认
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::onRabbitMQDisconnected()
|
|||
|
|
{}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::onRabbitMQError(QAMQP::Error error)
|
|||
|
|
{
|
|||
|
|
qWarning() << m_amqpClient->errorString();
|
|||
|
|
m_serviceStatus = ServiceStatus::Error;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void AlarmEventDataService::onMessageReceived()
|
|||
|
|
{
|
|||
|
|
QAmqpMessage message = m_amqpQueue->dequeue();
|
|||
|
|
MessageHandleResult result = processMessage(message);
|
|||
|
|
|
|||
|
|
//手动确认消息
|
|||
|
|
if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success)
|
|||
|
|
m_amqpQueue->ack(message);
|
|||
|
|
}
|