202 lines
7.3 KiB
C++
202 lines
7.3 KiB
C++
#include "instance/dataAccessor.h"
|
||
#include "communicationManager.h"
|
||
#include "uiCommunicationBus.h"
|
||
#include "configManager.h"
|
||
#include "diagramCavas.h"
|
||
#include "diagramConnectSetting.h"
|
||
#include <QVariant>
|
||
#include <QJsonObject>
|
||
#include <QJsonArray>
|
||
#include "global.h"
|
||
|
||
DataAccessor::DataAccessor(QObject* parent)
|
||
: QObject(parent)
|
||
,_parentCavas(nullptr)
|
||
{
|
||
|
||
}
|
||
|
||
DataAccessor::~DataAccessor()
|
||
{
|
||
|
||
}
|
||
|
||
void DataAccessor::onReceiveHttpData(const QString& sType,const QVariant& data)
|
||
{
|
||
if(sType == "subscriptions"){
|
||
QMap<QString,QList<QPair<QString,QString>>>& tempRequest = UiCommunicationBus::instance()->getTempRequestMap();
|
||
QJsonObject dataObj = data.toJsonObject();
|
||
QString sClientId = dataObj.value("client_id").toString();
|
||
|
||
QMap<QString,QString> lstTarget;
|
||
QJsonArray measureArr = dataObj.value("measurements").toArray();
|
||
for(const QJsonValue& value : measureArr){
|
||
QJsonObject obj = value.toObject();
|
||
QString sId = obj["id"].toString();
|
||
QString sCode = obj["code"].toString();
|
||
|
||
if(!lstTarget.contains(sId)){
|
||
lstTarget.insert(sId,sCode);
|
||
}
|
||
}
|
||
|
||
QString sAction;
|
||
auto mapSesstion = UiCommunicationBus::instance()->getSesstionMap();
|
||
bool bClientExist = false;
|
||
for(auto& session:mapSesstion){
|
||
if(session.first == sClientId){ //在会话列表中已存在,是stop(暂只使用stop和start)
|
||
bClientExist = true;
|
||
sAction = "stop";
|
||
break;
|
||
}
|
||
}
|
||
|
||
if(sAction.isEmpty()){ //不是stop的情况
|
||
QStringList lstKeys = lstTarget.keys();
|
||
for(auto it = tempRequest.begin(); it != tempRequest.end(); ++it){
|
||
const QString& page = it.key();
|
||
QList<QPair<QString, QString>>& tempList = it.value();
|
||
|
||
// 从 tempList 提取所有 first 元素
|
||
QStringList firstElements;
|
||
firstElements.reserve(tempList.size());
|
||
for (const auto& pair : tempList)
|
||
{
|
||
firstElements.append(pair.first);
|
||
}
|
||
|
||
// 对两个列表进行排序(因为 moveMatchingRequests 内部会排序比较)
|
||
QStringList sortedFirstElements = firstElements;
|
||
QStringList sortedLstKeys = lstKeys;
|
||
|
||
std::sort(sortedFirstElements.begin(), sortedFirstElements.end());
|
||
std::sort(sortedLstKeys.begin(), sortedLstKeys.end());
|
||
|
||
// 比较两个列表是否相同
|
||
if (sortedFirstElements == sortedLstKeys)
|
||
{
|
||
// 这里需要一个 id 参数,你可以从 lstRequest 中获取或使用其他方式
|
||
QString id; // 你需要确定 id 从哪里来
|
||
|
||
// 调用 moveMatchingRequests
|
||
for(auto& pair:tempList){
|
||
pair.second = "connecting";
|
||
}
|
||
UiCommunicationBus::instance()->insertSesstionMap(id, lstTarget);
|
||
sAction = "start";
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
if(!lstTarget.isEmpty()){
|
||
if(sAction == "start"){
|
||
auto config = ConfigManager::instance()->getWebSocketConfig();
|
||
QString sPre = removeAfterStreamBySplit(config.endpoint); //手动移除
|
||
config.endpoint = sPre + "/" + sClientId;
|
||
CommunicationManager::instance()->updateWebSocketConfig(config,sClientId);
|
||
CommunicationManager::instance()->connectWebSocket(sClientId);
|
||
}
|
||
else if(sAction == "stop"){ //已经停止完毕,从session中移除会话
|
||
auto &map = UiCommunicationBus::instance()->getSesstionMap();
|
||
for(auto iter = map.begin();iter != map.end();++iter){
|
||
if(iter->first == sClientId){
|
||
iter = map.erase(iter);
|
||
break;
|
||
}
|
||
}
|
||
CommunicationManager::instance()->disconnectWebSocket(sClientId);
|
||
CommunicationManager::instance()->removeChannel(sClientId);
|
||
}
|
||
}
|
||
}
|
||
else if(sType == "recommend"){
|
||
QJsonArray dataArr = data.toJsonArray();
|
||
for(const QJsonValue& value:dataArr){
|
||
QJsonObject dataObj = value.toObject();
|
||
QString input = dataObj.value("input").toString();
|
||
int offSet = dataObj.value("offset").toInt();
|
||
QJsonArray recommendedList = dataObj.value("recommended_list").toArray();
|
||
|
||
HttpRecommandInfo info;
|
||
for(const QJsonValue& value : recommendedList){
|
||
QString content = value.toString();
|
||
info.lstRecommand.append(content);
|
||
}
|
||
info.sInput = input.left(offSet);
|
||
info.nOffset = offSet;
|
||
if(_parentCavas){
|
||
_parentCavas->passRecommmandHttpData(info);
|
||
}
|
||
}
|
||
}
|
||
if(_parentCavas){
|
||
auto pDlg = _parentCavas->getConnectSettingDlg();
|
||
if(pDlg){
|
||
QJsonObject jsonObj = data.value<QJsonObject>();
|
||
QJsonDocument doc(jsonObj);
|
||
QString compactJson = doc.toJson(QJsonDocument::Compact);
|
||
pDlg->updateHttpLog(sType,compactJson);
|
||
}
|
||
}
|
||
}
|
||
|
||
void DataAccessor::onReceiveWebsocketData(const QVariant& data)
|
||
{
|
||
QJsonObject dataObj = data.toJsonObject();
|
||
if(dataObj.contains("targets")){
|
||
QJsonArray arrTarget = dataObj.value("targets").toArray();
|
||
for (const QJsonValue& value : arrTarget) {
|
||
QJsonObject targetObj = value.toObject();
|
||
QString targetId = targetObj["id"].toString();
|
||
QJsonArray arrData = targetObj["datas"].toArray();
|
||
|
||
QMap<quint64, double> newInnerMap;
|
||
for (const QJsonValue& data : arrData){
|
||
QJsonObject dataObj = data.toObject();
|
||
QString sTime = dataObj["time"].toString();
|
||
double dVal = dataObj["value"].toDouble();
|
||
bool ok = false;
|
||
quint64 value = sTime.toULongLong(&ok);
|
||
if (ok) {
|
||
newInnerMap.insert(value,dVal);
|
||
}
|
||
}
|
||
|
||
QMutexLocker locker(&m_mutex);
|
||
if (_realTimeData.contains(targetId)) {
|
||
_realTimeData[targetId].insert(newInnerMap);
|
||
|
||
} else {
|
||
_realTimeData.insert(targetId, newInnerMap);
|
||
}
|
||
}
|
||
}
|
||
if(_parentCavas){
|
||
auto pDlg = _parentCavas->getConnectSettingDlg();
|
||
if(pDlg){
|
||
QJsonObject jsonObj = data.value<QJsonObject>();
|
||
QJsonDocument doc(jsonObj);
|
||
QString compactJson = doc.toJson(QJsonDocument::Compact);
|
||
pDlg->updateWebsocketLog(compactJson);
|
||
}
|
||
}
|
||
}
|
||
|
||
QString DataAccessor::removeAfterStreamBySplit(const QString& url)
|
||
{
|
||
QStringList parts = url.split('/');
|
||
QStringList resultParts;
|
||
|
||
for (int i = 0; i < parts.size(); i++) {
|
||
if (parts[i] == "stream") {
|
||
// 找到 "stream" 后,停止添加后续部分
|
||
resultParts.append("stream");
|
||
break;
|
||
}
|
||
resultParts.append(parts[i]);
|
||
}
|
||
|
||
return resultParts.join('/');
|
||
}
|