feat:添加实时数据处理模块(集成服务通信接口)
This commit is contained in:
parent
6a68212a9a
commit
83b6848734
|
|
@ -13,6 +13,7 @@ find_package(QT NAMES Qt6 Qt5 REQUIRED COMPONENTS Widgets)
|
|||
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Widgets)
|
||||
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS PrintSupport)
|
||||
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Network)
|
||||
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS WebSockets)
|
||||
find_package(Qt${QT_VERSION_MAJOR} REQUIRED COMPONENTS Concurrent)
|
||||
|
||||
set(ADS_VERSION 4.3.1)
|
||||
|
|
@ -43,11 +44,14 @@ set(H_HEADER_FILES
|
|||
include/dataPanel.h
|
||||
include/dataLoader.h
|
||||
include/dataManager.h
|
||||
include/measurementDataManager.h
|
||||
include/measurementDataUtils.h
|
||||
include/panelSelectionDialog.h
|
||||
include/panelConfigurationWidget.h
|
||||
include/dateTimeWidget.h
|
||||
include/customCalendarWidget.h
|
||||
include/dateTimeSelectionPanel.h
|
||||
include/networkCommon.h
|
||||
include/httpRequestManager.h
|
||||
include/alarmEventGlobal.h
|
||||
include/alarmEventMainDialog.h
|
||||
|
|
@ -81,6 +85,8 @@ set(CPP_SOURCE_FILES
|
|||
source/dataPanel.cpp
|
||||
source/dataLoader.cpp
|
||||
source/dataManager.cpp
|
||||
source/measurementDataManager.cpp
|
||||
source/measurementDataUtils.cpp
|
||||
source/panelSelectionDialog.cpp
|
||||
source/panelConfigurationWidget.cpp
|
||||
source/dateTimeWidget.cpp
|
||||
|
|
@ -189,6 +195,7 @@ target_include_directories(PowerMaster PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/incl
|
|||
target_include_directories(PowerMaster PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/qamqp/src")
|
||||
target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::PrintSupport)
|
||||
target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::Network)
|
||||
target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::WebSockets)
|
||||
target_link_libraries(PowerMaster PRIVATE Qt${QT_VERSION_MAJOR}::Concurrent)
|
||||
target_link_libraries(PowerMaster PRIVATE qt${QT_VERSION_MAJOR}advanceddocking)
|
||||
target_link_libraries(PowerMaster PUBLIC Qt${QT_VERSION_MAJOR}::Core
|
||||
|
|
@ -220,7 +227,7 @@ install(TARGETS PowerMaster
|
|||
# 定义配置文件和目标目录
|
||||
set(CONFIG_FILES
|
||||
alarmEventService_config.ini
|
||||
realTimeDataService_config.ini
|
||||
measurementDataService_config.ini
|
||||
log_config.ini
|
||||
)
|
||||
set(CONFIG_FILE_DIR "${CMAKE_BINARY_DIR}/config")
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
#ifndef ALARMEVENTDATASERVICE_H
|
||||
#define ALARMEVENTDATASERVICE_H
|
||||
|
||||
#include "networkCommon.h"
|
||||
#include "alarmEventGlobal.h"
|
||||
#include "alarmEventUtils.h"
|
||||
#include "qamqpclient.h"
|
||||
|
|
|
|||
|
|
@ -161,51 +161,6 @@ struct ServiceConfig
|
|||
QJsonObject toJson() const;
|
||||
};
|
||||
|
||||
///////------enums-----
|
||||
//服务状态
|
||||
enum class ServiceStatus
|
||||
{
|
||||
Uninitialized,//未初始化
|
||||
//Initialized, //已初始化
|
||||
Disconnected, //未连接
|
||||
Connecting, //连接中
|
||||
Connected, //已连接
|
||||
Error, //错误状态
|
||||
Reconnecting //重连中
|
||||
};
|
||||
|
||||
//连接状态
|
||||
enum class ConnectionStatus
|
||||
{
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected
|
||||
};
|
||||
|
||||
//错误严重程度
|
||||
enum class ErrorSeverity
|
||||
{
|
||||
Info, //信息
|
||||
Warning, //警告
|
||||
Error, //错误
|
||||
Critical //严重错误
|
||||
};
|
||||
|
||||
//消息处理结果
|
||||
enum class MessageHandleResult
|
||||
{
|
||||
Success,
|
||||
ParseError, //解析错误
|
||||
ValidationError, //验证错误
|
||||
Duplicate //重复消息
|
||||
};
|
||||
|
||||
//注册为元类型,用于信号槽的chuandi
|
||||
Q_DECLARE_METATYPE(ServiceStatus)
|
||||
Q_DECLARE_METATYPE(ConnectionStatus)
|
||||
Q_DECLARE_METATYPE(ErrorSeverity)
|
||||
Q_DECLARE_METATYPE(MessageHandleResult)
|
||||
|
||||
///////------classes-----
|
||||
class AlarmEventDataFilter
|
||||
{
|
||||
|
|
|
|||
|
|
@ -0,0 +1,85 @@
|
|||
#ifndef MEASUREMENTDATAMANAGER_H
|
||||
#define MEASUREMENTDATAMANAGER_H
|
||||
|
||||
#include <QObject>
|
||||
class QSettings;
|
||||
class QNetworkAccessManager;
|
||||
class QNetworkReply;
|
||||
class WebSocketClient;
|
||||
|
||||
class MeasurementDataManager : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
Q_DISABLE_COPY(MeasurementDataManager) //禁止拷贝,等价于:DataManager(const DataManager&) = delete; DataManager& operator=(const DataManager&) = delete;
|
||||
|
||||
public:
|
||||
static MeasurementDataManager* instance();
|
||||
|
||||
void startSubscription(const QStringList& measurements);
|
||||
void stopSubscription();
|
||||
void addSubscriptionData(const QStringList& measurements);
|
||||
void removeSubscriptionData(const QStringList& measurements);
|
||||
void getSubscriptionRealtimeData();
|
||||
|
||||
private slots:
|
||||
void onReciveRealtimeData(const QString& dataMsg);
|
||||
|
||||
private:
|
||||
struct RealtimeConfig
|
||||
{
|
||||
QString host;
|
||||
int port;
|
||||
QString httpPath;
|
||||
QString websocketPath;
|
||||
int timeout; //请求超时(毫秒)
|
||||
int maxRetries; //最大重试次数
|
||||
int retryInterval; //重试间隔
|
||||
|
||||
RealtimeConfig()
|
||||
: timeout(3000)
|
||||
, maxRetries(3)
|
||||
, retryInterval(1000)
|
||||
{}
|
||||
};
|
||||
|
||||
struct HistoricalConfig
|
||||
{
|
||||
QString host;
|
||||
int port;
|
||||
int timeout; //请求超时(毫秒)
|
||||
int maxRetries; //最大重试次数
|
||||
int retryInterval; //重试间隔
|
||||
|
||||
HistoricalConfig()
|
||||
: timeout(3000)
|
||||
, maxRetries(3)
|
||||
, retryInterval(1000)
|
||||
{}
|
||||
};
|
||||
|
||||
struct ServiceConfig
|
||||
{
|
||||
RealtimeConfig realtimeCfg;
|
||||
HistoricalConfig historicalCfg;
|
||||
};
|
||||
|
||||
explicit MeasurementDataManager();
|
||||
~MeasurementDataManager();
|
||||
|
||||
void initialize();
|
||||
void buildSubscriptionRequest(const QString& action, const QStringList& measurements);
|
||||
void processSubscriptionResponse(const QString& action, QNetworkReply* reply);
|
||||
|
||||
//配置相关
|
||||
ServiceConfig m_serviceConfig;
|
||||
QSettings* m_settings;
|
||||
QString m_settingsFile;
|
||||
bool m_isVaildSettingsFile;
|
||||
|
||||
//服务相关
|
||||
QString m_clientID;
|
||||
QNetworkAccessManager* m_networkManager;
|
||||
WebSocketClient* m_webSocketClient;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
#ifndef MEASUREMENTDATAUTILS_H
|
||||
#define MEASUREMENTDATAUTILS_H
|
||||
|
||||
#include "networkCommon.h"
|
||||
#include <QObject>
|
||||
#include <QWebSocket>
|
||||
/**
|
||||
* @brief WebSocket数据客户端
|
||||
*
|
||||
* 负责与后台数据服务建立WebSocket连接,接收实时数据
|
||||
*/
|
||||
class QTimer;
|
||||
class WebSocketClient : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
explicit WebSocketClient(QObject* parent = nullptr);
|
||||
~WebSocketClient();
|
||||
|
||||
//连接管理
|
||||
bool connectToServer(const QUrl& url);
|
||||
void disconnectFromServer();
|
||||
//bool isConnected();
|
||||
|
||||
//数据配置
|
||||
void setReconnectInterval(int intervalMs);
|
||||
void setMaxReconnectAttempts(int maxAttempts);
|
||||
|
||||
signals:
|
||||
void dataReceived(const QString& dataMsg);
|
||||
|
||||
private slots:
|
||||
void onConnected();
|
||||
void onDisconnected();
|
||||
void onError(QAbstractSocket::SocketError error);
|
||||
void onTextMessageReceived(const QString& message);
|
||||
void onReconnectTimeout();
|
||||
void resetReconnect();
|
||||
|
||||
private:
|
||||
void setupWebSocket();
|
||||
void cleanupWebSocket();
|
||||
void scheduleReconnect();
|
||||
|
||||
QWebSocket* m_webSocket;
|
||||
QUrl m_serverUrl;
|
||||
|
||||
//重连相关
|
||||
QTimer* m_reconnectTimer;
|
||||
int m_reconnectInterval;
|
||||
int m_reconnectAttempts;
|
||||
int m_maxReconnectAttempts;
|
||||
|
||||
// 状态
|
||||
bool m_connected;
|
||||
ConnectionStatus m_connectionStatus;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
#ifndef NETWORKCOMMON_H
|
||||
#define NETWORKCOMMON_H
|
||||
|
||||
#include <QObject>
|
||||
|
||||
///////------enums-----
|
||||
//服务状态
|
||||
enum class ServiceStatus
|
||||
{
|
||||
Uninitialized,//未初始化
|
||||
//Initialized, //已初始化
|
||||
Disconnected, //未连接
|
||||
Connecting, //连接中
|
||||
Connected, //已连接
|
||||
Error, //错误状态
|
||||
Reconnecting //重连中
|
||||
};
|
||||
|
||||
//连接状态
|
||||
enum class ConnectionStatus
|
||||
{
|
||||
Connected,
|
||||
Connecting,
|
||||
Disconnected,
|
||||
Reconncting,
|
||||
Error
|
||||
};
|
||||
|
||||
//错误严重程度
|
||||
enum class ErrorSeverity
|
||||
{
|
||||
Info, //信息
|
||||
Warning, //警告
|
||||
Error, //错误
|
||||
Critical //严重错误
|
||||
};
|
||||
|
||||
//消息处理结果
|
||||
enum class MessageHandleResult
|
||||
{
|
||||
Success,
|
||||
ParseError, //解析错误
|
||||
ValidationError, //验证错误
|
||||
Duplicate //重复消息
|
||||
};
|
||||
|
||||
//注册为元类型,用于信号槽的传递
|
||||
Q_DECLARE_METATYPE(ServiceStatus)
|
||||
Q_DECLARE_METATYPE(ConnectionStatus)
|
||||
Q_DECLARE_METATYPE(ErrorSeverity)
|
||||
Q_DECLARE_METATYPE(MessageHandleResult)
|
||||
|
||||
#endif
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
[RealtimeData]
|
||||
host=192.168.46.100
|
||||
port=10080
|
||||
httpPath=monitors/data/subscriptions
|
||||
websocketPath=monitors/data/realtime/stream
|
||||
|
||||
[HistoricalData]
|
||||
host=192.168.46.100
|
||||
port=10080
|
||||
|
|
@ -47,8 +47,8 @@ AlarmEventDataService::~AlarmEventDataService()
|
|||
void AlarmEventDataService::initialize()
|
||||
{
|
||||
//读取配置文件
|
||||
QString cofigDir = QCoreApplication::applicationDirPath();
|
||||
m_settingsFile = cofigDir + "/config/alarmEventService_config.ini";
|
||||
QString configDir = QCoreApplication::applicationDirPath();
|
||||
m_settingsFile = configDir + "/config/alarmEventService_config.ini";
|
||||
QFile file(m_settingsFile);
|
||||
if(file.open(QIODevice::ReadWrite))
|
||||
{
|
||||
|
|
@ -185,11 +185,15 @@ void AlarmEventDataService::cleanupRabbitMQConnection()
|
|||
|
||||
void AlarmEventDataService::scheduleReconnect()
|
||||
{
|
||||
if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting)
|
||||
return;
|
||||
|
||||
if (m_reconnectAttempts < m_maxReconnectAttempts)
|
||||
{
|
||||
int delay = m_config.rabbitMQConfig.reconnectInterval * (1 /*<< m_reconnectAttempts*/); // 指数退避,<<n相当于乘以2的n次方
|
||||
m_reconnectTimer->start(delay);
|
||||
m_reconnectAttempts++;
|
||||
m_serviceStatus = ServiceStatus::Reconnecting;
|
||||
}
|
||||
else
|
||||
qInfo() << "Maximum reconnection attempts reached";
|
||||
|
|
@ -375,10 +379,6 @@ void AlarmEventDataService::onMessageReceived()
|
|||
|
||||
void AlarmEventDataService::onReconnectTimeout()
|
||||
{
|
||||
if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting)
|
||||
return;
|
||||
|
||||
m_serviceStatus = ServiceStatus::Reconnecting;
|
||||
startRealTimeDataService();
|
||||
}
|
||||
|
||||
|
|
@ -388,13 +388,13 @@ void AlarmEventDataService::onHistoricalRequestFinished(QNetworkReply* reply)
|
|||
{
|
||||
QByteArray data = reply->readAll();
|
||||
processHistoricalResponse(data);
|
||||
LOG_INFO("Http", QString(data));
|
||||
LOG_INFO("Http-QueryHistoricalEvents", QString(data));
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString());
|
||||
emit historicalQueryError(errorMsg);
|
||||
LOG_ERROR("Http", errorMsg);
|
||||
LOG_ERROR("Http-QueryHistoricalEvents", errorMsg);
|
||||
}
|
||||
|
||||
reply->deleteLater();
|
||||
|
|
@ -422,7 +422,7 @@ void AlarmEventDataService::onConfirmEventsRequestFinished(QNetworkReply* reply)
|
|||
if(uuidValue.isArray())
|
||||
{
|
||||
QJsonArray uuidArray = uuidValue.toArray();
|
||||
for(const QJsonValue& value : uuidArray)
|
||||
for(const QJsonValue& value : std::as_const(uuidArray))
|
||||
{
|
||||
if(value.isString())
|
||||
successUuids.append(value.toString());
|
||||
|
|
@ -433,28 +433,28 @@ void AlarmEventDataService::onConfirmEventsRequestFinished(QNetworkReply* reply)
|
|||
else
|
||||
{
|
||||
QString errorMsg = QString("ConfirmEvents reply error, msg: %1").arg(msg);
|
||||
LOG_ERROR("Http", errorMsg);
|
||||
LOG_ERROR("Http-ConfirmEvents", errorMsg);
|
||||
emit confirmEventsResult(false, QString("确认事件失败(%1)").arg(msg), successUuids, m_currentRequestId);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("ConfirmEvents reply Json is not object: %1").arg(QString(data));
|
||||
LOG_ERROR("Http", errorMsg);
|
||||
LOG_ERROR("Http-ConfirmEvents", errorMsg);
|
||||
emit confirmEventsResult(false, QString("确认事件失败(Json is not object)"), successUuids, m_currentRequestId);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("ConfirmEvents reply Json parse error: %1").arg(parseError.errorString());
|
||||
LOG_ERROR("Http", errorMsg);
|
||||
LOG_ERROR("Http-ConfirmEvents", errorMsg);
|
||||
emit confirmEventsResult(false, QString("确认事件失败(JsonParseError)"), successUuids, m_currentRequestId);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("ConfirmEvents reply Network error: %1").arg(reply->errorString());
|
||||
LOG_ERROR("Http", errorMsg);
|
||||
LOG_ERROR("Http-ConfirmEvents", errorMsg);
|
||||
emit confirmEventsResult(false, QString("确认事件失败(Reply Network error)"), successUuids, m_currentRequestId);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,9 +42,9 @@ AlarmEventDataModel::AlarmEventDataModel(AlarmDataMode mode, QObject* parent)
|
|||
connect(AlarmEventDataService::instance(), &AlarmEventDataService::confirmEventsResult, this, &AlarmEventDataModel::onConfirmEventsResult);
|
||||
|
||||
//实时数据测试
|
||||
m_simulatedDataTimer = new QTimer(this);
|
||||
/*m_simulatedDataTimer = new QTimer(this);
|
||||
connect(m_simulatedDataTimer, &QTimer::timeout, this, &AlarmEventDataModel::onTimeoutSimulateData);
|
||||
m_simulatedDataTimer->start(5000);
|
||||
m_simulatedDataTimer->start(5000);*/
|
||||
}
|
||||
|
||||
AlarmEventDataModel::~AlarmEventDataModel()
|
||||
|
|
@ -503,12 +503,12 @@ void AlarmEventDataModel::onConfirmEventsResult(bool success, const QString& mes
|
|||
QModelIndex topLeft = createIndex(0, Status);
|
||||
QModelIndex bottomRight = createIndex(m_displayEvents.count() - 1, Status);
|
||||
emit dataChanged(topLeft, bottomRight, {Qt::DisplayRole, Qt::ForegroundRole});
|
||||
qDebug() << "realtime view confirm refresh";
|
||||
//qDebug() << "realtime view confirm refresh";
|
||||
//endResetModel();
|
||||
}
|
||||
else
|
||||
{
|
||||
qDebug() << "historical view confirm refresh";
|
||||
//qDebug() << "historical view confirm refresh";
|
||||
refresh();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,273 @@
|
|||
#include "measurementDataManager.h"
|
||||
#include "logger.h"
|
||||
#include "measurementDataUtils.h"
|
||||
#include <QNetworkAccessManager>
|
||||
#include <QNetworkRequest>
|
||||
#include <QUrlQuery>
|
||||
#include <QNetworkReply>
|
||||
#include <QWebSocket>
|
||||
#include <QTimer>
|
||||
#include <QFile>
|
||||
#include <QSettings>
|
||||
#include <QJsonObject>
|
||||
#include <QJsonDocument>
|
||||
#include <QJsonArray>
|
||||
#include <QJsonParseError>
|
||||
#include <QCoreApplication>
|
||||
|
||||
MeasurementDataManager* MeasurementDataManager::instance()
|
||||
{
|
||||
static MeasurementDataManager instance;
|
||||
return &instance;
|
||||
}
|
||||
|
||||
MeasurementDataManager::MeasurementDataManager()
|
||||
: m_clientID("")
|
||||
, m_networkManager(new QNetworkAccessManager(this))
|
||||
, m_webSocketClient(new WebSocketClient(this))
|
||||
{
|
||||
initialize();
|
||||
connect(m_webSocketClient, &WebSocketClient::dataReceived, this, &MeasurementDataManager::onReciveRealtimeData);
|
||||
}
|
||||
|
||||
MeasurementDataManager::~MeasurementDataManager()
|
||||
{}
|
||||
|
||||
void MeasurementDataManager::initialize()
|
||||
{
|
||||
//读取配置文件
|
||||
QString configDir = QCoreApplication::applicationDirPath();
|
||||
m_settingsFile = configDir + "/config/measurementDataService_config.ini";
|
||||
QFile file(m_settingsFile);
|
||||
if(file.open(QIODevice::ReadWrite))
|
||||
{
|
||||
m_isVaildSettingsFile = true;
|
||||
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
|
||||
|
||||
m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString();
|
||||
m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt();
|
||||
m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString();
|
||||
m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();
|
||||
|
||||
m_serviceConfig.historicalCfg.host = m_settings->value("HistoricalData/host").toString();
|
||||
m_serviceConfig.historicalCfg.port = m_settings->value("HistoricalData/port").toInt();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_isVaildSettingsFile = false;
|
||||
LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
|
||||
}
|
||||
}
|
||||
|
||||
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
|
||||
{
|
||||
if(!m_isVaildSettingsFile)
|
||||
return;
|
||||
|
||||
QUrl url;
|
||||
url.setScheme("http");
|
||||
url.setHost(m_serviceConfig.realtimeCfg.host);
|
||||
url.setPort(m_serviceConfig.realtimeCfg.port);
|
||||
url.setPath(m_serviceConfig.realtimeCfg.httpPath);
|
||||
if(!url.isValid())
|
||||
return ;
|
||||
|
||||
//创建网络请求
|
||||
QNetworkRequest request(url);
|
||||
request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
|
||||
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
|
||||
//创建json格式请求数据
|
||||
QJsonArray targetsArray;
|
||||
for(const QString& meaurement : measurements)
|
||||
targetsArray.append(meaurement);
|
||||
QJsonObject measurementObj;
|
||||
measurementObj.insert("interval", "20ms");
|
||||
measurementObj.insert("targets", targetsArray);
|
||||
QJsonArray measurementArray;
|
||||
measurementArray.append(measurementObj);
|
||||
QJsonObject requestObj;
|
||||
QString actionValue = "";
|
||||
if(action == "start" || action == "add")
|
||||
actionValue = "start";
|
||||
else if(action == "stop")
|
||||
actionValue = "stop";
|
||||
requestObj.insert("action", actionValue);
|
||||
if(action != "start")
|
||||
requestObj.insert("client_id", m_clientID);
|
||||
requestObj.insert("measurements", measurementArray);
|
||||
QJsonDocument requestJson(requestObj);
|
||||
//发送请求
|
||||
QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
|
||||
connect(reply, &QNetworkReply::finished, this, [this, action, reply](){
|
||||
processSubscriptionResponse(action, reply);
|
||||
});
|
||||
}
|
||||
|
||||
void MeasurementDataManager::startSubscription(const QStringList& measurements)
|
||||
{
|
||||
if(!m_clientID.isEmpty())
|
||||
return;
|
||||
|
||||
buildSubscriptionRequest("start", measurements);
|
||||
}
|
||||
|
||||
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
|
||||
{
|
||||
if(m_clientID.isEmpty())
|
||||
return;
|
||||
|
||||
buildSubscriptionRequest("add", measurements);
|
||||
}
|
||||
|
||||
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
|
||||
{
|
||||
if(m_clientID.isEmpty())
|
||||
return;
|
||||
|
||||
buildSubscriptionRequest("stop", measurements);
|
||||
}
|
||||
|
||||
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
|
||||
{
|
||||
if(reply->error() == QNetworkReply::NoError)
|
||||
{
|
||||
QByteArray data = reply->readAll();
|
||||
QJsonParseError parseError;
|
||||
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
|
||||
|
||||
if(parseError.error == QJsonParseError::NoError && doc.isObject())
|
||||
{
|
||||
QJsonObject docObj = doc.object();
|
||||
QString msg = docObj.value("msg").toString();
|
||||
if(msg == "success")
|
||||
{
|
||||
QJsonValue payloadValue = docObj.value("payload");
|
||||
if(payloadValue.isObject())
|
||||
{
|
||||
QJsonObject payloadObj = payloadValue.toObject();
|
||||
if(action == "start")
|
||||
m_clientID = payloadObj.value("client_id").toString();
|
||||
QJsonValue targetsValue = payloadObj.value("targets");
|
||||
if(targetsValue.isArray())
|
||||
{
|
||||
QJsonArray targetsArray = targetsValue.toArray();
|
||||
for(const QJsonValue& targetValue : std::as_const(targetsArray))
|
||||
{
|
||||
if(!targetValue.isObject())
|
||||
continue;
|
||||
|
||||
QJsonObject targetObj = targetValue.toObject();
|
||||
QString code = targetObj.value("code").toString();
|
||||
if(action == "start" || action == "add")
|
||||
{
|
||||
if(code == "1001")//订阅成功
|
||||
{
|
||||
QString measurementID = targetObj.value("id").toString();
|
||||
}
|
||||
else //if(code == "1002") //订阅失败
|
||||
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
|
||||
}
|
||||
else if(action == "stop" )
|
||||
{
|
||||
if(code == "1005")//取消订阅成功
|
||||
{
|
||||
QString measurementID = targetObj.value("id").toString();
|
||||
}
|
||||
else //if(code == "1005") //取消订阅失败
|
||||
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
|
||||
LOG_ERROR("Http-MeasurementDat", errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
|
||||
LOG_ERROR("Http-MeasurementData", errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
|
||||
LOG_ERROR("Http-MeasurementData", errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
|
||||
LOG_ERROR("Http-MeasurementData", errorMsg);
|
||||
}
|
||||
|
||||
reply->deleteLater();
|
||||
}
|
||||
|
||||
void MeasurementDataManager::getSubscriptionRealtimeData()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void MeasurementDataManager::onReciveRealtimeData(const QString& dataMsg)
|
||||
{
|
||||
QJsonParseError parseError;
|
||||
QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
|
||||
if(parseError.error == QJsonParseError::NoError && doc.isObject())
|
||||
{
|
||||
QJsonObject docObj = doc.object();
|
||||
QString msg = docObj.value("msg").toString();
|
||||
if(msg == "success")
|
||||
{
|
||||
QJsonValue payloadValue = docObj.value("payload");
|
||||
if(payloadValue.isObject())
|
||||
{
|
||||
QJsonObject payloadObj = payloadValue.toObject();
|
||||
QJsonValue targetsValue = payloadObj.value("targets");
|
||||
if(targetsValue.isArray())
|
||||
{
|
||||
QJsonArray targetsArray = targetsValue.toArray();
|
||||
for(const QJsonValue& targetValue : std::as_const(targetsArray))
|
||||
{
|
||||
if(!targetValue.isObject())
|
||||
continue;
|
||||
|
||||
QJsonObject targetObj = targetValue.toObject();
|
||||
QString dataID = targetObj.value("id").toString();
|
||||
QJsonValue datasValue = targetObj.value("datas");
|
||||
if(datasValue.isArray())
|
||||
{
|
||||
QJsonArray datasArray = datasValue.toArray();
|
||||
for(const QJsonValue& dataValue : std::as_const(datasArray))
|
||||
{
|
||||
if(!dataValue.isObject())
|
||||
continue;
|
||||
|
||||
QJsonObject dataObj = dataValue.toObject();
|
||||
QString timestamp = dataObj.value("time").toString();
|
||||
QString value = dataObj.value("value").toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
|
||||
LOG_ERROR("WebSockt-MeasurementDat", errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Realtime measurementData get error: %1").arg(parseError.errorString());
|
||||
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
|
||||
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
#include "measurementDataUtils.h"
|
||||
#include "logger.h"
|
||||
#include <QTimer>
|
||||
#include <QRandomGenerator>
|
||||
|
||||
WebSocketClient::WebSocketClient(QObject* parent)
|
||||
: QObject(parent)
|
||||
, m_webSocket(nullptr)
|
||||
, m_connected(false)
|
||||
, m_connectionStatus(ConnectionStatus::Disconnected)
|
||||
, m_reconnectTimer(new QTimer(this))
|
||||
, m_reconnectInterval(5000) //默认5秒重连
|
||||
, m_reconnectAttempts(0)
|
||||
, m_maxReconnectAttempts(10) //默认最多重连10次
|
||||
{
|
||||
setupWebSocket();
|
||||
|
||||
m_reconnectTimer->setSingleShot(true);
|
||||
connect(m_reconnectTimer, &QTimer::timeout, this, &WebSocketClient::onReconnectTimeout);
|
||||
}
|
||||
|
||||
WebSocketClient::~WebSocketClient()
|
||||
{
|
||||
disconnectFromServer();
|
||||
cleanupWebSocket();
|
||||
}
|
||||
|
||||
bool WebSocketClient::connectToServer(const QUrl& url)
|
||||
{
|
||||
if(!url.isValid())
|
||||
{
|
||||
QString errorMsg = QString("Invalid server URL: %1").arg(url.toString());
|
||||
LOG_ERROR("WebSocket", errorMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
//if(m_webSocket && (m_webSocket->state() == QAbstractSocket::ConnectedState || m_webSocket->state() == QAbstractSocket::ConnectingState))
|
||||
if(m_connectionStatus == ConnectionStatus::Connecting || m_connectionStatus == ConnectionStatus::Connected)
|
||||
return true;
|
||||
|
||||
m_serverUrl = url;
|
||||
m_connectionStatus = ConnectionStatus::Connecting;
|
||||
|
||||
if(!m_webSocket)
|
||||
setupWebSocket();
|
||||
m_webSocket->open(url);
|
||||
return true;
|
||||
}
|
||||
|
||||
void WebSocketClient::disconnectFromServer()
|
||||
{
|
||||
if(m_webSocket)
|
||||
{
|
||||
m_webSocket->close();
|
||||
m_connected = false;
|
||||
m_connectionStatus = ConnectionStatus::Disconnected;
|
||||
|
||||
resetReconnect();
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketClient::setReconnectInterval(int intervalMs)
|
||||
{
|
||||
if(intervalMs > 0 && intervalMs != m_reconnectInterval)
|
||||
m_reconnectInterval = intervalMs;
|
||||
}
|
||||
|
||||
void WebSocketClient::setMaxReconnectAttempts(int maxAttempts)
|
||||
{
|
||||
if(maxAttempts >=0 && maxAttempts != m_maxReconnectAttempts)
|
||||
m_maxReconnectAttempts = maxAttempts;
|
||||
}
|
||||
|
||||
void WebSocketClient::setupWebSocket()
|
||||
{
|
||||
cleanupWebSocket();
|
||||
|
||||
m_webSocket = new QWebSocket();
|
||||
connect(m_webSocket, &QWebSocket::connected, this, &WebSocketClient::onConnected);
|
||||
connect(m_webSocket, &QWebSocket::disconnected, this, &WebSocketClient::onDisconnected);
|
||||
connect(m_webSocket, &QWebSocket::errorOccurred, this, &WebSocketClient::onError);
|
||||
connect(m_webSocket, &QWebSocket::textMessageReceived, this, &WebSocketClient::onTextMessageReceived);
|
||||
}
|
||||
|
||||
void WebSocketClient::cleanupWebSocket()
|
||||
{
|
||||
if(m_webSocket)
|
||||
{
|
||||
m_webSocket->disconnected();
|
||||
m_webSocket->deleteLater();
|
||||
m_webSocket = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketClient::scheduleReconnect()
|
||||
{
|
||||
if(m_connectionStatus == ConnectionStatus::Disconnected || m_connectionStatus == ConnectionStatus::Error)
|
||||
return;
|
||||
|
||||
if(m_reconnectAttempts < m_maxReconnectAttempts)
|
||||
{
|
||||
int delay = m_reconnectInterval * (1 /*<< m_reconnectAttempts*/);// 指数退避,<<n相当于乘以2的n次方
|
||||
// 添加随机抖动避免多个客户端同时重连
|
||||
/*int jitter = QRandomGenerator::global()->bounded(0, 1000);
|
||||
delay += jitter;
|
||||
// 限制最大延迟为5分钟
|
||||
delay = qMin(delay, 300000);*/
|
||||
|
||||
m_reconnectTimer->start(delay);
|
||||
m_reconnectAttempts++;
|
||||
m_connectionStatus = ConnectionStatus::Reconncting;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketClient::resetReconnect()
|
||||
{
|
||||
if(m_reconnectTimer->isActive())
|
||||
m_reconnectTimer->stop();
|
||||
m_reconnectAttempts = 0;
|
||||
}
|
||||
|
||||
///槽函数
|
||||
void WebSocketClient::onConnected()
|
||||
{
|
||||
m_connected = true;
|
||||
m_connectionStatus = ConnectionStatus::Connected;
|
||||
resetReconnect();
|
||||
}
|
||||
|
||||
void WebSocketClient::onDisconnected()
|
||||
{
|
||||
m_connected = false;
|
||||
m_connectionStatus = ConnectionStatus::Disconnected;
|
||||
scheduleReconnect();
|
||||
}
|
||||
|
||||
void WebSocketClient::onError(QAbstractSocket::SocketError error)
|
||||
{
|
||||
QString errorMsg = m_webSocket ? m_webSocket->errorString() : "Unknown WebSocket error";
|
||||
LOG_ERROR("WebSocket", errorMsg);
|
||||
scheduleReconnect();
|
||||
}
|
||||
|
||||
void WebSocketClient::onTextMessageReceived(const QString& message)
|
||||
{
|
||||
emit dataReceived(message);
|
||||
}
|
||||
|
||||
void WebSocketClient::onReconnectTimeout()
|
||||
{
|
||||
if(!m_webSocket)
|
||||
setupWebSocket();
|
||||
m_webSocket->open(m_serverUrl);
|
||||
}
|
||||
Loading…
Reference in New Issue