PowerMaster/source/alarmEventDataService.cpp

219 lines
6.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "alarmEventDataService.h"
#include <QTimer>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonParseError>
AlarmEventDataService* AlarmEventDataService::instance()
{
static AlarmEventDataService instance;
return &instance;
}
AlarmEventDataService::AlarmEventDataService()
: m_serviceStatus(ServiceStatus::Uninitialized)
//, m_realTimeConnectionStatus(ConnectionStatus::Disconnected)
//, m_historicalConnectionStatus(ConnectionStatus::Disconnected)
, m_amqpClient(nullptr)
, m_amqpQueue(nullptr)
, m_amqpExchange(nullptr)
, m_reconnectAttempts(0)
, m_maxReconnectAttempts(10)
{
m_reconnectTimer = new QTimer(this);
m_reconnectTimer->setSingleShot(true);
connect(m_reconnectTimer, &QTimer::timeout, this, &AlarmEventDataService::onReconnectTimeout);
}
AlarmEventDataService::~AlarmEventDataService()
{}
bool AlarmEventDataService::initialize(const ServiceConfig& config)
{
if(m_serviceStatus != ServiceStatus::Uninitialized)
return false;
if(!config.isValid())
return false;
m_config = config;
m_serviceStatus = ServiceStatus::Disconnected;
return true;
}
void AlarmEventDataService::start()
{
if(m_serviceStatus != ServiceStatus::Disconnected)
return;
m_serviceStatus = ServiceStatus::Connecting;
//启动实时数据服务
startRealTimeDataService();
}
void AlarmEventDataService::stop()
{
cancelReconnect();
cleanupRabbitMQConnection();
m_serviceStatus = ServiceStatus::Disconnected;
}
void AlarmEventDataService::startRealTimeDataService()
{
cleanupRabbitMQConnection();
m_amqpClient = new QAmqpClient(this);
m_amqpClient->setHost(m_config.rabbitMQConfig.host);
m_amqpClient->setPort(m_config.rabbitMQConfig.port);
m_amqpClient->setUsername(m_config.rabbitMQConfig.username);
m_amqpClient->setPassword(m_config.rabbitMQConfig.password);
m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost);
connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected);
connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected);
connect(m_amqpClient, QOverload<QAMQP::Error>::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);
}
void AlarmEventDataService::cleanupRabbitMQConnection()
{
if(m_amqpQueue)
{
m_amqpQueue->deleteLater();
m_amqpQueue = nullptr;
}
if(m_amqpExchange)
{
m_amqpExchange->deleteLater();
m_amqpExchange = nullptr;
}
if(m_amqpClient)
{
m_amqpClient->disconnectFromHost();
m_amqpClient->deleteLater();
m_amqpClient = nullptr;
}
}
void AlarmEventDataService::scheduleReconnect()
{
if (m_reconnectAttempts < m_maxReconnectAttempts)
{
int delay = m_config.rabbitMQConfig.reconnectInterval * (1 << m_reconnectAttempts); // 指数退避,<<n相当于乘以2的n次方
m_reconnectTimer->start(delay);
m_reconnectAttempts++;
}
else
qInfo() << "Maximum reconnection attempts reached";
}
void AlarmEventDataService::cancelReconnect()
{
m_reconnectTimer->stop();
m_reconnectAttempts = 0;
}
MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message)
{
QString errorString;
EventData event = parseEventFromMessage(message.payload(), errorString);
if(!errorString.isEmpty())
return MessageHandleResult::ParseError;
if(!validateEvent(event))
return MessageHandleResult::ValidationError;
emit realTimeEventReceived(event);
return MessageHandleResult::Success;
}
EventData AlarmEventDataService::parseEventFromMessage(const QByteArray& data, QString& errorString)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error != QJsonParseError::NoError)
{
errorString = QString("JSON parse error: %1 at offset %2")
.arg(parseError.errorString())
.arg(parseError.offset);
return EventData();
}
if(!doc.isObject())
{
errorString = "JSON document is not an object";
return EventData();
}
QJsonObject obj = doc.object();
EventData event = EventData::fromJson(obj);
return event;
}
bool AlarmEventDataService::validateEvent(const EventData& event)
{
if(event.id.isEmpty())
return false;
QDateTime eventTime = QDateTime::fromMSecsSinceEpoch(event.timestamp);
if(!eventTime.isValid())
return false;
return true;
}
// ==================== 槽函数 ====================
void AlarmEventDataService::onRabbitMQConnected()
{
m_serviceStatus = ServiceStatus::Connected;
//Exchange
m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName);
m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable);
//Queue
m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName);
m_amqpQueue->declare(QAmqpQueue::AutoDelete);
m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机消息会发送给所有与之绑定的队列所以此时routingKey无所谓
connect(m_amqpQueue, &QAmqpQueue::messageReceived, this, &AlarmEventDataService::onMessageReceived);
if(m_config.rabbitMQConfig.autoAck)
m_amqpQueue->consume(QAmqpQueue::coNoAck);
else
m_amqpQueue->consume(0); //0表示不适用任何特殊选项即手动确认
}
void AlarmEventDataService::onRabbitMQDisconnected()
{
qWarning() << "Disconnected to RabbitMQ";
m_serviceStatus = ServiceStatus::Disconnected;
scheduleReconnect();
}
void AlarmEventDataService::onRabbitMQError(QAMQP::Error error)
{
qWarning() << m_amqpClient->errorString();
m_serviceStatus = ServiceStatus::Error;
scheduleReconnect();
}
void AlarmEventDataService::onMessageReceived()
{
QAmqpMessage message = m_amqpQueue->dequeue();
MessageHandleResult result = processMessage(message);
//手动确认消息
if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success)
m_amqpQueue->ack(message);
}
void AlarmEventDataService::onReconnectTimeout()
{
if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting)
return;
m_serviceStatus = ServiceStatus::Reconnecting;
startRealTimeDataService();
}