feat:创建实时数据本地缓存框架
This commit is contained in:
parent
83b6848734
commit
78437aee5e
|
|
@ -4,6 +4,56 @@
|
|||
#include "networkCommon.h"
|
||||
#include <QObject>
|
||||
#include <QWebSocket>
|
||||
|
||||
struct MeasurementDataPoint
|
||||
{
|
||||
qint64 timestamp;
|
||||
double value;
|
||||
|
||||
MeasurementDataPoint() : timestamp(0), value(0.0)
|
||||
{}
|
||||
MeasurementDataPoint(qint64 st, double val) : timestamp(st), value(val)
|
||||
{}
|
||||
|
||||
bool operator < (const MeasurementDataPoint& other) const
|
||||
{
|
||||
return timestamp < other.timestamp;
|
||||
}
|
||||
|
||||
bool operator == (const MeasurementDataPoint& other) const
|
||||
{
|
||||
return timestamp == other.timestamp;
|
||||
}
|
||||
};
|
||||
|
||||
// 用于QHash的哈希函数
|
||||
inline size_t qHash(const MeasurementDataPoint& dp, size_t seed = 0)
|
||||
{
|
||||
return qHash(dp.timestamp, seed);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief 本地数据存储
|
||||
*
|
||||
* 存储接收到的实时数据,具有去除重复数据功能
|
||||
*/
|
||||
class MeasurementData
|
||||
{
|
||||
public:
|
||||
explicit MeasurementData(const QString& id);
|
||||
|
||||
int insertData(const QVector<MeasurementDataPoint>& newData);
|
||||
QVector<MeasurementDataPoint> getLatestData(int count);
|
||||
QVector<MeasurementDataPoint> getDataInRange(qint64 startTime, qint64 endTime);
|
||||
void cleanupOldData(qint64 cutoffTime);
|
||||
|
||||
private:
|
||||
QString m_id;
|
||||
QSet<qint64> m_timestamps; //用于快速查找是否存在改时间戳的数据
|
||||
QMap<qint64, double> m_data; //按时间戳对数据点进行排序
|
||||
qint64 m_latestTimestamp;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief WebSocket数据客户端
|
||||
*
|
||||
|
|
|
|||
|
|
@ -146,7 +146,10 @@ void MeasurementDataManager::processSubscriptionResponse(const QString& action,
|
|||
{
|
||||
QJsonObject payloadObj = payloadValue.toObject();
|
||||
if(action == "start")
|
||||
{
|
||||
m_clientID = payloadObj.value("client_id").toString();
|
||||
getSubscriptionRealtimeData();
|
||||
}
|
||||
QJsonValue targetsValue = payloadObj.value("targets");
|
||||
if(targetsValue.isArray())
|
||||
{
|
||||
|
|
@ -208,7 +211,19 @@ void MeasurementDataManager::processSubscriptionResponse(const QString& action,
|
|||
|
||||
void MeasurementDataManager::getSubscriptionRealtimeData()
|
||||
{
|
||||
if(m_clientID.isEmpty())
|
||||
return;
|
||||
|
||||
QString strUrl = QString("ws:://%1:%2/%3/%4").arg(m_serviceConfig.realtimeCfg.host
|
||||
, QString::number(m_serviceConfig.realtimeCfg.port)
|
||||
, m_serviceConfig.realtimeCfg.websocketPath
|
||||
, m_clientID);
|
||||
QUrl url(strUrl);
|
||||
if(!m_webSocketClient->connectToServer(url))
|
||||
{
|
||||
QString errorMsg = QString("Connect to WebSocketServer failed.");
|
||||
LOG_ERROR("WebSocket", errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void MeasurementDataManager::onReciveRealtimeData(const QString& dataMsg)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,65 @@
|
|||
#include <QTimer>
|
||||
#include <QRandomGenerator>
|
||||
|
||||
MeasurementData::MeasurementData(const QString&id)
|
||||
: m_id(id)
|
||||
{}
|
||||
|
||||
int MeasurementData::insertData(const QVector<MeasurementDataPoint>& newData)
|
||||
{
|
||||
int insertedCount = 0;
|
||||
|
||||
for(const auto& point : newData)
|
||||
{
|
||||
if (point.timestamp > m_latestTimestamp && !m_timestamps.contains(point.timestamp))
|
||||
{
|
||||
|
||||
m_timestamps.insert(point.timestamp);
|
||||
m_data.insert(point.timestamp, point.value);
|
||||
insertedCount++;
|
||||
|
||||
m_latestTimestamp = point.timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
return insertedCount;
|
||||
}
|
||||
|
||||
QVector<MeasurementDataPoint> MeasurementData::getLatestData(int count)
|
||||
{
|
||||
QVector<MeasurementDataPoint> result;
|
||||
|
||||
auto it = m_data.constEnd();
|
||||
while(it != m_data.constBegin() && result.size() < count)
|
||||
{
|
||||
--it;
|
||||
result.append(MeasurementDataPoint(it.key(), it.value()));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
QVector<MeasurementDataPoint> MeasurementData::getDataInRange(qint64 startTime, qint64 endTime)
|
||||
{
|
||||
QVector<MeasurementDataPoint> result;
|
||||
|
||||
auto end = m_data.upperBound(endTime);
|
||||
for(auto it = m_data.lowerBound(startTime); it != end; ++it)
|
||||
result.append(MeasurementDataPoint(it.key(), it.value()));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void MeasurementData::cleanupOldData(qint64 cutoffTime)
|
||||
{
|
||||
auto it = m_data.begin();
|
||||
while(it != m_data.end() && it.key() < cutoffTime)
|
||||
{
|
||||
m_timestamps.remove(it.key());
|
||||
it = m_data.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
WebSocketClient::WebSocketClient(QObject* parent)
|
||||
: QObject(parent)
|
||||
, m_webSocket(nullptr)
|
||||
|
|
|
|||
Loading…
Reference in New Issue