#include "dataManager.h"
#include "httpRequestManager.h"

// NOTE(review): the third #include in the text this file was restored from had
// lost its <...> target. The headers below cover the Qt and standard names this
// file uses directly -- confirm against the original include list.
#include <QDateTime>
#include <QDebug>
#include <QPointer>
#include <QSet>
#include <QThreadPool>

#include <exception>
#include <functional>
#include <utility>

// NOTE(review): template arguments were also missing from the received text.
// std::function<QVariant()> and QPointer<DataManager> are reconstructed from how
// the values are used in this file; the `headers` parameter type of
// registerHttpDataSource() is a guess. Verify all of them against dataManager.h.

// Returns the single DataManager, a function-local static created on first call.
DataManager* DataManager::instance()
{
    static DataManager instance;
    return &instance;
}

// Creates the HTTP manager (parented to this object), wires its result signals
// to the cache handlers, and starts the 100 ms timer that drains the request
// queue through processRequestQueue().
DataManager::DataManager()
    : m_httpManager(new HttpRequestManager(this))
{
    connect(m_httpManager, &HttpRequestManager::dataReceived,
            this, &DataManager::handleHttpDataReceived);
    connect(m_httpManager, &HttpRequestManager::requestFailed,
            this, &DataManager::handleHttpRequsetFailed);

    m_requestTimer.setInterval(100);
    connect(&m_requestTimer, &QTimer::timeout, this, &DataManager::processRequestQueue);
    m_requestTimer.start();
}

DataManager::~DataManager() {}

// Starts a refresh of `dataKey`.
//
// A key that is registered and has a valid HTTP endpoint is forwarded to the
// HTTP manager; its result arrives through handleHttpDataReceived() or
// handleHttpRequsetFailed(). Any other key has its registered fetcher run on
// the global thread pool, and the result is written into the cache there.
void DataManager::triggerDataUpdate(const QString& dataKey)
{
    bool isHttpSource = false;
    {
        QReadLocker locker(&m_cacheLock);
        isHttpSource = m_dataSources.contains(dataKey)
                       && m_httpManager->hasVaildEndpoint(dataKey);
    }

    if (isHttpSource) {
        m_httpManager->requestData(dataKey);
        return;
    }

    // Capturing a QPointer instead of a raw `this` guards against a dangling
    // pointer in threaded code. Not strictly required here, but a good habit.
    QThreadPool::globalInstance()->start([weakThis = QPointer<DataManager>(this), dataKey]() {
        if (weakThis.isNull())
            return;

        // Copy the fetcher out under the read lock. constFind() keeps the
        // lookup read-only, unlike the non-const operator[].
        std::function<QVariant()> fetcher;
        bool sourceFound = false;
        {
            QReadLocker locker(&weakThis->m_cacheLock);
            const auto source = weakThis->m_dataSources.constFind(dataKey);
            if (source != weakThis->m_dataSources.constEnd()) {
                fetcher = source.value();
                sourceFound = true;
            }
        }

        if (!sourceFound) {
            qWarning() << "Data source not found for: " << dataKey;
            // Bookkeeping is a mutation, so it needs the write lock (the read
            // lock above has already been released).
            QWriteLocker locker(&weakThis->m_cacheLock);
            const auto cached = weakThis->m_cache.find(dataKey);
            if (cached != weakThis->m_cache.end())
                cached.value().isUpdating = false;
            weakThis->m_pendingUpdates.remove(dataKey);
            return;
        }

        // Run the fetcher without holding any lock. No exception may leave
        // this pool task, so anything that is not a std::exception is caught too.
        QVariant newData;
        bool success = false;
        try {
            newData = fetcher();
            success = true;
        } catch (const std::exception& e) {
            qWarning() << "Data fetch error for" << dataKey << ": " << e.what();
        } catch (...) {
            qWarning() << "Data fetch error for" << dataKey << ": " << "unknown exception";
        }

        bool notify = false;
        QDateTime updatedAt;
        {
            QWriteLocker locker(&weakThis->m_cacheLock);
            const auto cached = weakThis->m_cache.find(dataKey);
            if (cached != weakThis->m_cache.end()) {
                CacheEntry& entry = cached.value();
                if (success) {
                    entry.data = newData;
                    entry.timestamp = QDateTime::currentDateTime();
                    updatedAt = entry.timestamp;
                    notify = true;
                }
                entry.isUpdating = false;
            }
            weakThis->m_pendingUpdates.remove(dataKey);
        }

        // Emit only after the lock is released, so a directly connected slot
        // can call getCacheData()/hasData() without re-entering m_cacheLock
        // while this thread still holds it.
        if (notify && !weakThis.isNull())
            emit weakThis->dataUpdated(dataKey, newData, updatedAt);
    });
}

