PowerMaster/source/alarmEventDataService.cpp

365 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "alarmEventDataService.h"
#include <QNetworkAccessManager>
#include <QNetworkRequest>
#include <QUrlQuery>
#include <QNetworkReply>
#include <QTimer>
#include <QFile>
#include <QSettings>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonParseError>
#include <QCoreApplication>
/// Returns a pointer to the single shared service object.
/// The object is a function-local static, so it is created the first time
/// this function is called and the same address is returned on every call.
AlarmEventDataService* AlarmEventDataService::instance()
{
    static AlarmEventDataService service;
    return &service;
}
/// Constructs the service in the Uninitialized state with no AMQP objects,
/// creates the reconnect timer and the HTTP access manager (both owned by
/// this object through Qt parent/child ownership), then loads the settings.
AlarmEventDataService::AlarmEventDataService()
    : m_serviceStatus(ServiceStatus::Uninitialized)
    , m_amqpClient(nullptr)
    , m_amqpQueue(nullptr)
    , m_amqpExchange(nullptr)
    , m_reconnectAttempts(0)
    , m_maxReconnectAttempts(10)
{
    // One-shot timer: every reconnect attempt has to be scheduled explicitly.
    m_reconnectTimer = new QTimer(this);
    m_reconnectTimer->setSingleShot(true);
    connect(m_reconnectTimer, &QTimer::timeout,
            this, &AlarmEventDataService::onReconnectTimeout);

    // Used by queryHistoricalEvents() for the HTTP requests.
    m_networkManager = new QNetworkAccessManager(this);

    // Reads the configuration file and, on success, moves to Disconnected.
    initialize();
}
/// Shuts the service down before the object goes away: stop() cancels any
/// pending reconnect and releases the RabbitMQ objects.
AlarmEventDataService::~AlarmEventDataService()
{
    this->stop();
}
void AlarmEventDataService::initialize()
{
//读取配置文件
QString cofigDir = QCoreApplication::applicationDirPath();
m_settingsFile = cofigDir + "/config/alarmEventService_config.ini";
QFile file(m_settingsFile);
if(file.open(QIODevice::ReadWrite))
{
m_isVaildSettingsFile = true;
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
//RabbitMQConfig
m_config.rabbitMQConfig.host = m_settings->value("RabbitMQConfig/host").toString();
m_config.rabbitMQConfig.port = m_settings->value("RabbitMQConfig/port").toInt();
m_config.rabbitMQConfig.username = m_settings->value("RabbitMQConfig/username").toString();
m_config.rabbitMQConfig.password = m_settings->value("RabbitMQConfig/password").toString();
m_config.rabbitMQConfig.virtualHost = m_settings->value("RabbitMQConfig/virtualHost").toString();
m_config.rabbitMQConfig.exchangeName = m_settings->value("RabbitMQConfig/exchangeName").toString();
m_config.rabbitMQConfig.queueName = m_settings->value("RabbitMQConfig/queueName").toString();
m_config.rabbitMQConfig.routingKey = m_settings->value("RabbitMQConfig/routingKey").toString();
m_config.rabbitMQConfig.reconnectInterval = m_settings->value("RabbitMQConfig/reconnectInterval").toInt();
m_config.rabbitMQConfig.heartbeat = m_settings->value("RabbitMQConfig/heartbeat").toInt();
m_config.rabbitMQConfig.autoAck = m_settings->value("RabbitMQConfig/autoAck").toBool();
//HistoricalDataConfig
m_config.historicalConfig.baseUrl = m_settings->value("HistoricalDataConfig/baseUrl").toString();
m_config.historicalConfig.timeout = m_settings->value("HistoricalDataConfig/timeout").toInt();
m_config.historicalConfig.maxRetries = m_settings->value("HistoricalDataConfig/maxRetries").toInt();
m_config.historicalConfig.retryInterval = m_settings->value("HistoricalDataConfig/retryInterval").toInt();
m_serviceStatus = ServiceStatus::Disconnected;
}
else
m_isVaildSettingsFile = false;
}
/// Starts the real-time data service.
/// Does nothing unless the service is currently in the Disconnected state;
/// otherwise it switches to Connecting and opens the RabbitMQ connection.
void AlarmEventDataService::start()
{
    const bool canStart = (m_serviceStatus == ServiceStatus::Disconnected);
    if (canStart)
    {
        m_serviceStatus = ServiceStatus::Connecting;
        // Bring up the real-time (RabbitMQ) data service.
        startRealTimeDataService();
    }
}
/// Stops the service: cancels a pending reconnect (and resets the attempt
/// counter), releases the RabbitMQ objects, and marks the service as
/// Disconnected.
void AlarmEventDataService::stop()
{
    // Stop the timer first so no reconnect fires while tearing down.
    cancelReconnect();

    // Release queue, exchange and client.
    cleanupRabbitMQConnection();

    m_serviceStatus = ServiceStatus::Disconnected;
}
void AlarmEventDataService::queryHistoricalEvents(const QDateTime& startTime, const QDateTime& endTime, int confirmStatus)
{
QUrl url = bulidHistoricalQueryUrl(startTime, endTime, confirmStatus);
//QUrl url = m_config.historicalConfig.baseUrl;
if(!url.isValid())
return;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_config.historicalConfig.timeout);
//发送请求
QNetworkReply* reply = m_networkManager->get(request);
connect(reply, &QNetworkReply::finished, this, [this, reply](){
onHistoricalRequestFinished(reply);
});
}
void AlarmEventDataService::startRealTimeDataService()
{
cleanupRabbitMQConnection();
m_amqpClient = new QAmqpClient(this);
/*connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected);
connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected);
connect(m_amqpClient, QOverload<QAMQP::Error>::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);*/
//采用QT4的连接方式新的连接方式无法识别
connect(m_amqpClient, SIGNAL(connected()), this, SLOT(onRabbitMQConnected()));
connect(m_amqpClient, SIGNAL(disconnected()), this, SLOT(onRabbitMQDisconnected()));
connect(m_amqpClient, SIGNAL(error(QAMQP::Error)), this, SLOT(onRabbitMQError(QAMQP::Error)));
connect(m_amqpClient, SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)), this, SLOT(onRabbitMQSocketError(QAbstractSocket::SocketError)));
m_amqpClient->setHost(m_config.rabbitMQConfig.host);
m_amqpClient->setPort(m_config.rabbitMQConfig.port);
m_amqpClient->setUsername(m_config.rabbitMQConfig.username);
m_amqpClient->setPassword(m_config.rabbitMQConfig.password);
m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost);
m_amqpClient->connectToHost();
}
void AlarmEventDataService::cleanupRabbitMQConnection()
{
if(m_amqpQueue)
{
m_amqpQueue->deleteLater();
m_amqpQueue = nullptr;
}
if(m_amqpExchange)
{
m_amqpExchange->deleteLater();
m_amqpExchange = nullptr;
}
if(m_amqpClient)
{
m_amqpClient->disconnectFromHost();
m_amqpClient->deleteLater();
m_amqpClient = nullptr;
}
}
/// Arms the one-shot reconnect timer, as long as the number of attempts made
/// so far is below m_maxReconnectAttempts.
///
/// Several handlers (error, socket error, disconnected) call this function
/// and may all fire for a single connection failure. When a reconnect is
/// already pending the call is ignored, so one failure consumes exactly one
/// attempt and the pending delay is not restarted.
void AlarmEventDataService::scheduleReconnect()
{
    if (m_reconnectTimer->isActive())
        return; // a reconnect is already scheduled

    if (m_reconnectAttempts >= m_maxReconnectAttempts)
    {
        qInfo() << "Maximum reconnection attempts reached";
        return;
    }

    // Fixed delay. Exponential backoff (interval << attempts, i.e. interval
    // multiplied by 2^attempts) was present in the original but disabled.
    const int delay = m_config.rabbitMQConfig.reconnectInterval;
    m_reconnectTimer->start(delay);
    ++m_reconnectAttempts;
}
void AlarmEventDataService::cancelReconnect()
{
if(m_reconnectTimer->isActive())
m_reconnectTimer->stop();
m_reconnectAttempts = 0;
}
/// Parses and validates one AMQP message and publishes it on success.
///
/// @param message The received AMQP message; its payload is parsed as an event.
/// @return ParseError when parsing reported an error, ValidationError when the
///         parsed event fails validateEvent(), otherwise Success (after
///         emitting realTimeEventReceived with the event).
MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message)
{
    QString parseFailure;
    EventData parsed = parseEventFromMessage(message.payload(), parseFailure);

    MessageHandleResult outcome = MessageHandleResult::Success;
    if (!parseFailure.isEmpty())
    {
        outcome = MessageHandleResult::ParseError;
    }
    else if (!validateEvent(parsed))
    {
        outcome = MessageHandleResult::ValidationError;
    }
    else
    {
        emit realTimeEventReceived(parsed);
    }
    return outcome;
}
/// Converts a JSON payload into an EventData.
///
/// @param data        Raw payload, expected to be a JSON object.
/// @param errorString Receives a description when the payload is not valid
///                    JSON or is not a JSON object; it is left untouched on
///                    success.
/// @return The event built by EventData::fromJson(), or a default-constructed
///         EventData when an error was reported.
EventData AlarmEventDataService::parseEventFromMessage(const QByteArray& data, QString& errorString)
{
    QJsonParseError status;
    const QJsonDocument document = QJsonDocument::fromJson(data, &status);

    if (status.error == QJsonParseError::NoError)
    {
        if (document.isObject())
        {
            QJsonObject root = document.object();
            return EventData::fromJson(root);
        }
        errorString = "JSON document is not an object";
    }
    else
    {
        errorString = QString("JSON parse error: %1 at offset %2")
                          .arg(status.errorString())
                          .arg(status.offset);
    }
    return EventData();
}
bool AlarmEventDataService::validateEvent(const EventData& event)
{
if(event.id.isEmpty())
return false;
QDateTime eventTime = QDateTime::fromMSecsSinceEpoch(event.timestamp);
if(!eventTime.isValid())
return false;
return true;
}
/// Builds the URL for a historical-events query from the configured base URL.
///
/// @param startTime     Sent as "begin", in milliseconds since the epoch.
/// @param endTime       Sent as "end", in milliseconds since the epoch.
/// @param confirmStatus 0 adds status "0,1,2" (unconfirmed), 1 adds status
///                      "3,4,5" (confirmed); any other value adds no status
///                      filter.
/// @return The base URL with its query replaced by the parameters above.
QUrl AlarmEventDataService::bulidHistoricalQueryUrl(const QDateTime& startTime, const QDateTime& endTime, int confirmStatus)
{
    QUrlQuery params;
    params.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch()));
    params.addQueryItem("end", QString::number(endTime.toMSecsSinceEpoch()));

    switch (confirmStatus)
    {
    case 0: // unconfirmed
        params.addQueryItem("status", "0,1,2");
        break;
    case 1: // confirmed
        params.addQueryItem("status", "3,4,5");
        break;
    default: // no status filter
        break;
    }

    QUrl url = m_config.historicalConfig.baseUrl;
    url.setQuery(params);
    return url;
}
void AlarmEventDataService::processHistoricalResponse(const QByteArray& data)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error != QJsonParseError::NoError)
{
QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
.arg(parseError.errorString())
.arg(parseError.offset);
emit historicalQueryError(errorMsg);
}
if(!doc.isObject())
{
QString errorMsg = "Historical Events JSON document is not an object";
emit historicalQueryError(errorMsg);
return ;
}
QList<EventData> historicalEvents;
QJsonObject obj = doc.object();
QJsonValue dataValue = obj.value("data");
if(dataValue.isArray())
{
QJsonArray eventArray = dataValue.toArray();
int size = eventArray.size();
for(int i = 0; i < size; ++i)
{
QJsonValue eventValue = eventArray.at(i);
if(!eventValue.isObject())
continue;
QJsonObject eventObj = eventValue.toObject();
EventData event = EventData::fromJson(eventObj);
if(validateEvent(event))
historicalEvents.append(event);
}
}
emit historicalQueryData(historicalEvents);
}
// ==================== Slots ====================
void AlarmEventDataService::onRabbitMQConnected()
{
//qInfo() << "RabbitMQ connectecd";
m_serviceStatus = ServiceStatus::Connected;
//Exchange
m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName);
m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable);
//Queue
m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName);
connect(m_amqpQueue, SIGNAL(opened()), this, SLOT(onAmqpQueueOpend()));
}
/// Slot: the AMQP client reported that the connection was closed.
/// Logs a warning, marks the service as Disconnected and asks for a
/// reconnect attempt.
void AlarmEventDataService::onRabbitMQDisconnected()
{
    qWarning() << "Disconnected to RabbitMQ";

    m_serviceStatus = ServiceStatus::Disconnected;

    scheduleReconnect();
}
/// Slot: the AMQP client reported a protocol-level error.
///
/// @param error Error code supplied by the client.
///
/// Logs the client's error text, marks the service as being in the Error
/// state and asks for a reconnect attempt. If the client has already been
/// released (m_amqpClient is null after cleanupRabbitMQConnection()), the
/// signal is stale: it is logged by code only and otherwise ignored, which
/// avoids dereferencing a null pointer and avoids reviving a stopped service.
void AlarmEventDataService::onRabbitMQError(QAMQP::Error error)
{
    if (!m_amqpClient)
    {
        qWarning() << "RabbitMQ error after client teardown, code:" << static_cast<int>(error);
        return;
    }

    qWarning() << m_amqpClient->errorString();
    m_serviceStatus = ServiceStatus::Error;
    scheduleReconnect();
}
/// Slot: the AMQP client's socket reported an error.
///
/// @param error Socket error code, written to the log.
///
/// Marks the service as being in the Error state and asks for a reconnect
/// attempt.
void AlarmEventDataService::onRabbitMQSocketError(QAbstractSocket::SocketError error)
{
    qWarning() << "RabbitMQ connection socket error: " << error;

    m_serviceStatus = ServiceStatus::Error;

    scheduleReconnect();
}
/// Slot: the queue created in onRabbitMQConnected() is open.
/// Declares the queue as auto-delete, binds it to the exchange, hooks up
/// message delivery and starts consuming with or without automatic
/// acknowledgement, depending on the configuration.
void AlarmEventDataService::onAmqpQueueOpend()
{
    auto* queue = m_amqpQueue;
    const auto& mq = m_config.rabbitMQConfig;

    queue->declare(QAmqpQueue::AutoDelete);

    // Original author's note: with a fanout (broadcast) exchange the message
    // goes to every bound queue, so the routing key does not matter here.
    queue->bind(m_amqpExchange, mq.routingKey);

    connect(queue, SIGNAL(messageReceived()), this, SLOT(onMessageReceived()));

    if (!mq.autoAck)
    {
        // 0 = no special options, i.e. messages are acknowledged manually.
        queue->consume(0);
    }
    else
    {
        queue->consume(QAmqpQueue::coNoAck);
    }
}
void AlarmEventDataService::onMessageReceived()
{
QAmqpMessage message = m_amqpQueue->dequeue();
//qDebug() << message.payload();
MessageHandleResult result = processMessage(message);
//手动确认消息
if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success)
m_amqpQueue->ack(message);
}
/// Slot: the reconnect timer fired.
/// Skips the attempt when the service is already Connected or Reconnecting;
/// otherwise switches to Reconnecting and opens a new RabbitMQ connection.
void AlarmEventDataService::onReconnectTimeout()
{
    const bool nothingToDo = (m_serviceStatus == ServiceStatus::Connected)
                          || (m_serviceStatus == ServiceStatus::Reconnecting);
    if (!nothingToDo)
    {
        m_serviceStatus = ServiceStatus::Reconnecting;
        startRealTimeDataService();
    }
}
/// Handles a finished historical-data HTTP reply.
///
/// @param reply The finished reply; it is scheduled for deletion here.
///
/// On a network error, emits historicalQueryError with the reply's error
/// text; otherwise passes the response body to processHistoricalResponse().
void AlarmEventDataService::onHistoricalRequestFinished(QNetworkReply* reply)
{
    if (reply->error() != QNetworkReply::NoError)
    {
        const QString failure = QString("Request HistoricalData error: %1").arg(reply->errorString());
        emit historicalQueryError(failure);
    }
    else
    {
        const QByteArray body = reply->readAll();
        processHistoricalResponse(body);
    }

    // The reply is owned by this handler once the request has finished.
    reply->deleteLater();
}