#include "instance/dataAccessor.h"
#include "communicationManager.h"
#include "uiCommunicationBus.h"
#include "configManager.h"
#include "diagramCavas.h"
#include "diagramConnectSetting.h"
#include <QDebug>
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <algorithm>
#include "global.h"

// Maximum number of samples kept per target in _realTimeData; entries with the
// smallest keys are dropped once a target exceeds this count.
const int DIAGRAM_MAX_DATA_COUNT = 1000;

namespace {

// Serialises the JSON payload held by `data` to compact text.
// A variant holding a QJsonArray is written as an array; anything else is
// treated as a QJsonObject (yielding "{}" when it is not convertible).
QString toCompactJson(const QVariant& data)
{
    const QJsonDocument doc = (data.userType() == QMetaType::QJsonArray)
            ? QJsonDocument(data.toJsonArray())
            : QJsonDocument(data.value<QJsonObject>());
    return QString::fromUtf8(doc.toJson(QJsonDocument::Compact));
}

} // namespace

DataAccessor::DataAccessor(QObject* parent)
    : QObject(parent)
    ,_parentCavas(nullptr)
{
}

DataAccessor::~DataAccessor()
{
}

/**
 * @brief Handles one HTTP response and mirrors it into the connection-settings log.
 *
 * - "subscriptions": reads "client_id" and "targets" (id/code pairs). If the
 *   client id is already in the session container the response is treated as
 *   a "stop"; otherwise, if the target ids equal the ids of one pending
 *   request list, that list is marked "connecting", the session is recorded
 *   and the action is "start". "start" configures and opens the WebSocket for
 *   the client; "stop" removes the session, disconnects and removes the channel.
 * - "recommend": forwards every entry of the array to the parent canvas.
 * - "subscriptionTest": only emits a debug message.
 *
 * @param sType Response category, compared against the literals above.
 * @param data  JSON payload (object or array, depending on @p sType).
 */
void DataAccessor::onReceiveHttpData(const QString& sType,const QVariant& data)
{
    if(sType == "subscriptions"){
        // NOTE(review): container types are taken from the accessor's return
        // type via auto; confirm against UiCommunicationBus.
        auto& tempRequest = UiCommunicationBus::instance()->getTempRequestMap();

        const QJsonObject dataObj = data.toJsonObject();
        const QString sClientId = dataObj.value("client_id").toString();

        // id -> code of every target in the response; the first occurrence of an id wins.
        // NOTE(review): QMap<QString,QString> inferred from usage; confirm
        // against UiCommunicationBus::insertSesstionMap.
        QMap<QString,QString> lstTarget;
        const QJsonArray targetArr = dataObj.value("targets").toArray();
        for(const QJsonValue& target : targetArr){
            const QJsonObject obj = target.toObject();
            const QString sId = obj["id"].toString();
            const QString sCode = obj["code"].toString();
            qDebug() << "subscription:"+sId+"_"+sCode;
            if(!lstTarget.contains(sId)){
                lstTarget.insert(sId,sCode);
            }
        }

        QString sAction;

        // Already present in the session list means this is a stop
        // (only "stop" and "start" are used for now).
        const auto& sessions = UiCommunicationBus::instance()->getSesstionMap();
        for(const auto& session : sessions){
            if(session.first == sClientId){
                sAction = "stop";
                break;
            }
        }

        if(sAction.isEmpty()){  // not the stop case
            // QMap::keys() is already in ascending order, so it can be compared
            // directly with the sorted ids of each pending request list.
            const QStringList targetIds = lstTarget.keys();
            for(auto it = tempRequest.begin(); it != tempRequest.end(); ++it){
                auto& pending = it.value();

                // Collect the first element of every pending pair.
                QStringList pendingIds;
                pendingIds.reserve(pending.size());
                for(const auto& pair : pending){
                    pendingIds.append(pair.first);
                }
                std::sort(pendingIds.begin(), pendingIds.end());

                // Same set of ids: this pending request produced the response.
                if(pendingIds == targetIds){
                    for(auto& pair : pending){
                        pair.second = "connecting";
                    }
                    UiCommunicationBus::instance()->insertSesstionMap(sClientId, lstTarget);
                    sAction = "start";
                    break;
                }
            }
        }

        if(!lstTarget.isEmpty()){
            if(sAction == "start"){
                auto config = ConfigManager::instance()->getWebSocketConfig();
                // Drop whatever follows ".../stream" and append the client id instead.
                const QString sPre = removeAfterStreamBySplit(config.endpoint);
                config.endpoint = sPre + "/" + sClientId;
                CommunicationManager::instance()->updateWebSocketConfig(config,sClientId);
                if(!CommunicationManager::instance()->connectWebSocket(sClientId)){
                    qWarning() << "connectWebSocket failed for client" << sClientId;
                }
            }
            else if(sAction == "stop"){
                // Stopping has completed: remove the session, then tear down the channel.
                auto& sessionMap = UiCommunicationBus::instance()->getSesstionMap();
                for(auto iter = sessionMap.begin(); iter != sessionMap.end(); ++iter){
                    if(iter->first == sClientId){
                        sessionMap.erase(iter);
                        break;
                    }
                }
                CommunicationManager::instance()->disconnectWebSocket(sClientId);
                CommunicationManager::instance()->removeChannel(sClientId);
            }
        }
    }
    else if(sType == "recommend"){
        const QJsonArray dataArr = data.toJsonArray();
        for(const QJsonValue& entry : dataArr){
            const QJsonObject entryObj = entry.toObject();
            const QString input = entryObj.value("input").toString();
            const int offSet = entryObj.value("offset").toInt();
            const QJsonArray recommendedList = entryObj.value("recommended_list").toArray();

            HttpRecommandInfo info;
            for(const QJsonValue& item : recommendedList){
                info.lstRecommand.append(item.toString());
            }
            info.sInput = input.left(offSet);
            info.nOffset = offSet;

            if(_parentCavas){
                _parentCavas->passRecommmandHttpData(info);
            }
        }
    }
    else if(sType == "subscriptionTest"){
        qDebug()<<"receive null data";
    }

    // Mirror every response into the connection-settings dialog log.
    if(_parentCavas){
        auto pDlg = _parentCavas->getConnectSettingDlg();
        if(pDlg){
            pDlg->updateHttpLog(sType, toCompactJson(data));
        }
    }
}