// Returns the HTTP request manager created in the constructor.
HttpRequestManager* DataManager::httpManager()
{
    return m_httpManager;
}

// Registers `fetcher` as the producer for `dataKey` and makes sure a cache
// entry exists for it. An empty key or empty fetcher is rejected with a
// warning. An already registered key keeps its existing fetcher.
void DataManager::registerDataSource(const QString& dataKey, std::function<QVariant()> fetcher)
{
    if (dataKey.isEmpty() || !fetcher) {
        qWarning() << "Invalid data source registration";
        return;
    }

    QWriteLocker locker(&m_cacheLock);
    if (!m_dataSources.contains(dataKey))
        m_dataSources[dataKey] = fetcher;

    // Initialise the cache entry.
    if (!m_cache.contains(dataKey))
        m_cache[dataKey] = CacheEntry();
}

// Registers an HTTP endpoint for `dataKey` with the HTTP manager and creates
// the matching cache entry.
// NOTE(review): the template arguments of `headers` were lost in the received
// text; QMap<QByteArray, QByteArray> is a guess -- match the header declaration.
void DataManager::registerHttpDataSource(const QString& dataKey, const QUrl& url,
                                         const QByteArray& method, const QByteArray& body,
                                         const QMap<QByteArray, QByteArray>& headers)
{
    m_httpManager->registerEndpoint(dataKey, url, method, body, headers);

    // Go through registerDataSource() to initialise the cache. HTTP results
    // arrive via the HTTP manager's signals, so the fetcher only has to
    // return an empty value.
    registerDataSource(dataKey, []() { return QVariant(); });
}

// Returns the cached value for `dataKey`. Returns an invalid QVariant for an
// empty key, and for a key with no cache entry (the latter also logs a warning).
QVariant DataManager::getCacheData(const QString& dataKey)
{
    if (dataKey.isEmpty())
        return QVariant();

    QReadLocker locker(&m_cacheLock);
    // constFind() keeps the container untouched under the read lock.
    const auto cached = m_cache.constFind(dataKey);
    if (cached == m_cache.constEnd()) {
        qWarning() << "Unknown data key:" << dataKey;
        return QVariant();
    }
    return cached.value().data;
}

// Returns true when `dataKey` has a cache entry whose data is not null.
bool DataManager::hasData(const QString& dataKey)
{
    QReadLocker locker(&m_cacheLock);
    const auto cached = m_cache.constFind(dataKey);
    return cached != m_cache.constEnd() && !cached.value().data.isNull();
}

// Queues a request for `dataKey` on behalf of `requester`. The queue is
// drained by processRequestQueue() on the timer.
void DataManager::requestData(const QString& dataKey, QObject* requester)
{
    QMutexLocker lock(&m_requestMutex);
    m_requestQueue.enqueue({dataKey, requester});
}

// Invalidates one cache entry, or every entry when `dataKey` is empty, then
// emits cacheInvalidated() with the key ("" for all).
//
// Entries are reset to a default CacheEntry rather than erased:
// processRequestQueue() skips keys that have no cache entry, so erasing would
// make an invalidated key impossible to refresh until it was registered again.
void DataManager::invalidateCache(const QString& dataKey)
{
    {
        QWriteLocker locker(&m_cacheLock);
        if (dataKey.isEmpty()) {
            for (auto it = m_cache.begin(); it != m_cache.end(); ++it)
                it.value() = CacheEntry();
        } else {
            const auto cached = m_cache.find(dataKey);
            if (cached != m_cache.end())
                cached.value() = CacheEntry();
        }
    }

    // Emit after unlocking so directly connected slots may read the cache.
    if (dataKey.isEmpty())
        emit cacheInvalidated("");
    else
        emit cacheInvalidated(dataKey);
}

