#include "measurementDataManager.h"
#include "logger.h"

// NOTE(review): the original angle-bracket include names were lost in this copy of the
// file; the list below is reconstructed from the Qt classes this file actually uses.
#include <QCoreApplication>
#include <QDateTime>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonParseError>
#include <QJsonValue>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QNetworkRequest>
#include <QSettings>
#include <QUrl>
#include <QUrlQuery>

#include <utility>

// Returns the process-wide manager, constructed on first use as a function-local static.
MeasurementDataManager* MeasurementDataManager::instance()
{
    static MeasurementDataManager instance;
    return &instance;
}

// Creates the HTTP and WebSocket helpers as children of this object, loads the
// configuration, and routes incoming WebSocket messages to onReceiveRealtimeData().
MeasurementDataManager::MeasurementDataManager()
    : m_clientID("")
    , m_networkManager(new QNetworkAccessManager(this))
    , m_webSocketClient(new WebSocketClient(this))
{
    initialize();

    connect(m_webSocketClient, &WebSocketClient::dataReceived,
            this, &MeasurementDataManager::onReceiveRealtimeData);
}

MeasurementDataManager::~MeasurementDataManager() {}

// Loads <applicationDir>/config/measurementDataService_config.ini into m_serviceConfig
// and wires the cache clean-up timer to cleanUpDataCache().
void MeasurementDataManager::initialize()
{
    // Read the configuration file.
    const QString configDir = QCoreApplication::applicationDirPath();
    m_settingsFile = configDir + "/config/measurementDataService_config.ini";

    // NOTE(review): opening with ReadWrite creates an empty file when none exists (as long
    // as the directory is writable), and that empty file is then treated as valid.
    // Confirm whether a read-only probe is what is really wanted here.
    QFile file(m_settingsFile);
    m_isVaildSettingsFile = file.open(QIODevice::ReadWrite);
    if (m_isVaildSettingsFile) {
        // The QFile was only an accessibility probe; release it before QSettings reads.
        file.close();

        // Parented to this object so it is released together with the manager.
        m_settings = new QSettings(m_settingsFile, QSettings::IniFormat, this);

        auto& realtime = m_serviceConfig.realtimeCfg;
        realtime.host = m_settings->value("RealtimeData/host").toString();
        realtime.port = m_settings->value("RealtimeData/port").toInt();
        realtime.httpPath = m_settings->value("RealtimeData/httpPath").toString();
        realtime.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();

        m_serviceConfig.historicalCfg.baseUrl = m_settings->value("HistoricalData/baseUrl").toUrl();
        // NOTE(review): realtimeCfg.timeout / historicalCfg.timeout are used when sending
        // requests but are not loaded here — presumably they carry defaults; confirm.
    } else {
        LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
    }

    // Periodic clean-up of the cached data.
    // NOTE(review): the timer is configured and connected here but never started in this
    // file — confirm it is started elsewhere, otherwise cleanUpDataCache() never runs.
    m_dataCleanupInterval = 10 * 1000;
    m_dataCleanupTimer.setInterval(m_dataCleanupInterval);
    connect(&m_dataCleanupTimer, &QTimer::timeout, this, &MeasurementDataManager::cleanUpDataCache);
}

// Builds and POSTs a subscription request to the realtime HTTP endpoint.
//   action       - "start", "add" or "stop"; "add" is sent to the server as "start",
//                  any other value is sent as an empty action.
//   measurements - measurement ids placed in the request's "targets" array.
// The reply is handled asynchronously by processSubscriptionResponse().
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
{
    if (!m_isVaildSettingsFile)
        return;

    QUrl url;
    url.setScheme("http");
    url.setHost(m_serviceConfig.realtimeCfg.host);
    url.setPort(m_serviceConfig.realtimeCfg.port);
    url.setPath(m_serviceConfig.realtimeCfg.httpPath);
    if (!url.isValid())
        return;

    // Create the network request.
    QNetworkRequest request(url);
    request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
    request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");

    // Build the JSON request body.
    QJsonArray targetsArray;
    for (const QString& measurement : measurements)
        targetsArray.append(measurement);

    QJsonObject measurementObj;
    measurementObj.insert("interval", "500ms");
    measurementObj.insert("targets", targetsArray);

    QJsonArray measurementArray;
    measurementArray.append(measurementObj);

    QString actionValue = "";
    if (action == "start" || action == "add")
        actionValue = "start";
    else if (action == "stop")
        actionValue = "stop";

    QJsonObject requestObj;
    requestObj.insert("action", actionValue);
    // Only the initial "start" is sent without a client id.
    if (action != "start")
        requestObj.insert("client_id", m_clientID);
    requestObj.insert("measurements", measurementArray);

    // Send the request.
    const QJsonDocument requestJson(requestObj);
    QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
    connect(reply, &QNetworkReply::finished, this, [this, action, reply]() {
        processSubscriptionResponse(action, reply);
    });
}

