PowerMaster/source/measurementDataManager.cpp

429 lines
16 KiB
C++

#include "measurementDataManager.h"
#include "logger.h"
#include <QNetworkAccessManager>
#include <QNetworkRequest>
#include <QUrlQuery>
#include <QNetworkReply>
#include <QWebSocket>
#include <QTimer>
#include <QFile>
#include <QSettings>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonParseError>
#include <QCoreApplication>
MeasurementDataManager* MeasurementDataManager::instance()
{
static MeasurementDataManager instance;
return &instance;
}
MeasurementDataManager::MeasurementDataManager()
: m_clientID("")
, m_networkManager(new QNetworkAccessManager(this))
, m_webSocketClient(new WebSocketClient(this))
{
initialize();
connect(m_webSocketClient, &WebSocketClient::dataReceived, this, &MeasurementDataManager::onReceiveRealtimeData);
}
MeasurementDataManager::~MeasurementDataManager()
{}
void MeasurementDataManager::initialize()
{
//读取配置文件
QString configDir = QCoreApplication::applicationDirPath();
m_settingsFile = configDir + "/config/measurementDataService_config.ini";
QFile file(m_settingsFile);
if(file.open(QIODevice::ReadWrite))
{
m_isVaildSettingsFile = true;
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString();
m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt();
m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString();
m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();
m_serviceConfig.historicalCfg.baseUrl = m_settings->value("HistoricalData/baseUrl").toUrl();
}
else
{
m_isVaildSettingsFile = false;
LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
}
}
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
{
if(!m_isVaildSettingsFile)
return;
QUrl url;
url.setScheme("http");
url.setHost(m_serviceConfig.realtimeCfg.host);
url.setPort(m_serviceConfig.realtimeCfg.port);
url.setPath(m_serviceConfig.realtimeCfg.httpPath);
if(!url.isValid())
return ;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
//创建json格式请求数据
QJsonArray targetsArray;
for(const QString& meaurement : measurements)
targetsArray.append(meaurement);
QJsonObject measurementObj;
measurementObj.insert("interval", "500ms");
measurementObj.insert("targets", targetsArray);
QJsonArray measurementArray;
measurementArray.append(measurementObj);
QJsonObject requestObj;
QString actionValue = "";
if(action == "start" || action == "add")
actionValue = "start";
else if(action == "stop")
actionValue = "stop";
requestObj.insert("action", actionValue);
if(action != "start")
requestObj.insert("client_id", m_clientID);
requestObj.insert("measurements", measurementArray);
QJsonDocument requestJson(requestObj);
//发送请求
QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
connect(reply, &QNetworkReply::finished, this, [this, action, reply](){
processSubscriptionResponse(action, reply);
});
}
void MeasurementDataManager::startSubscription(const QStringList& measurements)
{
if(!m_clientID.isEmpty())
return;
buildSubscriptionRequest("start", measurements);
}
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
{
if(m_clientID.isEmpty())
{
buildSubscriptionRequest("start", measurements);
return;
}
buildSubscriptionRequest("add", measurements);
}
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
{
if(m_clientID.isEmpty())
return;
buildSubscriptionRequest("stop", measurements);
}
void MeasurementDataManager::stopSubscription()
{
m_webSocketClient->disconnectFromServer();
m_clientID.clear();
}
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
QString msg = docObj.value("msg").toString();
if(msg == "success")
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
if(action == "start")
{
m_clientID = payloadObj.value("client_id").toString();
getSubscriptionRealtimeData();
}
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString code = targetObj.value("code").toString();
if(action == "start" || action == "add")
{
if(code == "1001")//订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1002") //订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
else if(action == "stop" )
{
if(code == "1005")//取消订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1005") //取消订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
}
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
LOG_ERROR("Http-MeasurementDat", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
reply->deleteLater();
}
QUrl MeasurementDataManager::buildHistoricalQueryUrl(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
QUrl url = m_serviceConfig.historicalCfg.baseUrl;
QUrlQuery query;
query.addQueryItem("mtag", dataID);
query.addQueryItem("type", type);
query.addQueryItem("step", step);
query.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch()));
query.addQueryItem("end", QString::number(endTime.toMSecsSinceEpoch()));
url.setQuery(query);
return url;
}
void MeasurementDataManager::processHistoricalResponse(const QString& dataID, const QByteArray& data)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error != QJsonParseError::NoError)
{
QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
.arg(parseError.errorString())
.arg(parseError.offset);
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return;
}
if(!doc.isObject())
{
QString errorMsg = "Historical Events JSON document is not an object";
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return ;
}
QJsonObject obj = doc.object();
int code = obj.value("code").toInt();
if(code != 0)
{
QString msg = obj.value("msg").toString();
LOG_ERROR("Http-MeasurementData-Historical", msg);
return ;
}
QJsonValue dataValue = obj.value("data");
if(dataValue.isObject())
{
QJsonObject dataObj = dataValue.toObject();
QJsonValue values = dataObj.value(dataID);
if(values.isArray())
{
QJsonArray valueArray = values.toArray();
int size = valueArray.size();
for(int i = 0; i < size; ++i)
{
if(!valueArray.at(i).isObject())
continue;
QJsonObject valueObj = valueArray.at(i).toObject();
qint64 timeStamp = valueObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
}
}
}
}
void MeasurementDataManager::getSubscriptionRealtimeData()
{
if(m_clientID.isEmpty())
return;
QString strUrl = QString("ws:://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
, QString::number(m_serviceConfig.realtimeCfg.port)
, m_serviceConfig.realtimeCfg.websocketPath
, m_clientID);
QUrl url(strUrl);
if(!m_webSocketClient->connectToServer(url))
{
QString errorMsg = QString("Connect to WebSocketServer failed");
LOG_ERROR("WebSocket-MeasurementData", errorMsg);
}
}
void MeasurementDataManager::requestData(const QString& dataID, QObject* requester)
{
//if()
}
void MeasurementDataManager::cancelRequest(const QString& dataID, QObject* requester)
{}
MeasurementDataPoint MeasurementDataManager::getLastestRealtimeData(const QString& dataID)
{
if(!m_dataCache.contains(dataID))
return MeasurementDataPoint();
QVector<MeasurementDataPoint> result = m_dataCache.value(dataID)->getLastestData(1);
if(result.size() > 0)
return result.at(0);
else
return MeasurementDataPoint();
}
void MeasurementDataManager::queryHistoricalEvents(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
QUrl url = buildHistoricalQueryUrl(dataID, startTime, endTime, step, type);
if(!url.isValid())
return;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.historicalCfg.timeout);
//发送请求
QNetworkReply* reply = m_networkManager->get(request);
connect(reply, &QNetworkReply::finished, this, [this, dataID, reply](){
onHistoricalRequestFinished(dataID, reply);
});
}
void MeasurementDataManager::onReceiveRealtimeData(const QString& dataMsg)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
int code = docObj.value("code").toInt();
//QString msg = docObj.value("msg").toString();
if(code == 200)
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString dataID = targetObj.value("id").toString();
QSharedPointer<MeasurementData> measurementData;
if(m_dataCache.contains(dataID))
measurementData = m_dataCache.value(dataID);
else
{
measurementData = QSharedPointer<MeasurementData>::create(dataID);
m_dataCache.insert(dataID, measurementData);
}
QJsonValue datasValue = targetObj.value("datas");
if(datasValue.isArray())
{
QJsonArray datasArray = datasValue.toArray();
QVector<MeasurementDataPoint> dataPoints;
dataPoints.reserve(datasArray.size());
for(const QJsonValue& dataValue : std::as_const(datasArray))
{
if(!dataValue.isObject())
continue;
QJsonObject dataObj = dataValue.toObject();
qint64 timestamp = dataObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
dataPoints.append(MeasurementDataPoint(timestamp, value));
}
//插入数据
measurementData->insertData(dataPoints);
}
}
}
}
else
{
QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
else if(code == 2101) //服务端协程断开,此链接已无效
{
m_webSocketClient->disconnectFromServer();
m_clientID.clear();
}
}
else
{
QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
void MeasurementDataManager::onHistoricalRequestFinished(const QString& dataID, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
processHistoricalResponse(dataID, data);
LOG_INFO("Http-MeasurementData-Historical", QString(data));
}
else
{
QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString());
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
}
reply->deleteLater();
}