#include "alarmEventDataService.h" #include #include #include #include #include #include #include #include #include AlarmEventDataService* AlarmEventDataService::instance() { static AlarmEventDataService instance; return &instance; } AlarmEventDataService::AlarmEventDataService() : m_serviceStatus(ServiceStatus::Uninitialized) //, m_realTimeConnectionStatus(ConnectionStatus::Disconnected) //, m_historicalConnectionStatus(ConnectionStatus::Disconnected) , m_amqpClient(nullptr) , m_amqpQueue(nullptr) , m_amqpExchange(nullptr) , m_reconnectAttempts(0) , m_maxReconnectAttempts(10) { m_reconnectTimer = new QTimer(this); m_reconnectTimer->setSingleShot(true); connect(m_reconnectTimer, &QTimer::timeout, this, &AlarmEventDataService::onReconnectTimeout); m_networkManager = new QNetworkAccessManager(this); } AlarmEventDataService::~AlarmEventDataService() {} bool AlarmEventDataService::initialize(const ServiceConfig& config) { if(m_serviceStatus != ServiceStatus::Uninitialized) return false; if(!config.isValid()) return false; m_config = config; m_serviceStatus = ServiceStatus::Disconnected; return true; } void AlarmEventDataService::start() { if(m_serviceStatus != ServiceStatus::Disconnected) return; m_serviceStatus = ServiceStatus::Connecting; //启动实时数据服务 startRealTimeDataService(); } void AlarmEventDataService::stop() { cancelReconnect(); cleanupRabbitMQConnection(); m_serviceStatus = ServiceStatus::Disconnected; } void AlarmEventDataService::queryHistoricalEvents(const QDateTime& startTime, const QDateTime& endTime) { QUrl url = bulidHistoricalQueryUrl(startTime, endTime); if(!url.isValid()) return; //创建网络请求 QNetworkRequest request(url); request.setTransferTimeout(m_config.historicalConfig.timeout); //发送请求 QNetworkReply* reply = m_networkManager->get(request); connect(reply, &QNetworkReply::finished, this, [this, reply](){ onHistoricalRequestFinished(reply); }); } void AlarmEventDataService::startRealTimeDataService() { cleanupRabbitMQConnection(); m_amqpClient = new QAmqpClient(this); m_amqpClient->setHost(m_config.rabbitMQConfig.host); m_amqpClient->setPort(m_config.rabbitMQConfig.port); m_amqpClient->setUsername(m_config.rabbitMQConfig.username); m_amqpClient->setPassword(m_config.rabbitMQConfig.password); m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost); connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected); connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected); connect(m_amqpClient, QOverload::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError); } void AlarmEventDataService::cleanupRabbitMQConnection() { if(m_amqpQueue) { m_amqpQueue->deleteLater(); m_amqpQueue = nullptr; } if(m_amqpExchange) { m_amqpExchange->deleteLater(); m_amqpExchange = nullptr; } if(m_amqpClient) { m_amqpClient->disconnectFromHost(); m_amqpClient->deleteLater(); m_amqpClient = nullptr; } } void AlarmEventDataService::scheduleReconnect() { if (m_reconnectAttempts < m_maxReconnectAttempts) { int delay = m_config.rabbitMQConfig.reconnectInterval * (1 << m_reconnectAttempts); // 指数退避,<start(delay); m_reconnectAttempts++; } else qInfo() << "Maximum reconnection attempts reached"; } void AlarmEventDataService::cancelReconnect() { m_reconnectTimer->stop(); m_reconnectAttempts = 0; } MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message) { QString errorString; EventData event = parseEventFromMessage(message.payload(), errorString); if(!errorString.isEmpty()) return MessageHandleResult::ParseError; if(!validateEvent(event)) return MessageHandleResult::ValidationError; emit realTimeEventReceived(event); return MessageHandleResult::Success; } EventData AlarmEventDataService::parseEventFromMessage(const QByteArray& data, QString& errorString) { QJsonParseError parseError; QJsonDocument doc = QJsonDocument::fromJson(data, &parseError); if(parseError.error != QJsonParseError::NoError) { errorString = QString("JSON parse error: %1 at offset %2") .arg(parseError.errorString()) .arg(parseError.offset); return EventData(); } if(!doc.isObject()) { errorString = "JSON document is not an object"; return EventData(); } QJsonObject obj = doc.object(); EventData event = EventData::fromJson(obj); return event; } bool AlarmEventDataService::validateEvent(const EventData& event) { if(event.id.isEmpty()) return false; QDateTime eventTime = QDateTime::fromMSecsSinceEpoch(event.timestamp); if(!eventTime.isValid()) return false; return true; } QUrl AlarmEventDataService::bulidHistoricalQueryUrl(const QDateTime& startTime, const QDateTime& endTime) { QUrl url = m_config.historicalConfig.baseUrl; QUrlQuery query; query.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch())); query.addQueryItem("end", QString::number(startTime.toMSecsSinceEpoch())); url.setQuery(query); return url; } void AlarmEventDataService::processHistoricalResponse(const QByteArray& data) { QJsonParseError parseError; QJsonDocument doc = QJsonDocument::fromJson(data, &parseError); if(parseError.error != QJsonParseError::NoError) { QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2") .arg(parseError.errorString()) .arg(parseError.offset); emit historicalQueryError(errorMsg); } if(!doc.isObject()) { QString errorMsg = "Historical Events JSON document is not an object"; emit historicalQueryError(errorMsg); return ; } QJsonObject obj = doc.object(); QJsonValue dataValue = obj.value("data"); if(dataValue.isArray()) { QList historicalEvents; QJsonArray eventArray = dataValue.toArray(); int size = eventArray.size(); for(int i = 0; i < size; ++i) { QJsonValue eventValue = eventArray.at(i); if(!eventValue.isObject()) continue; QJsonObject eventObj = eventValue.toObject(); EventData event = EventData::fromJson(eventObj); if(!event.id.isEmpty()) historicalEvents.append(event); } emit historicalQuertData(historicalEvents); } } // ==================== 槽函数 ==================== void AlarmEventDataService::onRabbitMQConnected() { m_serviceStatus = ServiceStatus::Connected; //Exchange m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName); m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable); //Queue m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName); m_amqpQueue->declare(QAmqpQueue::AutoDelete); m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机,消息会发送给所有与之绑定的队列,所以此时routingKey无所谓 connect(m_amqpQueue, &QAmqpQueue::messageReceived, this, &AlarmEventDataService::onMessageReceived); if(m_config.rabbitMQConfig.autoAck) m_amqpQueue->consume(QAmqpQueue::coNoAck); else m_amqpQueue->consume(0); //0表示不适用任何特殊选项,即手动确认 } void AlarmEventDataService::onRabbitMQDisconnected() { qWarning() << "Disconnected to RabbitMQ"; m_serviceStatus = ServiceStatus::Disconnected; scheduleReconnect(); } void AlarmEventDataService::onRabbitMQError(QAMQP::Error error) { qWarning() << m_amqpClient->errorString(); m_serviceStatus = ServiceStatus::Error; scheduleReconnect(); } void AlarmEventDataService::onMessageReceived() { QAmqpMessage message = m_amqpQueue->dequeue(); MessageHandleResult result = processMessage(message); //手动确认消息 if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success) m_amqpQueue->ack(message); } void AlarmEventDataService::onReconnectTimeout() { if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting) return; m_serviceStatus = ServiceStatus::Reconnecting; startRealTimeDataService(); } void AlarmEventDataService::onHistoricalRequestFinished(QNetworkReply* reply) { if(reply->error() == QNetworkReply::NoError) { QByteArray data = reply->readAll(); processHistoricalResponse(data); } else { QString errorMsg = QString("Request HistoricalEvents error: %1").arg(reply->errorString()); emit historicalQueryError(errorMsg); } reply->deleteLater(); }