// Stores the cache timeout in seconds, clamped to a minimum of 60 (one minute).
void DataManager::setCacheTimeout(int seconds)
{
    m_cacheTimeout = qMax(60, seconds);
}

// Timer slot: drains the request queue and triggers a refresh for each
// requested key whose cache entry is missing a timestamp or is older than
// m_minRefreshInterval milliseconds.
void DataManager::processRequestQueue()
{
    // Move the queued requests into a local container so the mutex is held
    // only for the swap; everything below works on local data.
    decltype(m_requestQueue) pending;
    {
        QMutexLocker lock(&m_requestMutex);
        pending.swap(m_requestQueue);
    }

    // Collapse the requests to distinct keys so each key is refreshed at most
    // once per tick. Requests whose requester no longer exists are dropped.
    QSet<QString> requestedKeys;
    for (const auto& req : pending) {
        if (req.requester.isNull())
            continue;
        requestedKeys.insert(req.dataKey);
    }

    // Nothing live was requested: skip the write lock and the clock read.
    if (requestedKeys.isEmpty())
        return;

    const QDateTime curTime = QDateTime::currentDateTime();
    QSet<QString> needsUpdate;

    // Keep the write-locked region as small as possible.
    {
        QWriteLocker locker(&m_cacheLock);
        for (const QString& dataKey : std::as_const(requestedKeys)) {
            const auto cached = m_cache.find(dataKey);
            if (cached == m_cache.end()) {
                qWarning() << "Skipping request for unknown data key:" << dataKey;
                continue;
            }

            CacheEntry& entry = cached.value();
            const bool needsRefresh = entry.timestamp.isNull()
                                      || entry.timestamp.msecsTo(curTime) > m_minRefreshInterval;
            if (!needsRefresh)
                continue;

            // NOTE(review): isUpdating is set here but never checked, so a key
            // whose refresh is still running can be triggered again on a later
            // tick. A guard would need every failure path to clear the flag.
            entry.isUpdating = true;
            m_pendingUpdates.insert(dataKey);
            needsUpdate.insert(dataKey); // local container, no lock needed later
        }
    } // write lock released here

    // Trigger the refreshes outside the lock.
    for (const QString& dataKey : std::as_const(needsUpdate))
        triggerDataUpdate(dataKey);
}

// Slot for HttpRequestManager::dataReceived: stores `data` with the current
// time in the cache entry for `dataKey` (if one exists), clears the updating
// state, and emits dataUpdated().
void DataManager::handleHttpDataReceived(const QString& dataKey, const QVariant& data)
{
    bool notify = false;
    QDateTime updatedAt;
    {
        QWriteLocker locker(&m_cacheLock);
        const auto cached = m_cache.find(dataKey);
        if (cached != m_cache.end()) {
            CacheEntry& entry = cached.value();
            entry.data = data;
            entry.timestamp = QDateTime::currentDateTime();
            entry.isUpdating = false;
            updatedAt = entry.timestamp;
            notify = true;
        }
        m_pendingUpdates.remove(dataKey);
    }

    // Emit after unlocking so directly connected slots may read the cache.
    if (notify)
        emit dataUpdated(dataKey, data, updatedAt);
}

// Slot for HttpRequestManager::requestFailed: clears the updating state for
// `dataKey` and leaves any cached value untouched. `data` is not used.
void DataManager::handleHttpRequsetFailed(const QString& dataKey, const QVariant& data)
{
    Q_UNUSED(data);

    QWriteLocker locker(&m_cacheLock);
    const auto cached = m_cache.find(dataKey);
    if (cached != m_cache.end())
        cached.value().isUpdating = false;
    m_pendingUpdates.remove(dataKey);
}