PowerMaster/source/dataManager.cpp

237 lines
6.8 KiB
C++
Raw Normal View History

#include "dataManager.h"
#include "httpRequestManager.h"
#include <QtConcurrent>
/// Returns the shared DataManager object.
///
/// The object is a function-local static: it is constructed the first time
/// this function runs and the same address is handed out on every call.
DataManager* DataManager::instance()
{
    static DataManager singleton;
    return &singleton;
}
/// Creates the HTTP request manager (passing `this` to its constructor,
/// presumably as the QObject parent), wires its result signals to this
/// object, and starts the timer that drains the request queue.
DataManager::DataManager()
    : m_httpManager(new HttpRequestManager(this))
{
    // HTTP results (success and failure) are delivered through these two slots.
    connect(m_httpManager, &HttpRequestManager::dataReceived,
            this, &DataManager::handleHttpDataReceived);
    connect(m_httpManager, &HttpRequestManager::requestFailed,
            this, &DataManager::handleHttpRequsetFailed);

    // Queued requests are processed in batches every 100 ms.
    connect(&m_requestTimer, &QTimer::timeout,
            this, &DataManager::processRequestQueue);
    m_requestTimer.setInterval(100);
    m_requestTimer.start();
}
/// Nothing to release explicitly; members clean up through their own destructors.
DataManager::~DataManager() = default;
void DataManager::triggerDataUpdate(const QString& dataKey)
{
bool isHttpSource = false;
{
QReadLocker locker(&m_cacheLock);
isHttpSource = m_dataSources.contains(dataKey) && m_httpManager->hasVaildEndpoint(dataKey);
}
if(isHttpSource)
m_httpManager->requestData(dataKey);
else
{
QThreadPool::globalInstance()->start([weakThis = QPointer<DataManager>(this), dataKey]() {
if(weakThis.isNull())
return;
QVariant newData;
bool success = false;
//获取数据源
std::function<QVariant()> fetcher;
{
QReadLocker locker(&weakThis->m_cacheLock);
if(!weakThis->m_dataSources.contains(dataKey))
{
qWarning() << "Data source not found for: " << dataKey;
weakThis->m_pendingUpdates.remove(dataKey);
return;
}
fetcher = weakThis->m_dataSources[dataKey];
}
try {
newData = fetcher();
success = true;
} catch (const std::exception& e) {
qWarning() << "Data fetch error for" << dataKey << ": " << e.what();
}
QWriteLocker locker(&weakThis->m_cacheLock);
if(weakThis->m_cache.contains(dataKey))
{
CacheEntry& entry = weakThis->m_cache[dataKey];
if(success)
{
entry.data = newData;
entry.timestamp = QDateTime::currentDateTime();
emit weakThis->dataUpdated(dataKey, newData);
}
entry.isUpdating = false;
}
weakThis->m_pendingUpdates.remove(dataKey);
});
}
}
/// Gives access to the HTTP request manager created in the constructor.
///
/// @return The HttpRequestManager pointer held by this object.
HttpRequestManager* DataManager::httpManager()
{
    return m_httpManager;
}
/// Registers (or replaces) the fetcher used to produce the data for @p dataKey.
///
/// A registration with an empty key or an empty fetcher is rejected with a
/// warning. A cache entry is created for the key if none exists yet; an
/// existing entry is left as it is.
///
/// @param dataKey Key identifying the data source.
/// @param fetcher Callable that returns the data for this key.
void DataManager::registerDataSource(const QString& dataKey, std::function<QVariant()> fetcher)
{
    const bool usable = !dataKey.isEmpty() && static_cast<bool>(fetcher);
    if(!usable)
    {
        qWarning() << "Invalid data source registration";
        return;
    }

    QWriteLocker guard(&m_cacheLock);
    m_dataSources[dataKey] = fetcher;

    // Initialise the cache entry only the first time the key is seen.
    if(m_cache.contains(dataKey))
        return;
    m_cache[dataKey] = CacheEntry();
}
/// Registers an HTTP-backed data source for @p dataKey.
///
/// The endpoint description is forwarded to the HTTP manager, then the key is
/// registered as a regular data source so that its cache entry gets created.
///
/// @param dataKey Key identifying the data source.
/// @param url     Endpoint URL passed to the HTTP manager.
/// @param method  HTTP method passed to the HTTP manager.
/// @param body    Request body passed to the HTTP manager.
/// @param headers Request headers passed to the HTTP manager.
void DataManager::registerHttpDataSource(const QString& dataKey, const QUrl& url, const QByteArray& method,
                                         const QByteArray& body, const QMap<QByteArray, QByteArray>& headers)
{
    m_httpManager->registerEndpoint(dataKey, url, method, body, headers);

    // HTTP data is received through the HTTP manager's signals, so the fetcher
    // registered here only needs to return an empty value.
    const auto placeholderFetcher = []() { return QVariant(); };
    registerDataSource(dataKey, placeholderFetcher);
}
/// Returns the cached value stored under @p dataKey.
///
/// @param dataKey Key to look up.
/// @return The cached data, or an invalid QVariant when the key is empty or
///         has no cache entry (the latter also logs a warning).
QVariant DataManager::getCacheData(const QString& dataKey)
{
    if(dataKey.isEmpty())
        return QVariant();

    QReadLocker locker(&m_cacheLock);
    // Const lookup: only a read lock is held, so the container must not be
    // accessed through the non-const operator[].
    const auto entryIt = m_cache.constFind(dataKey);
    if(entryIt == m_cache.constEnd())
    {
        qWarning() << "Unknown data key:" << dataKey;
        return QVariant();
    }
    return entryIt.value().data;
}
/// Tells whether a non-null value is cached for @p dataKey.
///
/// @param dataKey Key to look up.
/// @return true when a cache entry exists and its data is not null.
bool DataManager::hasData(const QString& dataKey)
{
    QReadLocker locker(&m_cacheLock);
    // Const lookup: only a read lock is held, so the container must not be
    // accessed through the non-const operator[].
    const auto entryIt = m_cache.constFind(dataKey);
    return entryIt != m_cache.constEnd() && !entryIt.value().data.isNull();
}
/// Queues a request for @p dataKey on behalf of @p requester.
///
/// The request is only appended to the queue here; the queue is drained by
/// processRequestQueue().
///
/// @param dataKey   Key of the requested data.
/// @param requester Object that asked for the data.
void DataManager::requestData(const QString& dataKey, QObject* requester)
{
    // The queue has its own mutex, separate from the cache lock.
    QMutexLocker queueGuard(&m_requestMutex);
    m_requestQueue.enqueue({dataKey, requester});
}
void DataManager::invalidateCache(const QString& dataKey)
{
QWriteLocker locker(&m_cacheLock);
if(dataKey.isEmpty())
{
m_cache.clear();
emit cacheInvalidated("");
}
else
{
m_cache.remove(dataKey);
emit cacheInvalidated(dataKey);
}
}
/// Sets the cache timeout, clamped to a minimum of one minute.
///
/// @param seconds Requested timeout in seconds; values below 60 are raised to 60.
void DataManager::setCacheTimeout(int seconds)
{
    constexpr int kMinTimeoutSeconds = 60;
    m_cacheTimeout = (seconds < kMinTimeoutSeconds) ? kMinTimeoutSeconds : seconds;
}
void DataManager::processRequestQueue()
{
//收集到本地,减少互斥锁的持有时间,后续都是对本地数据进行操作,无需加锁
QList<RequestInfo> requests;
{
QMutexLocker lock(&m_requestMutex);
while(!m_requestQueue.isEmpty())
requests.append(m_requestQueue.dequeue());
}
//按数据类型进行分组,减少更新频率
QMap<QString, QSet<QObject*>> groupedRequests;
for(const auto& req : requests)
groupedRequests[req.dataKey].insert(req.requester);
QSet<QString> needsUpdate;
QDateTime curTime = QDateTime::currentDateTime();
//减少写锁范围
{
QWriteLocker locker(&m_cacheLock);
for(auto it = groupedRequests.begin(); it != groupedRequests.end(); ++it)
{
const QString& dataKey = it.key();
const QSet<QObject*>& requesters = it.value();
if(!m_cache.contains(dataKey))
{
qWarning() << "Skipping request for unknown data key:" << dataKey;
continue;
}
CacheEntry& entry = m_cache[dataKey];
//entry.pendingRequests.unite(it.value());
bool needsRefresh = false;
if(entry.timestamp.isNull() || entry.timestamp.msecsTo(curTime) > m_minRefreshInterval)
needsRefresh = true;
if(needsRefresh)
{
entry.isUpdating = true;
m_pendingUpdates.insert(dataKey);
needsUpdate.insert(dataKey);
}
}
} //写锁在这里自动释放
//触发更新
for(const QString& dataKey : needsUpdate)
triggerDataUpdate(dataKey);
}
/// Slot for HttpRequestManager::dataReceived: stores the received value in the
/// cache entry of @p dataKey and emits dataUpdated().
///
/// If no cache entry exists for the key, the data is dropped and no signal is
/// emitted. The key is removed from the pending set in either case.
///
/// @param dataKey Key the HTTP response belongs to.
/// @param data    Value delivered by the HTTP manager.
void DataManager::handleHttpDataReceived(const QString& dataKey, const QVariant& data)
{
    bool stored = false;
    {
        QWriteLocker locker(&m_cacheLock);
        auto entryIt = m_cache.find(dataKey);
        if(entryIt != m_cache.end())
        {
            CacheEntry& entry = entryIt.value();
            entry.data = data;
            entry.timestamp = QDateTime::currentDateTime();
            entry.isUpdating = false;
            stored = true;
        }
        m_pendingUpdates.remove(dataKey);
    }

    // Emit after the write lock is released so that a directly connected slot
    // can read the cache without trying to lock m_cacheLock again.
    if(stored)
        emit dataUpdated(dataKey, data);
}
/// Slot for HttpRequestManager::requestFailed: marks the entry of @p dataKey
/// as no longer updating and removes the key from the pending set. The cached
/// data itself is left unchanged.
///
/// @param dataKey Key whose HTTP request failed.
/// @param data    Payload delivered with the failure; not used here.
void DataManager::handleHttpRequsetFailed(const QString& dataKey, const QVariant& data)
{
    QWriteLocker guard(&m_cacheLock);

    const auto entryIt = m_cache.find(dataKey);
    if(entryIt != m_cache.end())
        entryIt.value().isUpdating = false;

    m_pendingUpdates.remove(dataKey);
}