// Sends a "start" subscription request; does nothing when a client id already exists.
void MeasurementDataManager::startSubscription(const QStringList& measurements)
{
    if (!m_clientID.isEmpty())
        return;

    buildSubscriptionRequest("start", measurements);
}

// Subscribes to additional measurements: "start" while no client id exists, "add" otherwise.
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
{
    const QString action = m_clientID.isEmpty() ? QString("start") : QString("add");
    buildSubscriptionRequest(action, measurements);
}

// Sends a "stop" request for the given measurements; does nothing without a client id.
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
{
    if (m_clientID.isEmpty())
        return;

    buildSubscriptionRequest("stop", measurements);
}

// Disconnects the WebSocket client and forgets the current client id.
void MeasurementDataManager::stopSubscription()
{
    m_webSocketClient->disconnectFromServer();
    m_clientID.clear();
}

// Handles the HTTP reply of a subscription request sent by buildSubscriptionRequest().
// On a successful "start" it stores the returned client id and opens the WebSocket;
// every target whose result code is not the success code for the action is logged.
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
{
    // Deletion is deferred by Qt, so scheduling it up front covers every return path.
    reply->deleteLater();

    if (reply->error() != QNetworkReply::NoError) {
        const QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
        LOG_ERROR("Http-MeasurementData", errorMsg);
        return;
    }

    const QByteArray data = reply->readAll();
    QJsonParseError parseError;
    const QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
    if (parseError.error != QJsonParseError::NoError || !doc.isObject()) {
        const QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
        LOG_ERROR("Http-MeasurementData", errorMsg);
        return;
    }

    const QJsonObject docObj = doc.object();
    const QString msg = docObj.value("msg").toString();
    if (msg != "success") {
        const QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
        LOG_ERROR("Http-MeasurementData", errorMsg);
        return;
    }

    const QJsonValue payloadValue = docObj.value("payload");
    if (!payloadValue.isObject()) {
        const QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
        LOG_ERROR("Http-MeasurementData", errorMsg);
        return;
    }

    const QJsonObject payloadObj = payloadValue.toObject();
    if (action == "start") {
        m_clientID = payloadObj.value("client_id").toString();
        getSubscriptionRealtimeData();
    }

    // Per-target success code: "1001" = subscribed (start/add), "1005" = unsubscribed (stop).
    QString successCode;
    if (action == "start" || action == "add")
        successCode = "1001";
    else if (action == "stop")
        successCode = "1005";
    if (successCode.isEmpty())
        return; // unknown action: per-target results are not inspected

    const QJsonValue targetsValue = payloadObj.value("targets");
    if (!targetsValue.isArray())
        return;

    const QJsonArray targetsArray = targetsValue.toArray();
    for (const QJsonValue& targetValue : targetsArray) {
        if (!targetValue.isObject())
            continue;

        const QJsonObject targetObj = targetValue.toObject();
        if (targetObj.value("code").toString() != successCode)
            LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
    }
}

// Builds the historical-data query URL from the configured base URL.
// The begin/end query items carry the given times as milliseconds since the epoch.
QUrl MeasurementDataManager::buildHistoricalQueryUrl(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime,
                                                     const QString& step, const QString& type)
{
    QUrlQuery query;
    query.addQueryItem("mtag", dataID);
    query.addQueryItem("type", type);
    query.addQueryItem("step", step);
    query.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch()));
    query.addQueryItem("end", QString::number(endTime.toMSecsSinceEpoch()));

    QUrl url = m_serviceConfig.historicalCfg.baseUrl;
    url.setQuery(query);
    return url;
}

