From 83b6848734a00c9810175dcd329e654fd0478248 Mon Sep 17 00:00:00 2001 From: duanshengchao <519970194@qq.com> Date: Fri, 19 Dec 2025 09:45:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E6=A8=A1=E5=9D=97(?= =?UTF-8?q?=E9=9B=86=E6=88=90=E6=9C=8D=E5=8A=A1=E9=80=9A=E4=BF=A1=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 9 +- include/alarmEventDataService.h | 1 + include/alarmEventUtils.h | 45 ----- include/measurementDataManager.h | 85 ++++++++++ include/measurementDataUtils.h | 60 +++++++ include/networkCommon.h | 53 ++++++ measurementDataService_config.ini | 9 + realTimeDataService_config.ini | 0 source/alarmEventDataService.cpp | 26 +-- source/alarmEventDataView.cpp | 8 +- source/measurementDataManager.cpp | 273 ++++++++++++++++++++++++++++++ source/measurementDataUtils.cpp | 154 +++++++++++++++++ 12 files changed, 660 insertions(+), 63 deletions(-) create mode 100644 include/measurementDataManager.h create mode 100644 include/measurementDataUtils.h create mode 100644 include/networkCommon.h create mode 100644 measurementDataService_config.ini delete mode 100644 realTimeDataService_config.ini create mode 100644 source/measurementDataManager.cpp create mode 100644 source/measurementDataUtils.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7596019..f601abe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Widgets) find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Widgets) find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS PrintSupport) find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Network) +find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS WebSockets) find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Concurrent) set(ADS_VERSION 4.3.1) @@ -43,11 +44,14 @@ set(H_HEADER_FILES include/dataPanel.h include/dataLoader.h include/dataManager.h + include/measurementDataManager.h + include/measurementDataUtils.h include/panelSelectionDialog.h include/panelConfigurationWidget.h include/dateTimeWidget.h include/customCalendarWidget.h include/dateTimeSelectionPanel.h + include/networkCommon.h include/httpRequestManager.h include/alarmEventGlobal.h include/alarmEventMainDialog.h @@ -81,6 +85,8 @@ set(CPP_SOURCE_FILES source/dataPanel.cpp source/dataLoader.cpp source/dataManager.cpp + source/measurementDataManager.cpp + source/measurementDataUtils.cpp source/panelSelectionDialog.cpp source/panelConfigurationWidget.cpp source/dateTimeWidget.cpp @@ -189,6 +195,7 @@ target_include_directories(PowerMaster PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/incl target_include_directories(PowerMaster PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/qamqp/src") target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::PrintSupport) target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::Network) +target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::WebSockets) target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::Concurrent) target_link_libraries(PowerMaster PRIVATE qt${QT_VERSION_MAJOR}advanceddocking) target_link_libraries(PowerMaster PUBLIC Qt${QT_VERSION_MAJOR}::Core @@ -220,7 +227,7 @@ install(TARGETS PowerMaster # 定义配置文件和目标目录 set(CONFIG_FILES alarmEventService_config.ini - realTimeDataService_config.ini + measurementDataService_config.ini log_config.ini ) set(CONFIG_FILE_DIR "${CMAKE_BINARY_DIR}/config") diff --git a/include/alarmEventDataService.h b/include/alarmEventDataService.h index a69d649..ab1672d 100644 --- a/include/alarmEventDataService.h +++ b/include/alarmEventDataService.h @@ -1,6 +1,7 @@ #ifndef ALARMEVENTDATASERVICE_H #define ALARMEVENTDATASERVICE_H +#include "networkCommon.h" #include "alarmEventGlobal.h" #include "alarmEventUtils.h" #include "qamqpclient.h" diff --git a/include/alarmEventUtils.h b/include/alarmEventUtils.h index 6f4c2f6..071f7b3 100644 --- a/include/alarmEventUtils.h +++ b/include/alarmEventUtils.h @@ -161,51 +161,6 @@ struct ServiceConfig QJsonObject toJson() const; }; -///////------enums----- -//服务状态 -enum class ServiceStatus -{ - Uninitialized,//未初始化 - //Initialized, //已初始化 - Disconnected, //未连接 - Connecting, //连接中 - Connected, //已连接 - Error, //错误状态 - Reconnecting //重连中 -}; - -//连接状态 -enum class ConnectionStatus -{ - Disconnected, - Connecting, - Connected -}; - -//错误严重程度 -enum class ErrorSeverity -{ - Info, //信息 - Warning, //警告 - Error, //错误 - Critical //严重错误 -}; - -//消息处理结果 -enum class MessageHandleResult -{ - Success, - ParseError, //解析错误 - ValidationError, //验证错误 - Duplicate //重复消息 -}; - -//注册为元类型,用于信号槽的chuandi -Q_DECLARE_METATYPE(ServiceStatus) -Q_DECLARE_METATYPE(ConnectionStatus) -Q_DECLARE_METATYPE(ErrorSeverity) -Q_DECLARE_METATYPE(MessageHandleResult) - ///////------classes----- class AlarmEventDataFilter { diff --git a/include/measurementDataManager.h b/include/measurementDataManager.h new file mode 100644 index 0000000..9af4b92 --- /dev/null +++ b/include/measurementDataManager.h @@ -0,0 +1,85 @@ +#ifndef MEASUREMENTDATAMANAGER_H +#define MEASUREMENTDATAMANAGER_H + +#include +class QSettings; +class QNetworkAccessManager; +class QNetworkReply; +class WebSocketClient; + +class MeasurementDataManager : public QObject +{ + Q_OBJECT + Q_DISABLE_COPY(MeasurementDataManager) //禁止拷贝,等价于:DataManager(const DataManager&) = delete; DataManager& operator=(const DataManager&) = delete; + +public: + static MeasurementDataManager* instance(); + + void startSubscription(const QStringList& measurements); + void stopSubscription(); + void addSubscriptionData(const QStringList& measurements); + void removeSubscriptionData(const QStringList& measurements); + void getSubscriptionRealtimeData(); + +private slots: + void onReciveRealtimeData(const QString& dataMsg); + +private: + struct RealtimeConfig + { + QString host; + int port; + QString httpPath; + QString websocketPath; + int timeout; //请求超时(毫秒) + int maxRetries; //最大重试次数 + int retryInterval; //重试间隔 + + RealtimeConfig() + : timeout(3000) + , maxRetries(3) + , retryInterval(1000) + {} + }; + + struct HistoricalConfig + { + QString host; + int port; + int timeout; //请求超时(毫秒) + int maxRetries; //最大重试次数 + int retryInterval; //重试间隔 + + HistoricalConfig() + : timeout(3000) + , maxRetries(3) + , retryInterval(1000) + {} + }; + + struct ServiceConfig + { + RealtimeConfig realtimeCfg; + HistoricalConfig historicalCfg; + }; + + explicit MeasurementDataManager(); + ~MeasurementDataManager(); + + void initialize(); + void buildSubscriptionRequest(const QString& action, const QStringList& measurements); + void processSubscriptionResponse(const QString& action, QNetworkReply* reply); + + //配置相关 + ServiceConfig m_serviceConfig; + QSettings* m_settings; + QString m_settingsFile; + bool m_isVaildSettingsFile; + + //服务相关 + QString m_clientID; + QNetworkAccessManager* m_networkManager; + WebSocketClient* m_webSocketClient; +}; + +#endif diff --git a/include/measurementDataUtils.h b/include/measurementDataUtils.h new file mode 100644 index 0000000..829de15 --- /dev/null +++ b/include/measurementDataUtils.h @@ -0,0 +1,60 @@ +#ifndef MEASUREMENTDATAUTILS_H +#define MEASUREMENTDATAUTILS_H + +#include "networkCommon.h" +#include +#include +/** + * @brief WebSocket数据客户端 + * + * 负责与后台数据服务建立WebSocket连接,接收实时数据 + */ +class QTimer; +class WebSocketClient : public QObject +{ + Q_OBJECT + +public: + explicit WebSocketClient(QObject* parent = nullptr); + ~WebSocketClient(); + + //连接管理 + bool connectToServer(const QUrl& url); + void disconnectFromServer(); + //bool isConnected(); + + //数据配置 + void setReconnectInterval(int intervalMs); + void setMaxReconnectAttempts(int maxAttempts); + +signals: + void dataReceived(const QString& dataMsg); + +private slots: + void onConnected(); + void onDisconnected(); + void onError(QAbstractSocket::SocketError error); + void onTextMessageReceived(const QString& message); + void onReconnectTimeout(); + void resetReconnect(); + +private: + void setupWebSocket(); + void cleanupWebSocket(); + void scheduleReconnect(); + + QWebSocket* m_webSocket; + QUrl m_serverUrl; + + //重连相关 + QTimer* m_reconnectTimer; + int m_reconnectInterval; + int m_reconnectAttempts; + int m_maxReconnectAttempts; + + // 状态 + bool m_connected; + ConnectionStatus m_connectionStatus; +}; + +#endif diff --git a/include/networkCommon.h b/include/networkCommon.h new file mode 100644 index 0000000..52e8f80 --- /dev/null +++ b/include/networkCommon.h @@ -0,0 +1,53 @@ +#ifndef NETWORKCOMMON_H +#define NETWORKCOMMON_H + +#include + +///////------enums----- +//服务状态 +enum class ServiceStatus +{ + Uninitialized,//未初始化 + //Initialized, //已初始化 + Disconnected, //未连接 + Connecting, //连接中 + Connected, //已连接 + Error, //错误状态 + Reconnecting //重连中 +}; + +//连接状态 +enum class ConnectionStatus +{ + Connected, + Connecting, + Disconnected, + Reconncting, + Error +}; + +//错误严重程度 +enum class ErrorSeverity +{ + Info, //信息 + Warning, //警告 + Error, //错误 + Critical //严重错误 +}; + +//消息处理结果 +enum class MessageHandleResult +{ + Success, + ParseError, //解析错误 + ValidationError, //验证错误 + Duplicate //重复消息 +}; + +//注册为元类型,用于信号槽的传递 +Q_DECLARE_METATYPE(ServiceStatus) +Q_DECLARE_METATYPE(ConnectionStatus) +Q_DECLARE_METATYPE(ErrorSeverity) +Q_DECLARE_METATYPE(MessageHandleResult) + +#endif diff --git a/measurementDataService_config.ini b/measurementDataService_config.ini new file mode 100644 index 0000000..aa80e6b --- /dev/null +++ b/measurementDataService_config.ini @@ -0,0 +1,9 @@ +[RealtimeData] +host=192.168.46.100 +port=10080 +httpPath=monitors/data/subscriptions +websocketPath=monitors/data/realtime/stream + +[HistoricalData] +host=192.168.46.100 +port=10080 \ No newline at end of file diff --git a/realTimeDataService_config.ini b/realTimeDataService_config.ini deleted file mode 100644 index e69de29..0000000 diff --git a/source/alarmEventDataService.cpp b/source/alarmEventDataService.cpp index 415079e..15288fb 100644 --- a/source/alarmEventDataService.cpp +++ b/source/alarmEventDataService.cpp @@ -47,8 +47,8 @@ AlarmEventDataService::~AlarmEventDataService() void AlarmEventDataService::initialize() { //读取配置文件 - QString cofigDir = QCoreApplication::applicationDirPath(); - m_settingsFile = cofigDir + "/config/alarmEventService_config.ini"; + QString configDir = QCoreApplication::applicationDirPath(); + m_settingsFile = configDir + "/config/alarmEventService_config.ini"; QFile file(m_settingsFile); if(file.open(QIODevice::ReadWrite)) { @@ -185,11 +185,15 @@ void AlarmEventDataService::cleanupRabbitMQConnection() void AlarmEventDataService::scheduleReconnect() { + if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting) + return; + if (m_reconnectAttempts < m_maxReconnectAttempts) { int delay = m_config.rabbitMQConfig.reconnectInterval * (1 /*<< m_reconnectAttempts*/); // 指数退避,<start(delay); m_reconnectAttempts++; + m_serviceStatus = ServiceStatus::Reconnecting; } else qInfo() << "Maximum reconnection attempts reached"; @@ -375,10 +379,6 @@ void AlarmEventDataService::onMessageReceived() void AlarmEventDataService::onReconnectTimeout() { - if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting) - return; - - m_serviceStatus = ServiceStatus::Reconnecting; startRealTimeDataService(); } @@ -388,13 +388,13 @@ void AlarmEventDataService::onHistoricalRequestFinished(QNetworkReply* reply) { QByteArray data = reply->readAll(); processHistoricalResponse(data); - LOG_INFO("Http", QString(data)); + LOG_INFO("Http-QueryHistoricalEvents", QString(data)); } else { QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString()); emit historicalQueryError(errorMsg); - LOG_ERROR("Http", errorMsg); + LOG_ERROR("Http-QueryHistoricalEvents", errorMsg); } reply->deleteLater(); @@ -422,7 +422,7 @@ void AlarmEventDataService::onConfirmEventsRequestFinished(QNetworkReply* reply) if(uuidValue.isArray()) { QJsonArray uuidArray = uuidValue.toArray(); - for(const QJsonValue& value : uuidArray) + for(const QJsonValue& value : std::as_const(uuidArray)) { if(value.isString()) successUuids.append(value.toString()); @@ -433,28 +433,28 @@ void AlarmEventDataService::onConfirmEventsRequestFinished(QNetworkReply* reply) else { QString errorMsg = QString("ConfirmEvents reply error, msg: %1").arg(msg); - LOG_ERROR("Http", errorMsg); + LOG_ERROR("Http-ConfirmEvents", errorMsg); emit confirmEventsResult(false, QString("确认事件失败(%1)").arg(msg), successUuids, m_currentRequestId); } } else { QString errorMsg = QString("ConfirmEvents reply Json is not object: %1").arg(QString(data)); - LOG_ERROR("Http", errorMsg); + LOG_ERROR("Http-ConfirmEvents", errorMsg); emit confirmEventsResult(false, QString("确认事件失败(Json is not object)"), successUuids, m_currentRequestId); } } else { QString errorMsg = QString("ConfirmEvents reply Json parse error: %1").arg(parseError.errorString()); - LOG_ERROR("Http", errorMsg); + LOG_ERROR("Http-ConfirmEvents", errorMsg); emit confirmEventsResult(false, QString("确认事件失败(JsonParseError)"), successUuids, m_currentRequestId); } } else { QString errorMsg = QString("ConfirmEvents reply Network error: %1").arg(reply->errorString()); - LOG_ERROR("Http", errorMsg); + LOG_ERROR("Http-ConfirmEvents", errorMsg); emit confirmEventsResult(false, QString("确认事件失败(Reply Network error)"), successUuids, m_currentRequestId); } diff --git a/source/alarmEventDataView.cpp b/source/alarmEventDataView.cpp index 4eba270..6a2975b 100644 --- a/source/alarmEventDataView.cpp +++ b/source/alarmEventDataView.cpp @@ -42,9 +42,9 @@ AlarmEventDataModel::AlarmEventDataModel(AlarmDataMode mode, QObject* parent) connect(AlarmEventDataService::instance(), &AlarmEventDataService::confirmEventsResult, this, &AlarmEventDataModel::onConfirmEventsResult); //实时数据测试 - m_simulatedDataTimer = new QTimer(this); + /*m_simulatedDataTimer = new QTimer(this); connect(m_simulatedDataTimer, &QTimer::timeout, this, &AlarmEventDataModel::onTimeoutSimulateData); - m_simulatedDataTimer->start(5000); + m_simulatedDataTimer->start(5000);*/ } AlarmEventDataModel::~AlarmEventDataModel() @@ -503,12 +503,12 @@ void AlarmEventDataModel::onConfirmEventsResult(bool success, const QString& mes QModelIndex topLeft = createIndex(0, Status); QModelIndex bottomRight = createIndex(m_displayEvents.count() - 1, Status); emit dataChanged(topLeft, bottomRight, {Qt::DisplayRole, Qt::ForegroundRole}); - qDebug() << "realtime view confirm refresh"; + //qDebug() << "realtime view confirm refresh"; //endResetModel(); } else { - qDebug() << "historical view confirm refresh"; + //qDebug() << "historical view confirm refresh"; refresh(); } } diff --git a/source/measurementDataManager.cpp b/source/measurementDataManager.cpp new file mode 100644 index 0000000..a89f3f1 --- /dev/null +++ b/source/measurementDataManager.cpp @@ -0,0 +1,273 @@ +#include "measurementDataManager.h" +#include "logger.h" +#include "measurementDataUtils.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +MeasurementDataManager* MeasurementDataManager::instance() +{ + static MeasurementDataManager instance; + return &instance; +} + +MeasurementDataManager::MeasurementDataManager() + : m_clientID("") + , m_networkManager(new QNetworkAccessManager(this)) + , m_webSocketClient(new WebSocketClient(this)) +{ + initialize(); + connect(m_webSocketClient, &WebSocketClient::dataReceived, this, &MeasurementDataManager::onReciveRealtimeData); +} + +MeasurementDataManager::~MeasurementDataManager() +{} + +void MeasurementDataManager::initialize() +{ + //读取配置文件 + QString configDir = QCoreApplication::applicationDirPath(); + m_settingsFile = configDir + "/config/measurementDataService_config.ini"; + QFile file(m_settingsFile); + if(file.open(QIODevice::ReadWrite)) + { + m_isVaildSettingsFile = true; + m_settings = new QSettings(m_settingsFile, QSettings::IniFormat); + + m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString(); + m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt(); + m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString(); + m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString(); + + m_serviceConfig.historicalCfg.host = m_settings->value("HistoricalData/host").toString(); + m_serviceConfig.historicalCfg.port = m_settings->value("HistoricalData/port").toInt(); + } + else + { + m_isVaildSettingsFile = false; + LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed"); + } +} + +void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements) +{ + if(!m_isVaildSettingsFile) + return; + + QUrl url; + url.setScheme("http"); + url.setHost(m_serviceConfig.realtimeCfg.host); + url.setPort(m_serviceConfig.realtimeCfg.port); + url.setPath(m_serviceConfig.realtimeCfg.httpPath); + if(!url.isValid()) + return ; + + //创建网络请求 + QNetworkRequest request(url); + request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout); + request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json"); + //创建json格式请求数据 + QJsonArray targetsArray; + for(const QString& meaurement : measurements) + targetsArray.append(meaurement); + QJsonObject measurementObj; + measurementObj.insert("interval", "20ms"); + measurementObj.insert("targets", targetsArray); + QJsonArray measurementArray; + measurementArray.append(measurementObj); + QJsonObject requestObj; + QString actionValue = ""; + if(action == "start" || action == "add") + actionValue = "start"; + else if(action == "stop") + actionValue = "stop"; + requestObj.insert("action", actionValue); + if(action != "start") + requestObj.insert("client_id", m_clientID); + requestObj.insert("measurements", measurementArray); + QJsonDocument requestJson(requestObj); + //发送请求 + QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson()); + connect(reply, &QNetworkReply::finished, this, [this, action, reply](){ + processSubscriptionResponse(action, reply); + }); +} + +void MeasurementDataManager::startSubscription(const QStringList& measurements) +{ + if(!m_clientID.isEmpty()) + return; + + buildSubscriptionRequest("start", measurements); +} + +void MeasurementDataManager::addSubscriptionData(const QStringList& measurements) +{ + if(m_clientID.isEmpty()) + return; + + buildSubscriptionRequest("add", measurements); +} + +void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements) +{ + if(m_clientID.isEmpty()) + return; + + buildSubscriptionRequest("stop", measurements); +} + +void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply) +{ + if(reply->error() == QNetworkReply::NoError) + { + QByteArray data = reply->readAll(); + QJsonParseError parseError; + QJsonDocument doc = QJsonDocument::fromJson(data, &parseError); + + if(parseError.error == QJsonParseError::NoError && doc.isObject()) + { + QJsonObject docObj = doc.object(); + QString msg = docObj.value("msg").toString(); + if(msg == "success") + { + QJsonValue payloadValue = docObj.value("payload"); + if(payloadValue.isObject()) + { + QJsonObject payloadObj = payloadValue.toObject(); + if(action == "start") + m_clientID = payloadObj.value("client_id").toString(); + QJsonValue targetsValue = payloadObj.value("targets"); + if(targetsValue.isArray()) + { + QJsonArray targetsArray = targetsValue.toArray(); + for(const QJsonValue& targetValue : std::as_const(targetsArray)) + { + if(!targetValue.isObject()) + continue; + + QJsonObject targetObj = targetValue.toObject(); + QString code = targetObj.value("code").toString(); + if(action == "start" || action == "add") + { + if(code == "1001")//订阅成功 + { + QString measurementID = targetObj.value("id").toString(); + } + else //if(code == "1002") //订阅失败 + LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString()); + } + else if(action == "stop" ) + { + if(code == "1005")//取消订阅成功 + { + QString measurementID = targetObj.value("id").toString(); + } + else //if(code == "1005") //取消订阅失败 + LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString()); + } + } + } + } + else + { + QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data)); + LOG_ERROR("Http-MeasurementDat", errorMsg); + } + } + else + { + QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg); + LOG_ERROR("Http-MeasurementData", errorMsg); + } + } + else + { + QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString()); + LOG_ERROR("Http-MeasurementData", errorMsg); + } + } + else + { + QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString()); + LOG_ERROR("Http-MeasurementData", errorMsg); + } + + reply->deleteLater(); +} + +void MeasurementDataManager::getSubscriptionRealtimeData() +{ + +} + +void MeasurementDataManager::onReciveRealtimeData(const QString& dataMsg) +{ + QJsonParseError parseError; + QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError); + if(parseError.error == QJsonParseError::NoError && doc.isObject()) + { + QJsonObject docObj = doc.object(); + QString msg = docObj.value("msg").toString(); + if(msg == "success") + { + QJsonValue payloadValue = docObj.value("payload"); + if(payloadValue.isObject()) + { + QJsonObject payloadObj = payloadValue.toObject(); + QJsonValue targetsValue = payloadObj.value("targets"); + if(targetsValue.isArray()) + { + QJsonArray targetsArray = targetsValue.toArray(); + for(const QJsonValue& targetValue : std::as_const(targetsArray)) + { + if(!targetValue.isObject()) + continue; + + QJsonObject targetObj = targetValue.toObject(); + QString dataID = targetObj.value("id").toString(); + QJsonValue datasValue = targetObj.value("datas"); + if(datasValue.isArray()) + { + QJsonArray datasArray = datasValue.toArray(); + for(const QJsonValue& dataValue : std::as_const(datasArray)) + { + if(!dataValue.isObject()) + continue; + + QJsonObject dataObj = dataValue.toObject(); + QString timestamp = dataObj.value("time").toString(); + QString value = dataObj.value("value").toString(); + } + } + } + } + } + else + { + QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg); + LOG_ERROR("WebSockt-MeasurementDat", errorMsg); + } + } + else + { + QString errorMsg = QString("Realtime measurementData get error: %1").arg(parseError.errorString()); + LOG_ERROR("WebSockt-MeasurementData", errorMsg); + } + } + else + { + QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString()); + LOG_ERROR("WebSockt-MeasurementData", errorMsg); + } +} diff --git a/source/measurementDataUtils.cpp b/source/measurementDataUtils.cpp new file mode 100644 index 0000000..bc3862f --- /dev/null +++ b/source/measurementDataUtils.cpp @@ -0,0 +1,154 @@ +#include "measurementDataUtils.h" +#include "logger.h" +#include +#include + +WebSocketClient::WebSocketClient(QObject* parent) + : QObject(parent) + , m_webSocket(nullptr) + , m_connected(false) + , m_connectionStatus(ConnectionStatus::Disconnected) + , m_reconnectTimer(new QTimer(this)) + , m_reconnectInterval(5000) //默认5秒重连 + , m_reconnectAttempts(0) + , m_maxReconnectAttempts(10) //默认最多重连10次 +{ + setupWebSocket(); + + m_reconnectTimer->setSingleShot(true); + connect(m_reconnectTimer, &QTimer::timeout, this, &WebSocketClient::onReconnectTimeout); +} + +WebSocketClient::~WebSocketClient() +{ + disconnectFromServer(); + cleanupWebSocket(); +} + +bool WebSocketClient::connectToServer(const QUrl& url) +{ + if(!url.isValid()) + { + QString errorMsg = QString("Invalid server URL: %1").arg(url.toString()); + LOG_ERROR("WebSocket", errorMsg); + return false; + } + + //if(m_webSocket && (m_webSocket->state() == QAbstractSocket::ConnectedState || m_webSocket->state() == QAbstractSocket::ConnectingState)) + if(m_connectionStatus == ConnectionStatus::Connecting || m_connectionStatus == ConnectionStatus::Connected) + return true; + + m_serverUrl = url; + m_connectionStatus = ConnectionStatus::Connecting; + + if(!m_webSocket) + setupWebSocket(); + m_webSocket->open(url); + return true; +} + +void WebSocketClient::disconnectFromServer() +{ + if(m_webSocket) + { + m_webSocket->close(); + m_connected = false; + m_connectionStatus = ConnectionStatus::Disconnected; + + resetReconnect(); + } +} + +void WebSocketClient::setReconnectInterval(int intervalMs) +{ + if(intervalMs > 0 && intervalMs != m_reconnectInterval) + m_reconnectInterval = intervalMs; +} + +void WebSocketClient::setMaxReconnectAttempts(int maxAttempts) +{ + if(maxAttempts >=0 && maxAttempts != m_maxReconnectAttempts) + m_maxReconnectAttempts = maxAttempts; +} + +void WebSocketClient::setupWebSocket() +{ + cleanupWebSocket(); + + m_webSocket = new QWebSocket(); + connect(m_webSocket, &QWebSocket::connected, this, &WebSocketClient::onConnected); + connect(m_webSocket, &QWebSocket::disconnected, this, &WebSocketClient::onDisconnected); + connect(m_webSocket, &QWebSocket::errorOccurred, this, &WebSocketClient::onError); + connect(m_webSocket, &QWebSocket::textMessageReceived, this, &WebSocketClient::onTextMessageReceived); +} + +void WebSocketClient::cleanupWebSocket() +{ + if(m_webSocket) + { + m_webSocket->disconnected(); + m_webSocket->deleteLater(); + m_webSocket = nullptr; + } +} + +void WebSocketClient::scheduleReconnect() +{ + if(m_connectionStatus == ConnectionStatus::Disconnected || m_connectionStatus == ConnectionStatus::Error) + return; + + if(m_reconnectAttempts < m_maxReconnectAttempts) + { + int delay = m_reconnectInterval * (1 /*<< m_reconnectAttempts*/);// 指数退避,<bounded(0, 1000); + delay += jitter; + // 限制最大延迟为5分钟 + delay = qMin(delay, 300000);*/ + + m_reconnectTimer->start(delay); + m_reconnectAttempts++; + m_connectionStatus = ConnectionStatus::Reconncting; + } +} + +void WebSocketClient::resetReconnect() +{ + if(m_reconnectTimer->isActive()) + m_reconnectTimer->stop(); + m_reconnectAttempts = 0; +} + +///槽函数 +void WebSocketClient::onConnected() +{ + m_connected = true; + m_connectionStatus = ConnectionStatus::Connected; + resetReconnect(); +} + +void WebSocketClient::onDisconnected() +{ + m_connected = false; + m_connectionStatus = ConnectionStatus::Disconnected; + scheduleReconnect(); +} + +void WebSocketClient::onError(QAbstractSocket::SocketError error) +{ + QString errorMsg = m_webSocket ? m_webSocket->errorString() : "Unknown WebSocket error"; + LOG_ERROR("WebSocket", errorMsg); + scheduleReconnect(); +} + +void WebSocketClient::onTextMessageReceived(const QString& message) +{ + emit dataReceived(message); +} + +void WebSocketClient::onReconnectTimeout() +{ + if(!m_webSocket) + setupWebSocket(); + m_webSocket->open(m_serverUrl); +}