395 lines
15 KiB
C++
395 lines
15 KiB
C++
#include "measurementDataManager.h"
|
|
#include "logger.h"
|
|
#include "measurementDataUtils.h"
|
|
#include <QNetworkAccessManager>
|
|
#include <QNetworkRequest>
|
|
#include <QUrlQuery>
|
|
#include <QNetworkReply>
|
|
#include <QWebSocket>
|
|
#include <QTimer>
|
|
#include <QFile>
|
|
#include <QSettings>
|
|
#include <QJsonObject>
|
|
#include <QJsonDocument>
|
|
#include <QJsonArray>
|
|
#include <QJsonParseError>
|
|
#include <QCoreApplication>
|
|
|
|
MeasurementDataManager* MeasurementDataManager::instance()
|
|
{
|
|
static MeasurementDataManager instance;
|
|
return &instance;
|
|
}
|
|
|
|
MeasurementDataManager::MeasurementDataManager()
|
|
: m_clientID("")
|
|
, m_networkManager(new QNetworkAccessManager(this))
|
|
, m_webSocketClient(new WebSocketClient(this))
|
|
{
|
|
initialize();
|
|
connect(m_webSocketClient, &WebSocketClient::dataReceived, this, &MeasurementDataManager::onReceiveRealtimeData);
|
|
}
|
|
|
|
MeasurementDataManager::~MeasurementDataManager()
|
|
{}
|
|
|
|
void MeasurementDataManager::initialize()
|
|
{
|
|
//读取配置文件
|
|
QString configDir = QCoreApplication::applicationDirPath();
|
|
m_settingsFile = configDir + "/config/measurementDataService_config.ini";
|
|
QFile file(m_settingsFile);
|
|
if(file.open(QIODevice::ReadWrite))
|
|
{
|
|
m_isVaildSettingsFile = true;
|
|
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
|
|
|
|
m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString();
|
|
m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt();
|
|
m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString();
|
|
m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();
|
|
|
|
m_serviceConfig.historicalCfg.baseUrl = m_settings->value("HistoricalData/baseUrl").toUrl();
|
|
}
|
|
else
|
|
{
|
|
m_isVaildSettingsFile = false;
|
|
LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
|
|
}
|
|
}
|
|
|
|
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
|
|
{
|
|
if(!m_isVaildSettingsFile)
|
|
return;
|
|
|
|
QUrl url;
|
|
url.setScheme("http");
|
|
url.setHost(m_serviceConfig.realtimeCfg.host);
|
|
url.setPort(m_serviceConfig.realtimeCfg.port);
|
|
url.setPath(m_serviceConfig.realtimeCfg.httpPath);
|
|
if(!url.isValid())
|
|
return ;
|
|
|
|
//创建网络请求
|
|
QNetworkRequest request(url);
|
|
request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
|
|
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
|
|
//创建json格式请求数据
|
|
QJsonArray targetsArray;
|
|
for(const QString& meaurement : measurements)
|
|
targetsArray.append(meaurement);
|
|
QJsonObject measurementObj;
|
|
measurementObj.insert("interval", "20ms");
|
|
measurementObj.insert("targets", targetsArray);
|
|
QJsonArray measurementArray;
|
|
measurementArray.append(measurementObj);
|
|
QJsonObject requestObj;
|
|
QString actionValue = "";
|
|
if(action == "start" || action == "add")
|
|
actionValue = "start";
|
|
else if(action == "stop")
|
|
actionValue = "stop";
|
|
requestObj.insert("action", actionValue);
|
|
if(action != "start")
|
|
requestObj.insert("client_id", m_clientID);
|
|
requestObj.insert("measurements", measurementArray);
|
|
QJsonDocument requestJson(requestObj);
|
|
//发送请求
|
|
QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
|
|
connect(reply, &QNetworkReply::finished, this, [this, action, reply](){
|
|
processSubscriptionResponse(action, reply);
|
|
});
|
|
}
|
|
|
|
void MeasurementDataManager::startSubscription(const QStringList& measurements)
|
|
{
|
|
if(!m_clientID.isEmpty())
|
|
return;
|
|
|
|
buildSubscriptionRequest("start", measurements);
|
|
}
|
|
|
|
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
|
|
{
|
|
if(m_clientID.isEmpty())
|
|
return;
|
|
|
|
buildSubscriptionRequest("add", measurements);
|
|
}
|
|
|
|
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
|
|
{
|
|
if(m_clientID.isEmpty())
|
|
return;
|
|
|
|
buildSubscriptionRequest("stop", measurements);
|
|
}
|
|
|
|
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
|
|
{
|
|
if(reply->error() == QNetworkReply::NoError)
|
|
{
|
|
QByteArray data = reply->readAll();
|
|
QJsonParseError parseError;
|
|
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
|
|
|
|
if(parseError.error == QJsonParseError::NoError && doc.isObject())
|
|
{
|
|
QJsonObject docObj = doc.object();
|
|
QString msg = docObj.value("msg").toString();
|
|
if(msg == "success")
|
|
{
|
|
QJsonValue payloadValue = docObj.value("payload");
|
|
if(payloadValue.isObject())
|
|
{
|
|
QJsonObject payloadObj = payloadValue.toObject();
|
|
if(action == "start")
|
|
{
|
|
m_clientID = payloadObj.value("client_id").toString();
|
|
getSubscriptionRealtimeData();
|
|
}
|
|
QJsonValue targetsValue = payloadObj.value("targets");
|
|
if(targetsValue.isArray())
|
|
{
|
|
QJsonArray targetsArray = targetsValue.toArray();
|
|
for(const QJsonValue& targetValue : std::as_const(targetsArray))
|
|
{
|
|
if(!targetValue.isObject())
|
|
continue;
|
|
|
|
QJsonObject targetObj = targetValue.toObject();
|
|
QString code = targetObj.value("code").toString();
|
|
if(action == "start" || action == "add")
|
|
{
|
|
if(code == "1001")//订阅成功
|
|
{
|
|
QString measurementID = targetObj.value("id").toString();
|
|
}
|
|
else //if(code == "1002") //订阅失败
|
|
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
|
|
}
|
|
else if(action == "stop" )
|
|
{
|
|
if(code == "1005")//取消订阅成功
|
|
{
|
|
QString measurementID = targetObj.value("id").toString();
|
|
}
|
|
else //if(code == "1005") //取消订阅失败
|
|
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
|
|
LOG_ERROR("Http-MeasurementDat", errorMsg);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
|
|
LOG_ERROR("Http-MeasurementData", errorMsg);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
|
|
LOG_ERROR("Http-MeasurementData", errorMsg);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
|
|
LOG_ERROR("Http-MeasurementData", errorMsg);
|
|
}
|
|
|
|
reply->deleteLater();
|
|
}
|
|
|
|
QUrl MeasurementDataManager::buildHistoricalQueryUrl(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
|
|
{
|
|
QUrl url = m_serviceConfig.historicalCfg.baseUrl;
|
|
QUrlQuery query;
|
|
query.addQueryItem("mtag", dataID);
|
|
query.addQueryItem("type", type);
|
|
query.addQueryItem("step", step);
|
|
query.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch()));
|
|
query.addQueryItem("end", QString::number(endTime.toMSecsSinceEpoch()));
|
|
url.setQuery(query);
|
|
return url;
|
|
}
|
|
|
|
void MeasurementDataManager::processHistoricalResponse(const QString& dataID, const QByteArray& data)
|
|
{
|
|
QJsonParseError parseError;
|
|
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
|
|
|
|
if(parseError.error != QJsonParseError::NoError)
|
|
{
|
|
QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
|
|
.arg(parseError.errorString())
|
|
.arg(parseError.offset);
|
|
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
|
|
return;
|
|
}
|
|
|
|
if(!doc.isObject())
|
|
{
|
|
QString errorMsg = "Historical Events JSON document is not an object";
|
|
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
|
|
return ;
|
|
}
|
|
|
|
QJsonObject obj = doc.object();
|
|
int code = obj.value("code").toInt();
|
|
if(code != 0)
|
|
{
|
|
QString msg = obj.value("msg").toString();
|
|
LOG_ERROR("Http-MeasurementData-Historical", msg);
|
|
return ;
|
|
}
|
|
|
|
QJsonValue dataValue = obj.value("data");
|
|
if(dataValue.isObject())
|
|
{
|
|
QJsonObject dataObj = dataValue.toObject();
|
|
QJsonValue values = dataObj.value(dataID);
|
|
if(values.isArray())
|
|
{
|
|
QJsonArray valueArray = values.toArray();
|
|
int size = valueArray.size();
|
|
for(int i = 0; i < size; ++i)
|
|
{
|
|
if(!valueArray.at(i).isObject())
|
|
continue;
|
|
|
|
QJsonObject valueObj = valueArray.at(i).toObject();
|
|
qint64 timeStamp = valueObj.value("time").toString().toLongLong();
|
|
double value = dataObj.value("value").toDouble();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void MeasurementDataManager::getSubscriptionRealtimeData()
|
|
{
|
|
if(m_clientID.isEmpty())
|
|
return;
|
|
|
|
QString strUrl = QString("ws:://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
|
|
, QString::number(m_serviceConfig.realtimeCfg.port)
|
|
, m_serviceConfig.realtimeCfg.websocketPath
|
|
, m_clientID);
|
|
QUrl url(strUrl);
|
|
if(!m_webSocketClient->connectToServer(url))
|
|
{
|
|
QString errorMsg = QString("Connect to WebSocketServer failed");
|
|
LOG_ERROR("WebSocket", errorMsg);
|
|
}
|
|
}
|
|
|
|
void MeasurementDataManager::queryHistoricalEvents(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
|
|
{
|
|
QUrl url = buildHistoricalQueryUrl(dataID, startTime, endTime, step, type);
|
|
if(!url.isValid())
|
|
return;
|
|
|
|
//创建网络请求
|
|
QNetworkRequest request(url);
|
|
request.setTransferTimeout(m_serviceConfig.historicalCfg.timeout);
|
|
//发送请求
|
|
QNetworkReply* reply = m_networkManager->get(request);
|
|
connect(reply, &QNetworkReply::finished, this, [this, dataID, reply](){
|
|
onHistoricalRequestFinished(dataID, reply);
|
|
});
|
|
}
|
|
|
|
void MeasurementDataManager::onReceiveRealtimeData(const QString& dataMsg)
|
|
{
|
|
QJsonParseError parseError;
|
|
QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
|
|
if(parseError.error == QJsonParseError::NoError && doc.isObject())
|
|
{
|
|
QJsonObject docObj = doc.object();
|
|
QString msg = docObj.value("msg").toString();
|
|
if(msg == "success")
|
|
{
|
|
QJsonValue payloadValue = docObj.value("payload");
|
|
if(payloadValue.isObject())
|
|
{
|
|
QJsonObject payloadObj = payloadValue.toObject();
|
|
QJsonValue targetsValue = payloadObj.value("targets");
|
|
if(targetsValue.isArray())
|
|
{
|
|
QJsonArray targetsArray = targetsValue.toArray();
|
|
for(const QJsonValue& targetValue : std::as_const(targetsArray))
|
|
{
|
|
if(!targetValue.isObject())
|
|
continue;
|
|
|
|
QJsonObject targetObj = targetValue.toObject();
|
|
QString dataID = targetObj.value("id").toString();
|
|
QSharedPointer<MeasurementData> measurementData;
|
|
if(m_dataCache.contains(dataID))
|
|
measurementData = m_dataCache.value(dataID);
|
|
|
|
QJsonValue datasValue = targetObj.value("datas");
|
|
if(datasValue.isArray())
|
|
{
|
|
QJsonArray datasArray = datasValue.toArray();
|
|
QVector<MeasurementDataPoint> dataPoints;
|
|
dataPoints.reserve(datasArray.size());
|
|
for(const QJsonValue& dataValue : std::as_const(datasArray))
|
|
{
|
|
if(!dataValue.isObject())
|
|
continue;
|
|
|
|
QJsonObject dataObj = dataValue.toObject();
|
|
qint64 timestamp = dataObj.value("time").toString().toLongLong();
|
|
double value = dataObj.value("value").toDouble();
|
|
dataPoints.append(MeasurementDataPoint(timestamp, value));
|
|
}
|
|
//插入数据
|
|
measurementData->insertData(dataPoints);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
|
|
LOG_ERROR("WebSockt-MeasurementDat", errorMsg);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Realtime measurementData get error: %1").arg(parseError.errorString());
|
|
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
|
|
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
|
|
}
|
|
}
|
|
|
|
void MeasurementDataManager::onHistoricalRequestFinished(const QString& dataID, QNetworkReply* reply)
|
|
{
|
|
if(reply->error() == QNetworkReply::NoError)
|
|
{
|
|
QByteArray data = reply->readAll();
|
|
processHistoricalResponse(dataID, data);
|
|
LOG_INFO("Http-MeasurementData-Historical", QString(data));
|
|
}
|
|
else
|
|
{
|
|
QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString());
|
|
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
|
|
}
|
|
|
|
reply->deleteLater();
|
|
}
|