// Parses the body of a historical-data reply for `dataID` and logs any protocol error.
void MeasurementDataManager::processHistoricalResponse(const QString& dataID, const QByteArray& data)
{
    QJsonParseError parseError;
    const QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
    if (parseError.error != QJsonParseError::NoError) {
        const QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
                                     .arg(parseError.errorString())
                                     .arg(parseError.offset);
        LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
        return;
    }

    if (!doc.isObject()) {
        const QString errorMsg = "Historical Events JSON document is not an object";
        LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
        return;
    }

    const QJsonObject obj = doc.object();
    if (obj.value("code").toInt() != 0) {
        LOG_ERROR("Http-MeasurementData-Historical", obj.value("msg").toString());
        return;
    }

    const QJsonValue dataValue = obj.value("data");
    if (!dataValue.isObject())
        return;

    const QJsonValue values = dataValue.toObject().value(dataID);
    if (!values.isArray())
        return;

    const QJsonArray valueArray = values.toArray();
    for (const QJsonValue& sample : valueArray) {
        if (!sample.isObject())
            continue;

        const QJsonObject valueObj = sample.toObject();
        const qint64 timeStamp = valueObj.value("time").toString().toLongLong();
        // Read the value from the sample itself (the enclosing "data" object was used before).
        const double value = valueObj.value("value").toDouble();

        // TODO: the parsed sample is not stored or published anywhere yet.
        Q_UNUSED(timeStamp);
        Q_UNUSED(value);
    }
}

// Opens the WebSocket connection for the current client id; logs when the connect call fails.
void MeasurementDataManager::getSubscriptionRealtimeData()
{
    if (m_clientID.isEmpty())
        return;

    // The scheme separator is "://" (a stray extra ':' previously made the URL invalid).
    const QString strUrl = QString("ws://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
                                                          , QString::number(m_serviceConfig.realtimeCfg.port)
                                                          , m_serviceConfig.realtimeCfg.websocketPath
                                                          , m_clientID);
    const QUrl url(strUrl);
    if (!m_webSocketClient->connectToServer(url)) {
        const QString errorMsg = QString("Connect to WebSocketServer failed");
        LOG_ERROR("WebSocket-MeasurementData", errorMsg);
    }
}

// Registers `requester` as a consumer of each measurement id and subscribes to the ids
// that had no record before this call.
void MeasurementDataManager::requestData(const QStringList& measurements, QObject* requester)
{
    QStringList newRequestDatas;
    for (const QString& dataID : measurements) {
        auto it = m_dataRequesetRecord.find(dataID);
        if (it != m_dataRequesetRecord.end()) {
            // Already subscribed: just add the requester. Re-inserting an existing
            // requester is a no-op and must not replace the other requesters.
            it.value().insert(requester);
            continue;
        }

        m_dataRequesetRecord[dataID].insert(requester);
        newRequestDatas.push_back(dataID);
    }

    if (!newRequestDatas.isEmpty())
        addSubscriptionData(newRequestDatas);
}

// Unregisters `requester` from each measurement id; ids left without any requester are
// unsubscribed and dropped from both the request record and the data cache.
void MeasurementDataManager::cancelRequest(const QStringList& measurements, QObject* requester)
{
    QStringList removeRequestDatas;
    for (const QString& dataID : measurements) {
        auto it = m_dataRequesetRecord.find(dataID);
        if (it == m_dataRequesetRecord.end())
            continue;

        it.value().remove(requester);
        if (it.value().isEmpty())
            removeRequestDatas.push_back(dataID);
    }

    if (removeRequestDatas.isEmpty())
        return;

    removeSubscriptionData(removeRequestDatas);
    for (const QString& dataID : std::as_const(removeRequestDatas)) {
        m_dataRequesetRecord.remove(dataID);
        m_dataCache.remove(dataID);
    }
}

// Returns the most recent cached point for `dataID`, or a default-constructed point
// when nothing is cached for it.
MeasurementDataPoint MeasurementDataManager::getLastestRealtimeData(const QString& dataID)
{
    if (!m_dataCache.contains(dataID))
        return MeasurementDataPoint();

    const auto result = m_dataCache.value(dataID)->getLastestData(1);
    if (result.isEmpty())
        return MeasurementDataPoint();

    return result.at(0);
}

