241 lines
7.1 KiB
C++
241 lines
7.1 KiB
C++
#include "dataManager.h"
|
||
#include "httpRequestManager.h"
|
||
#include <QtConcurrent>
|
||
|
||
/// Returns the single shared DataManager.
///
/// The object is a function-local static, so it is constructed the first time
/// this function is called and the same address is returned on every call.
DataManager* DataManager::instance()
{
    static DataManager singleton;
    return &singleton;
}
|
||
|
||
/// Creates the HTTP request manager and wires up its result signals, then
/// starts the periodic timer that drains the request queue.
DataManager::DataManager()
    : m_httpManager(new HttpRequestManager(this)) // `this` is passed to the manager's constructor (presumably as QObject parent — confirm)
{
    // Results of HTTP-backed sources arrive asynchronously through these signals.
    connect(m_httpManager, &HttpRequestManager::requestFailed, this, &DataManager::handleHttpRequsetFailed);
    connect(m_httpManager, &HttpRequestManager::dataReceived, this, &DataManager::handleHttpDataReceived);

    // Process queued requests every 100 ms.
    connect(&m_requestTimer, &QTimer::timeout, this, &DataManager::processRequestQueue);
    m_requestTimer.start(100);
}
|
||
/// Nothing to release explicitly; members clean up through their own destructors.
DataManager::~DataManager() = default;
|
||
|
||
void DataManager::triggerDataUpdate(const QString& dataKey)
|
||
{
|
||
bool isHttpSource = false;
|
||
{
|
||
QReadLocker locker(&m_cacheLock);
|
||
isHttpSource = m_dataSources.contains(dataKey) && m_httpManager->hasVaildEndpoint(dataKey);
|
||
}
|
||
|
||
if(isHttpSource)
|
||
m_httpManager->requestData(dataKey);
|
||
else
|
||
{
|
||
QThreadPool::globalInstance()->start([weakThis = QPointer<DataManager>(this), dataKey]() { //多线程中采用QPointer可以防止垂悬指针的问题,此处可以不用,但是建议习惯性的这样写
|
||
if(weakThis.isNull())
|
||
return;
|
||
|
||
QVariant newData;
|
||
bool success = false;
|
||
//获取数据源
|
||
std::function<QVariant()> fetcher;
|
||
{
|
||
QReadLocker locker(&weakThis->m_cacheLock);
|
||
if(!weakThis->m_dataSources.contains(dataKey))
|
||
{
|
||
qWarning() << "Data source not found for: " << dataKey;
|
||
weakThis->m_pendingUpdates.remove(dataKey);
|
||
return;
|
||
}
|
||
fetcher = weakThis->m_dataSources[dataKey];
|
||
}
|
||
|
||
try {
|
||
newData = fetcher();
|
||
success = true;
|
||
} catch (const std::exception& e) {
|
||
qWarning() << "Data fetch error for" << dataKey << ": " << e.what();
|
||
}
|
||
|
||
QWriteLocker locker(&weakThis->m_cacheLock);
|
||
if(weakThis->m_cache.contains(dataKey))
|
||
{
|
||
CacheEntry& entry = weakThis->m_cache[dataKey];
|
||
if(success)
|
||
{
|
||
entry.data = newData;
|
||
entry.timestamp = QDateTime::currentDateTime();
|
||
emit weakThis->dataUpdated(dataKey, newData, entry.timestamp);
|
||
}
|
||
entry.isUpdating = false;
|
||
}
|
||
|
||
weakThis->m_pendingUpdates.remove(dataKey);
|
||
});
|
||
}
|
||
}
|
||
|
||
/// Gives access to the HTTP request manager created in the constructor.
/// @return The manager pointer stored in m_httpManager.
HttpRequestManager* DataManager::httpManager()
{
    return this->m_httpManager;
}
|
||
|
||
void DataManager::registerDataSource(const QString& dataKey, std::function<QVariant()> fetcher)
|
||
{
|
||
if(dataKey.isEmpty() || !fetcher)
|
||
{
|
||
qWarning() << "Invalid data source registration";
|
||
return;
|
||
}
|
||
|
||
QWriteLocker locker(&m_cacheLock);
|
||
m_dataSources[dataKey] = fetcher;
|
||
|
||
//初始化缓存条目
|
||
if(!m_cache.contains(dataKey))
|
||
m_cache[dataKey] = CacheEntry();
|
||
}
|
||
|
||
void DataManager::registerHttpDataSource(const QString& dataKey, const QUrl& url, const QByteArray& method,
|
||
const QByteArray& body, const QMap<QByteArray, QByteArray>& headers)
|
||
{
|
||
m_httpManager->registerEndpoint(dataKey, url, method, body, headers);
|
||
//调用registerDataSource初始化缓存
|
||
registerDataSource(dataKey, [](){return QVariant();}); //http的数据通过httpManager的信号进行接收,所以生成函数给空值即可
|
||
}
|
||
|
||
/// Returns the cached value for @p dataKey.
///
/// @param dataKey Key to look up.
/// @return The cached value, or an invalid QVariant when the key is empty or
///         has no cache entry (the latter also logs a warning).
QVariant DataManager::getCacheData(const QString& dataKey)
{
    if (dataKey.isEmpty())
        return QVariant();

    QReadLocker locker(&m_cacheLock);

    // A single const lookup: the non-const operator[] may detach or insert,
    // which must not happen while only a shared read lock is held.
    const auto entryIt = m_cache.constFind(dataKey);
    if (entryIt == m_cache.constEnd())
    {
        qWarning() << "Unknown data key:" << dataKey;
        return QVariant();
    }

    return entryIt->data;
}
|
||
|
||
bool DataManager::hasData(const QString& dataKey)
|
||
{
|
||
QReadLocker locker(&m_cacheLock);
|
||
return m_cache.contains(dataKey) && !m_cache[dataKey].data.isNull();
|
||
}
|
||
|
||
void DataManager::requestData(const QString& dataKey, QObject* requester)
|
||
{
|
||
QMutexLocker lock(&m_requestMutex);
|
||
m_requestQueue.enqueue({dataKey, requester});
|
||
}
|
||
|
||
void DataManager::invalidateCache(const QString& dataKey)
|
||
{
|
||
QWriteLocker locker(&m_cacheLock);
|
||
if(dataKey.isEmpty())
|
||
{
|
||
m_cache.clear();
|
||
emit cacheInvalidated("");
|
||
}
|
||
else
|
||
{
|
||
m_cache.remove(dataKey);
|
||
emit cacheInvalidated(dataKey);
|
||
}
|
||
}
|
||
|
||
void DataManager::setCacheTimeout(int seconds)
|
||
{
|
||
m_cacheTimeout = qMax(60, seconds); //最小1分钟
|
||
}
|
||
|
||
void DataManager::processRequestQueue()
|
||
{
|
||
//收集到本地,减少互斥锁的持有时间,后续都是对本地数据进行操作,无需加锁
|
||
QList<RequestInfo> requests;
|
||
{
|
||
QMutexLocker lock(&m_requestMutex);
|
||
while(!m_requestQueue.isEmpty())
|
||
requests.append(m_requestQueue.dequeue());
|
||
}
|
||
|
||
//按数据类型进行分组,减少更新频率
|
||
QMap<QString, QSet<QObject*>> groupedRequests;
|
||
for(const auto& req : requests)
|
||
{
|
||
if(req.requester.isNull())
|
||
continue;
|
||
groupedRequests[req.dataKey].insert(req.requester);
|
||
}
|
||
|
||
QSet<QString> needsUpdate;
|
||
QDateTime curTime = QDateTime::currentDateTime();
|
||
|
||
//减少写锁范围
|
||
{
|
||
QWriteLocker locker(&m_cacheLock);
|
||
for(auto it = groupedRequests.begin(); it != groupedRequests.end(); ++it)
|
||
{
|
||
const QString& dataKey = it.key();
|
||
const QSet<QObject*>& requesters = it.value();
|
||
|
||
if(!m_cache.contains(dataKey))
|
||
{
|
||
qWarning() << "Skipping request for unknown data key:" << dataKey;
|
||
continue;
|
||
}
|
||
|
||
CacheEntry& entry = m_cache[dataKey];
|
||
//entry.pendingRequests.unite(it.value());
|
||
|
||
bool needsRefresh = false;
|
||
if(entry.timestamp.isNull() || entry.timestamp.msecsTo(curTime) > m_minRefreshInterval)
|
||
needsRefresh = true;
|
||
if(needsRefresh)
|
||
{
|
||
entry.isUpdating = true;
|
||
m_pendingUpdates.insert(dataKey);
|
||
needsUpdate.insert(dataKey); //本地数据无需加锁,减少锁范围
|
||
}
|
||
}
|
||
} //写锁在这里自动释放
|
||
|
||
//触发更新
|
||
for(const QString& dataKey : needsUpdate)
|
||
triggerDataUpdate(dataKey);
|
||
}
|
||
|
||
void DataManager::handleHttpDataReceived(const QString& dataKey, const QVariant& data)
|
||
{
|
||
QWriteLocker locker(&m_cacheLock);
|
||
|
||
if(m_cache.contains(dataKey))
|
||
{
|
||
CacheEntry& entry = m_cache[dataKey];
|
||
entry.data = data;
|
||
entry.timestamp = QDateTime::currentDateTime();
|
||
entry.isUpdating = false;
|
||
|
||
emit dataUpdated(dataKey, data, entry.timestamp);
|
||
}
|
||
|
||
m_pendingUpdates.remove(dataKey);
|
||
}
|
||
|
||
void DataManager::handleHttpRequsetFailed(const QString& dataKey, const QVariant& data)
|
||
{
|
||
QWriteLocker locker(&m_cacheLock);
|
||
|
||
if(m_cache.contains(dataKey))
|
||
{
|
||
CacheEntry& entry = m_cache[dataKey];
|
||
entry.isUpdating = false;
|
||
}
|
||
|
||
m_pendingUpdates.remove(dataKey);
|
||
}
|