PowerMaster/source/measurementDataManager.cpp

503 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "measurementDataManager.h"
#include "logger.h"
#include <QNetworkAccessManager>
#include <QNetworkRequest>
#include <QUrlQuery>
#include <QNetworkReply>
#include <QWebSocket>
#include <QTimer>
#include <QFile>
#include <QSettings>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QJsonParseError>
#include <QCoreApplication>
// Singleton pointer: allocated lazily by instance() and released by destroyInstance().
MeasurementDataManager* MeasurementDataManager::m_instance = nullptr;
// Returns the shared MeasurementDataManager, creating it on first use.
//
// A function-local static (`static MeasurementDataManager instance;`) is
// deliberately not used. Per the original author's note, QTimer destruction
// depends on the event loop, while a function-local static is only released
// after main() has returned, i.e. after the event loop (app.exec()) has exited.
// destroyInstance() exists so the object can be released explicitly before
// main() returns, which avoids that problem.
MeasurementDataManager* MeasurementDataManager::instance()
{
    if (m_instance == nullptr)
    {
        m_instance = new MeasurementDataManager();
    }
    return m_instance;
}
// Destroys the singleton (if one exists) and resets the pointer, so a later
// instance() call creates a fresh object.
void MeasurementDataManager::destroyInstance()
{
    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete m_instance;
    m_instance = nullptr;
}
// Builds the manager: creates the HTTP access manager and the WebSocket client
// (both receive `this` in their constructors), runs initialize(), and routes
// incoming WebSocket payloads to onReceiveRealtimeData().
MeasurementDataManager::MeasurementDataManager()
    : m_clientID("")
    , m_networkManager(new QNetworkAccessManager(this))
    , m_webSocketClient(new WebSocketClient(this))
{
    initialize();

    connect(m_webSocketClient,
            &WebSocketClient::dataReceived,
            this,
            &MeasurementDataManager::onReceiveRealtimeData);
}
// Nothing to release by hand here; the destructor performs no explicit work.
MeasurementDataManager::~MeasurementDataManager() = default;
void MeasurementDataManager::initialize()
{
//读取配置文件
QString configDir = QCoreApplication::applicationDirPath();
m_settingsFile = configDir + "/config/measurementDataService_config.ini";
QFile file(m_settingsFile);
if(file.open(QIODevice::ReadWrite))
{
m_isVaildSettingsFile = true;
m_settings = new QSettings(m_settingsFile, QSettings::IniFormat);
m_serviceConfig.realtimeCfg.host = m_settings->value("RealtimeData/host").toString();
m_serviceConfig.realtimeCfg.port = m_settings->value("RealtimeData/port").toInt();
m_serviceConfig.realtimeCfg.httpPath = m_settings->value("RealtimeData/httpPath").toString();
m_serviceConfig.realtimeCfg.websocketPath = m_settings->value("RealtimeData/websocketPath").toString();
m_serviceConfig.historicalCfg.baseUrl = m_settings->value("HistoricalData/baseUrl").toUrl();
}
else
{
m_isVaildSettingsFile = false;
LOG_ERROR("ReadSettingInfo", "load measurementDataService_config.ini failed");
}
//开启缓存数据的定期清理
m_dataCleanupInterval = 10 * 1000;
m_dataCleanupTimer.setInterval(m_dataCleanupInterval);
m_dataCleanupTimer.start();
connect(&m_dataCleanupTimer, &QTimer::timeout, this, &MeasurementDataManager::cleanUpDataCache);
}
void MeasurementDataManager::buildSubscriptionRequest(const QString& action, const QStringList& measurements)
{
if(!m_isVaildSettingsFile)
return;
QUrl url;
url.setScheme("http");
url.setHost(m_serviceConfig.realtimeCfg.host);
url.setPort(m_serviceConfig.realtimeCfg.port);
url.setPath(m_serviceConfig.realtimeCfg.httpPath);
if(!url.isValid())
{
LOG_ERROR("Http-MeasurementData", QString("url:'%1' is not valid"));
return ;
}
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.realtimeCfg.timeout);
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
//创建json格式请求数据
QJsonArray targetsArray;
for(const QString& meaurement : measurements)
targetsArray.append(meaurement);
QJsonObject measurementObj;
measurementObj.insert("interval", "500ms");
measurementObj.insert("targets", targetsArray);
QJsonArray measurementArray;
measurementArray.append(measurementObj);
QJsonObject requestObj;
QString actionValue = "";
if(action == "start" || action == "add")
actionValue = "start";
else if(action == "stop")
actionValue = "stop";
requestObj.insert("action", actionValue);
if(action != "start")
requestObj.insert("client_id", m_clientID);
requestObj.insert("measurements", measurementArray);
QJsonDocument requestJson(requestObj);
//发送请求
QNetworkReply* reply = m_networkManager->post(request, requestJson.toJson());
connect(reply, &QNetworkReply::finished, this, [this, action, reply](){
processSubscriptionResponse(action, reply);
});
}
// Sends an initial "start" subscription for the given measurements.
// Does nothing when a client id is already held.
void MeasurementDataManager::startSubscription(const QStringList& measurements)
{
    const bool hasClientID = !m_clientID.isEmpty();
    if (hasClientID)
    {
        return;
    }

    buildSubscriptionRequest("start", measurements);
}
void MeasurementDataManager::addSubscriptionData(const QStringList& measurements)
{
if(m_clientID.isEmpty())
{
buildSubscriptionRequest("start", measurements);
return;
}
buildSubscriptionRequest("add", measurements);
}
// Sends a "stop" request for the given measurements.
// Does nothing when no client id is held.
void MeasurementDataManager::removeSubscriptionData(const QStringList& measurements)
{
    if (!m_clientID.isEmpty())
    {
        buildSubscriptionRequest("stop", measurements);
    }
}
// Ends the realtime session locally: disconnects the WebSocket client and
// forgets the client id. No HTTP request is sent from here.
void MeasurementDataManager::stopSubscription()
{
    m_webSocketClient->disconnectFromServer();

    // With the id cleared, the next subscription goes out as a fresh "start".
    m_clientID.clear();
}
void MeasurementDataManager::processSubscriptionResponse(const QString& action, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
QString msg = docObj.value("msg").toString();
if(msg == "success")
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
if(action == "start")
{
m_clientID = payloadObj.value("client_id").toString();
getSubscriptionRealtimeData();
}
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString code = targetObj.value("code").toString();
if(action == "start" || action == "add")
{
if(code == "1001")//订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1002") //订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
else if(action == "stop" )
{
if(code == "1005")//取消订阅成功
{
QString measurementID = targetObj.value("id").toString();
}
else //if(code == "1005") //取消订阅失败
LOG_ERROR("Http-MeasurementData", targetObj.value("msg").toString());
}
}
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json 'payload' is not object: %1").arg(QString(data));
LOG_ERROR("Http-MeasurementDat", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData failed: %1").arg(msg);
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
}
else
{
QString errorMsg = QString("Subscription measurementData reply Network error: %1").arg(reply->errorString());
LOG_ERROR("Http-MeasurementData", errorMsg);
}
reply->deleteLater();
}
// Composes the historical-data query URL from the configured base URL.
//
// The query carries "mtag" (dataID), "type", "step", and "begin"/"end" as
// milliseconds since the epoch taken from startTime/endTime. The query is
// installed with QUrl::setQuery(), so it becomes the query of the returned URL.
QUrl MeasurementDataManager::buildHistoricalQueryUrl(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
    const QString beginText = QString::number(startTime.toMSecsSinceEpoch());
    const QString endText = QString::number(endTime.toMSecsSinceEpoch());

    QUrlQuery params;
    params.addQueryItem("mtag", dataID);
    params.addQueryItem("type", type);
    params.addQueryItem("step", step);
    params.addQueryItem("begin", beginText);
    params.addQueryItem("end", endText);

    QUrl result = m_serviceConfig.historicalCfg.baseUrl;
    result.setQuery(params);
    return result;
}
void MeasurementDataManager::processHistoricalResponse(const QString& dataID, const QByteArray& data)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(data, &parseError);
if(parseError.error != QJsonParseError::NoError)
{
QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2")
.arg(parseError.errorString())
.arg(parseError.offset);
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return;
}
if(!doc.isObject())
{
QString errorMsg = "Historical Events JSON document is not an object";
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
return ;
}
QJsonObject obj = doc.object();
int code = obj.value("code").toInt();
if(code != 0)
{
QString msg = obj.value("msg").toString();
LOG_ERROR("Http-MeasurementData-Historical", msg);
return ;
}
QJsonValue dataValue = obj.value("data");
if(dataValue.isObject())
{
QJsonObject dataObj = dataValue.toObject();
QJsonValue values = dataObj.value(dataID);
if(values.isArray())
{
QJsonArray valueArray = values.toArray();
int size = valueArray.size();
for(int i = 0; i < size; ++i)
{
if(!valueArray.at(i).isObject())
continue;
QJsonObject valueObj = valueArray.at(i).toObject();
qint64 timeStamp = valueObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
}
}
}
}
void MeasurementDataManager::getSubscriptionRealtimeData()
{
if(m_clientID.isEmpty())
return;
QString strUrl = QString("ws://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
, QString::number(m_serviceConfig.realtimeCfg.port)
, m_serviceConfig.realtimeCfg.websocketPath
, m_clientID);
QUrl url(strUrl);
m_webSocketClient->connectToServer(url);
// if(!m_webSocketClient->connectToServer(url))
// {
// QString errorMsg = QString("Connect to WebSocketServer failed");
// LOG_ERROR("WebSocket-MeasurementData", errorMsg);
// }
}
void MeasurementDataManager::requestData(const QStringList& measurements, QObject* requester)
{
QStringList newRequestDatas;
for(const QString& dataID : measurements)
{
auto it = m_dataRequesetRecord.find(dataID);
if(it != m_dataRequesetRecord.end()/* && !it.value().contains(requester)*/)
it.value().insert(requester);
else
{
QSet<QObject*> requesters;
requesters.insert(requester);
m_dataRequesetRecord.insert(dataID, requesters);
newRequestDatas.push_back(dataID);
}
}
if(!newRequestDatas.isEmpty())
addSubscriptionData(newRequestDatas);
}
void MeasurementDataManager::cancelRequest(const QStringList& measurements, QObject* requester)
{
QStringList removeRequestDatas;
for(const QString& dataID : measurements)
{
auto it = m_dataRequesetRecord.find(dataID);
if(it != m_dataRequesetRecord.end())
{
it.value().remove(requester);
if(it.value().isEmpty())
removeRequestDatas.push_back(dataID);
}
}
if(!removeRequestDatas.isEmpty())
{
removeSubscriptionData(removeRequestDatas);
for(const QString& dataID : std::as_const(removeRequestDatas))
{
m_dataRequesetRecord.remove(dataID);
m_dataCache.remove(dataID);
}
}
}
// Returns the newest cached data point of `dataID`.
// A default-constructed MeasurementDataPoint is returned when the measurement
// is not cached or its cache yields no point.
MeasurementDataPoint MeasurementDataManager::getLastestRealtimeData(const QString& dataID)
{
    const auto cached = m_dataCache.constFind(dataID);
    if (cached == m_dataCache.constEnd())
    {
        return MeasurementDataPoint();
    }

    // Ask the per-measurement cache for its single most recent point.
    const QVector<MeasurementDataPoint> newest = cached.value()->getLastestData(1);
    if (newest.isEmpty())
    {
        return MeasurementDataPoint();
    }
    return newest.at(0);
}
void MeasurementDataManager::queryHistoricalEvents(const QString& dataID, const QDateTime& startTime, const QDateTime& endTime, const QString& step, const QString& type)
{
QUrl url = buildHistoricalQueryUrl(dataID, startTime, endTime, step, type);
if(!url.isValid())
return;
//创建网络请求
QNetworkRequest request(url);
request.setTransferTimeout(m_serviceConfig.historicalCfg.timeout);
//发送请求
QNetworkReply* reply = m_networkManager->get(request);
connect(reply, &QNetworkReply::finished, this, [this, dataID, reply](){
onHistoricalRequestFinished(dataID, reply);
});
}
void MeasurementDataManager::onReceiveRealtimeData(const QString& dataMsg)
{
QJsonParseError parseError;
QJsonDocument doc = QJsonDocument::fromJson(dataMsg.toUtf8(), &parseError);
if(parseError.error == QJsonParseError::NoError && doc.isObject())
{
QJsonObject docObj = doc.object();
int code = docObj.value("code").toInt();
//QString msg = docObj.value("msg").toString();
if(code == 200)
{
QJsonValue payloadValue = docObj.value("payload");
if(payloadValue.isObject())
{
QJsonObject payloadObj = payloadValue.toObject();
QJsonValue targetsValue = payloadObj.value("targets");
if(targetsValue.isArray())
{
QJsonArray targetsArray = targetsValue.toArray();
for(const QJsonValue& targetValue : std::as_const(targetsArray))
{
if(!targetValue.isObject())
continue;
QJsonObject targetObj = targetValue.toObject();
QString dataID = targetObj.value("id").toString();
QSharedPointer<MeasurementData> measurementData;
if(m_dataCache.contains(dataID))
measurementData = m_dataCache.value(dataID);
else
{
measurementData = QSharedPointer<MeasurementData>::create(dataID);
m_dataCache.insert(dataID, measurementData);
}
QJsonValue datasValue = targetObj.value("datas");
if(datasValue.isArray())
{
QJsonArray datasArray = datasValue.toArray();
QVector<MeasurementDataPoint> dataPoints;
dataPoints.reserve(datasArray.size());
for(const QJsonValue& dataValue : std::as_const(datasArray))
{
if(!dataValue.isObject())
continue;
QJsonObject dataObj = dataValue.toObject();
qint64 timestamp = dataObj.value("time").toString().toLongLong();
double value = dataObj.value("value").toDouble();
dataPoints.append(MeasurementDataPoint(timestamp, value));
}
//插入数据
measurementData->insertData(dataPoints);
}
}
}
}
else
{
QString errorMsg = QString("Realtime measurementData reply Json 'payload' is not object: %1").arg(dataMsg);
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
else if(code == 2101) //服务端协程断开,此链接已无效
{
m_webSocketClient->disconnectFromServer();
m_clientID.clear();
}
}
else
{
QString errorMsg = QString("Realtime measurementData Json parse error: %1").arg(parseError.errorString());
LOG_ERROR("WebSockt-MeasurementData", errorMsg);
}
}
void MeasurementDataManager::onHistoricalRequestFinished(const QString& dataID, QNetworkReply* reply)
{
if(reply->error() == QNetworkReply::NoError)
{
QByteArray data = reply->readAll();
processHistoricalResponse(dataID, data);
LOG_INFO("Http-MeasurementData-Historical", QString(data));
}
else
{
QString errorMsg = QString("Request HistoricalData error: %1").arg(/*reply->request().url().toDisplayString(),*/ reply->errorString());
LOG_ERROR("Http-MeasurementData-Historical", errorMsg);
}
reply->deleteLater();
}
void MeasurementDataManager::cleanUpDataCache()
{
qint64 cleanupTimestame = QDateTime::currentMSecsSinceEpoch() - (m_dataCleanupInterval * 1000);
for(auto measurementData : std::as_const(m_dataCache))
{
measurementData->cleanupOldData(cleanupTimestame);
}
}