// Issues an asynchronous GET for historical data; the reply is handled by
// onHistoricalRequestFinished().
void MeasurementDataManager::queryHistoricalEvents(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime,
                                                   const QString& step, const QString& type)
{
    const QUrl url = buildHistoricalQueryUrl(dataID, startTime, endTime, step, type);
    if (!url.isValid())
        return;

    // Create the network request.
    QNetworkRequest request(url);
    request.setTransferTimeout(m_serviceConfig.historicalCfg.timeout);

    // Send the request.
    QNetworkReply* reply = m_networkManager->get(request);
    connect(reply, &QNetworkReply::finished, this, [this, dataID, reply]() {
        onHistoricalRequestFinished(dataID, reply);
    });
}

// Handles one WebSocket text message. Code 200 carries data points that are appended to
// the per-measurement cache; code 2101 tears the connection down.
void MeasurementDataManager::onReceiveRealtimeData(const QString& dataMsg)
{
    QJsonParseError parseError;
    const QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
    if (parseError.error != QJsonParseError::NoError || !doc.isObject()) {
        const QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
        LOG_ERROR("WebSocket-MeasurementData", errorMsg);
        return;
    }

    const QJsonObject docObj = doc.object();
    const int code = docObj.value("code").toInt();

    if (code == 2101) {
        // The server-side worker has gone away; this connection is no longer valid.
        m_webSocketClient->disconnectFromServer();
        m_clientID.clear();
        return;
    }
    if (code != 200)
        return;

    const QJsonValue payloadValue = docObj.value("payload");
    if (!payloadValue.isObject()) {
        const QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
        LOG_ERROR("WebSocket-MeasurementData", errorMsg);
        return;
    }

    const QJsonValue targetsValue = payloadValue.toObject().value("targets");
    if (!targetsValue.isArray())
        return;

    // Shared-pointer type stored in the cache, taken from the member's declaration so the
    // element class does not have to be spelled out here.
    using CacheEntryPtr = decltype(m_dataCache)::mapped_type;

    const QJsonArray targetsArray = targetsValue.toArray();
    for (const QJsonValue& targetValue : targetsArray) {
        if (!targetValue.isObject())
            continue;

        const QJsonObject targetObj = targetValue.toObject();
        const QString dataID = targetObj.value("id").toString();

        // Fetch the cache entry for this id, creating it on first sight.
        auto cacheIt = m_dataCache.find(dataID);
        if (cacheIt == m_dataCache.end())
            cacheIt = m_dataCache.insert(dataID, CacheEntryPtr::create(dataID));

        const QJsonValue datasValue = targetObj.value("datas");
        if (!datasValue.isArray())
            continue;

        const QJsonArray datasArray = datasValue.toArray();
        QVector<MeasurementDataPoint> dataPoints;
        dataPoints.reserve(datasArray.size());
        for (const QJsonValue& dataValue : datasArray) {
            if (!dataValue.isObject())
                continue;

            const QJsonObject dataObj = dataValue.toObject();
            const qint64 timestamp = dataObj.value("time").toString().toLongLong();
            const double value = dataObj.value("value").toDouble();
            dataPoints.append(MeasurementDataPoint(timestamp, value));
        }

        // Insert the data.
        cacheIt.value()->insertData(dataPoints);
    }
}

// Completion handler for a historical-data GET: parses and logs the body on success,
// logs the network error otherwise, then schedules the reply for deletion.
void MeasurementDataManager::onHistoricalRequestFinished(const QString& dataID, QNetworkReply* reply)
{
    if (reply->error() == QNetworkReply::NoError) {
        const QByteArray data = reply->readAll();
        processHistoricalResponse(dataID, data);
        LOG_INFO("Http-MeasurementData-Historical", QString(data));
    } else {
        const QString errorMsg = QString("Request HistoricalData error: %1").arg(reply->errorString());
        LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
    }

    reply->deleteLater();
}

// Asks every cached measurement to drop the points older than the clean-up cutoff.
void MeasurementDataManager::cleanUpDataCache()
{
    // NOTE(review): initialize() sets m_dataCleanupInterval in milliseconds (10 * 1000), so
    // the extra "* 1000" puts the cutoff 10,000 s in the past if timestamps are epoch
    // milliseconds — confirm the intended retention window before changing it.
    const qint64 cleanupTimestamp = QDateTime::currentMSecsSinceEpoch()
                                    - (static_cast<qint64>(m_dataCleanupInterval) * 1000);

    for (const auto& measurementData : std::as_const(m_dataCache))
        measurementData->cleanupOldData(cleanupTimestamp);
}