/**
 * @brief Stores the samples of one WebSocket message and logs the raw message.
 *
 * For every entry of "targets", the "datas" array is parsed into
 * (time, value) samples; samples whose "time" string is not an unsigned
 * integer are skipped. The samples are merged into _realTimeData[id] under
 * m_mutex, after which the entries with the smallest keys are erased until at
 * most DIAGRAM_MAX_DATA_COUNT remain.
 *
 * @param data JSON object payload of the message.
 */
void DataAccessor::onReceiveWebsocketData(const QVariant& data)
{
    const QJsonObject payload = data.toJsonObject();
    if(payload.contains("targets")){
        const QJsonArray arrTarget = payload.value("targets").toArray();
        for(const QJsonValue& target : arrTarget){
            const QJsonObject targetObj = target.toObject();
            const QString targetId = targetObj["id"].toString();
            const QJsonArray arrData = targetObj["datas"].toArray();

            // Parse outside the lock so the critical section only covers the merge.
            // NOTE(review): QMap<quint64,double> inferred from usage; confirm
            // against the declaration of _realTimeData.
            QMap<quint64,double> newInnerMap;
            for(const QJsonValue& sample : arrData){
                const QJsonObject sampleObj = sample.toObject();
                const QString sTime = sampleObj["time"].toString();
                const double dVal = sampleObj["value"].toDouble();

                bool ok = false;
                const quint64 timestamp = sTime.toULongLong(&ok);
                if(ok){
                    newInnerMap.insert(timestamp,dVal);
                }
            }

            QMutexLocker locker(&m_mutex);
            auto& innerMap = _realTimeData[targetId];   // created on first use

            // Bulk insert.
            innerMap.insert(newInnerMap);

            // Over the limit: erase from the beginning (smallest keys first).
            const int toRemove = static_cast<int>(innerMap.size()) - DIAGRAM_MAX_DATA_COUNT;
            auto it = innerMap.begin();
            for(int i = 0; i < toRemove && it != innerMap.end(); ++i){
                it = innerMap.erase(it);
            }
        }
    }

    // Mirror the raw message into the connection-settings dialog log.
    if(_parentCavas){
        auto pDlg = _parentCavas->getConnectSettingDlg();
        if(pDlg){
            pDlg->updateWebsocketLog(toCompactJson(data));
        }
    }
}

/**
 * @brief Returns a copy of the stored samples for the requested targets.
 *
 * Targets without stored data are omitted from the result. The lookup is
 * performed while holding m_mutex.
 *
 * @param paraLst Target ids to look up.
 * @return Map from target id to its samples.
 */
// NOTE(review): return type inferred from usage; confirm against dataAccessor.h.
QMap<QString,QMap<quint64,double>> DataAccessor::getTargetData(QStringList paraLst)
{
    QMap<QString,QMap<quint64,double>> mapData;

    QMutexLocker locker(&m_mutex);
    for(const QString& key : paraLst){
        // Single lookup instead of contains() followed by value().
        const auto found = _realTimeData.constFind(key);
        if(found != _realTimeData.constEnd()){
            mapData.insert(key, found.value());
        }
    }
    return mapData;
}

/**
 * @brief Truncates a '/'-separated URL right after its first "stream" segment.
 *
 * @param url URL to truncate.
 * @return Everything up to and including the first segment equal to "stream";
 *         @p url unchanged when no such segment exists.
 */
QString DataAccessor::removeAfterStreamBySplit(const QString& url)
{
    const QStringList parts = url.split('/');

    QStringList resultParts;
    for(const QString& part : parts){
        resultParts.append(part);
        if(part == "stream"){
            // Found "stream": stop adding the following segments.
            break;
        }
    }
    return resultParts.join('/');
}