PowerMaster/source/measurementDataManager.cpp

411 lines
15 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "measurementDataManager.h"
#include "logger.h"
#include <QNetworkAccessManager>
#include <QNetworkRequest>
#include <QUrlQuery>
#include <QNetworkReply>
#include <QWebSocket>
#include <QTimer>
#include <QFile>
#include <QSettings>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonParseError>
#include <QCoreApplication>
// Returns the process-wide MeasurementDataManager.
// The object is a function-local static, so it is constructed on the first
// call and the same address is returned on every subsequent call.
MeasurementDataManager* MeasurementDataManager::instance()
{
    static MeasurementDataManager s_manager;
    MeasurementDataManager* const self = &s_manager;
    return self;
}
// Constructs the manager: starts with an empty client id, creates the HTTP
// and WebSocket helpers as children of this object, loads the service
// configuration and then routes incoming WebSocket payloads to
// onReceiveRealtimeData().
MeasurementDataManager::MeasurementDataManager()
    : m_clientID("")
    , m_networkManager(new QNetworkAccessManager(this))
    , m_webSocketClient(new WebSocketClient(this))
{
    // Configuration is read before any signal is wired up.
    initialize();

    // Forward every realtime message received over the WebSocket to our parser.
    connect(m_webSocketClient,
            &WebSocketClient::dataReceived,
            this,
            &MeasurementDataManager::onReceiveRealtimeData);
}
// Nothing to release explicitly here: the destructor body is empty.
MeasurementDataManager::~MeasurementDataManager() = default;
void MeasurementDataManager::initialize()
{
//读取配置文件
QString configDir = QCoreApplication::applicationDirPath();
m_settingsFile = configDir + "/config/measurementDataService_config.ini";
QFile file(m_settingsFile);
if(file.open(QIODevice::ReadWrite))
{
m_isVaildSettingsFile = true;
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString();
m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt();
m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString();
m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();
m_serviceConfig.historicalCfg.baseUrl = m_settings->value("HistoricalData/baseUrl").toUrl();
}
else
{
m_isVaildSettingsFile = false;
LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
}
}
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
{
if(!m_isVaildSettingsFile)
return;
QUrl url;
url.setScheme("http");
url.setHost(m_serviceConfig.realtimeCfg.host);
url.setPort(m_serviceConfig.realtimeCfg.port);
url.setPath(m_serviceConfig.realtimeCfg.httpPath);
if(!url.isValid())
return ;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
//创建json格式请求数据
QJsonArray targetsArray;
for(const QString& meaurement : measurements)
targetsArray.append(meaurement);
QJsonObject measurementObj;
measurementObj.insert("interval", "20ms");
measurementObj.insert("targets", targetsArray);
QJsonArray measurementArray;
measurementArray.append(measurementObj);
QJsonObject requestObj;
QString actionValue = "";
if(action == "start" || action == "add")
actionValue = "start";
else if(action == "stop")
actionValue = "stop";
requestObj.insert("action", actionValue);
if(action != "start")
requestObj.insert("client_id", m_clientID);
requestObj.insert("measurements", measurementArray);
QJsonDocument requestJson(requestObj);
//发送请求
QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
connect(reply, &QNetworkReply::finished, this, [this, action, reply](){
processSubscriptionResponse(action, reply);
});
}
// Opens a new subscription for the given measurements.
// Does nothing if a client id is already held.
void MeasurementDataManager::startSubscription(const QStringList& measurements)
{
    const bool alreadySubscribed = !m_clientID.isEmpty();
    if(!alreadySubscribed)
    {
        buildSubscriptionRequest("start", measurements);
    }
}
// Adds measurements to the existing subscription.
// Does nothing when no client id is held yet.
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
{
    const bool hasClient = !m_clientID.isEmpty();
    if(hasClient)
    {
        buildSubscriptionRequest("add", measurements);
    }
}
// Removes measurements from the existing subscription.
// Does nothing when no client id is held yet.
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
{
    const bool hasClient = !m_clientID.isEmpty();
    if(hasClient)
    {
        buildSubscriptionRequest("stop", measurements);
    }
}
// Stops the whole subscription. Currently a no-op.
void MeasurementDataManager::stopSubscription()
{
// Not needed for now: per the original note, m_webSocketClient disconnects when it is destructed.
}
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
QString msg = docObj.value("msg").toString();
if(msg == "success")
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
if(action == "start")
{
m_clientID = payloadObj.value("client_id").toString();
getSubscriptionRealtimeData();
}
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString code = targetObj.value("code").toString();
if(action == "start" || action == "add")
{
if(code == "1001")//订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1002") //订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
else if(action == "stop" )
{
if(code == "1005")//取消订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1005") //取消订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
}
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
LOG_ERROR("Http-MeasurementDat", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
reply->deleteLater();
}
// Builds the URL of a historical-data query.
//
// The configured historical base URL is extended with the query items
// mtag (dataID), type, step, begin and end, where begin/end are the given
// times in milliseconds since the epoch.
QUrl MeasurementDataManager::buildHistoricalQueryUrl(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
    const QString beginMs = QString::number(startTime.toMSecsSinceEpoch());
    const QString endMs = QString::number(endTime.toMSecsSinceEpoch());

    QUrlQuery params;
    params.addQueryItem("mtag", dataID);
    params.addQueryItem("type", type);
    params.addQueryItem("step", step);
    params.addQueryItem("begin", beginMs);
    params.addQueryItem("end", endMs);

    QUrl result(m_serviceConfig.historicalCfg.baseUrl);
    result.setQuery(params);
    return result;
}
void MeasurementDataManager::processHistoricalResponse(const QString& dataID, const QByteArray& data)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error != QJsonParseError::NoError)
{
QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
.arg(parseError.errorString())
.arg(parseError.offset);
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return;
}
if(!doc.isObject())
{
QString errorMsg = "Historical Events JSON document is not an object";
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return ;
}
QJsonObject obj = doc.object();
int code = obj.value("code").toInt();
if(code != 0)
{
QString msg = obj.value("msg").toString();
LOG_ERROR("Http-MeasurementData-Historical", msg);
return ;
}
QJsonValue dataValue = obj.value("data");
if(dataValue.isObject())
{
QJsonObject dataObj = dataValue.toObject();
QJsonValue values = dataObj.value(dataID);
if(values.isArray())
{
QJsonArray valueArray = values.toArray();
int size = valueArray.size();
for(int i = 0; i < size; ++i)
{
if(!valueArray.at(i).isObject())
continue;
QJsonObject valueObj = valueArray.at(i).toObject();
qint64 timeStamp = valueObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
}
}
}
}
void MeasurementDataManager::getSubscriptionRealtimeData()
{
if(m_clientID.isEmpty())
return;
QString strUrl = QString("ws:://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
, QString::number(m_serviceConfig.realtimeCfg.port)
, m_serviceConfig.realtimeCfg.websocketPath
, m_clientID);
QUrl url(strUrl);
if(!m_webSocketClient->connectToServer(url))
{
QString errorMsg = QString("Connect to WebSocketServer failed");
LOG_ERROR("WebSocket-MeasurementData", errorMsg);
}
}
// Returns the most recent cached data point for dataID.
//
// A default-constructed MeasurementDataPoint is returned when dataID has no
// usable cache entry or the entry holds no data.
MeasurementDataPoint MeasurementDataManager::getLatestRealtimeData(const QString& dataID)
{
    // Single lookup: value() yields a null pointer for a missing key, which
    // also covers a key that maps to a null entry.
    const QSharedPointer<MeasurementData> entry = m_dataCache.value(dataID);
    if(!entry)
        return MeasurementDataPoint();

    const QVector<MeasurementDataPoint> latest = entry->getLatestData(1);
    if(latest.isEmpty())
        return MeasurementDataPoint();

    return latest.at(0);
}
void MeasurementDataManager::queryHistoricalEvents(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
QUrl url = buildHistoricalQueryUrl(dataID, startTime, endTime, step, type);
if(!url.isValid())
return;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.historicalCfg.timeout);
//发送请求
QNetworkReply* reply = m_networkManager->get(request);
connect(reply, &QNetworkReply::finished, this, [this, dataID, reply](){
onHistoricalRequestFinished(dataID, reply);
});
}
void MeasurementDataManager::onReceiveRealtimeData(const QString& dataMsg)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
QString msg = docObj.value("msg").toString();
if(msg == "success")
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString dataID = targetObj.value("id").toString();
QSharedPointer<MeasurementData> measurementData;
if(m_dataCache.contains(dataID))
measurementData = m_dataCache.value(dataID);
QJsonValue datasValue = targetObj.value("datas");
if(datasValue.isArray())
{
QJsonArray datasArray = datasValue.toArray();
QVector<MeasurementDataPoint> dataPoints;
dataPoints.reserve(datasArray.size());
for(const QJsonValue& dataValue : std::as_const(datasArray))
{
if(!dataValue.isObject())
continue;
QJsonObject dataObj = dataValue.toObject();
qint64 timestamp = dataObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
dataPoints.append(MeasurementDataPoint(timestamp, value));
}
//插入数据
measurementData->insertData(dataPoints);
}
}
}
}
else
{
QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Realtime measurementData get error: %1").arg(parseError.errorString());
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
void MeasurementDataManager::onHistoricalRequestFinished(const QString& dataID, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
processHistoricalResponse(dataID, data);
LOG_INFO("Http-MeasurementData-Historical", QString(data));
}
else
{
QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString());
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
}
reply->deleteLater();
}