#include "alarmEventDataService.h" #include #include #include #include #include #include #include #include #include #include #include #include AlarmEventDataService* AlarmEventDataService::instance() { static AlarmEventDataService instance; return &instance; } AlarmEventDataService::AlarmEventDataService() : m_serviceStatus(ServiceStatus::Uninitialized) //, m_realTimeConnectionStatus(ConnectionStatus::Disconnected) //, m_historicalConnectionStatus(ConnectionStatus::Disconnected) , m_amqpClient(nullptr) , m_amqpQueue(nullptr) , m_amqpExchange(nullptr) , m_reconnectAttempts(0) , m_maxReconnectAttempts(10) { m_reconnectTimer = new QTimer(this); m_reconnectTimer->setSingleShot(true); connect(m_reconnectTimer, &QTimer::timeout, this, &AlarmEventDataService::onReconnectTimeout); m_networkManager = new QNetworkAccessManager(this); initialize(); } AlarmEventDataService::~AlarmEventDataService() { stop(); } void AlarmEventDataService::initialize() { //读取配置文件 QString cofigDir = QCoreApplication::applicationDirPath(); m_settingsFile = cofigDir + "/config/alarmEventService_config.ini"; QFile file(m_settingsFile); if(file.open(QIODevice::ReadWrite)) { m_isVaildSettingsFile = true; m_settings = new QSettings(m_settingsFile, QSettings::IniFormat); //RabbitMQConfig m_config.rabbitMQConfig.host = m_settings->value("RabbitMQConfig/host").toString(); m_config.rabbitMQConfig.port = m_settings->value("RabbitMQConfig/port").toInt(); m_config.rabbitMQConfig.username = m_settings->value("RabbitMQConfig/username").toString(); m_config.rabbitMQConfig.password = m_settings->value("RabbitMQConfig/password").toString(); m_config.rabbitMQConfig.virtualHost = m_settings->value("RabbitMQConfig/virtualHost").toString(); m_config.rabbitMQConfig.exchangeName = m_settings->value("RabbitMQConfig/exchangeName").toString(); m_config.rabbitMQConfig.queueName = m_settings->value("RabbitMQConfig/queueName").toString(); m_config.rabbitMQConfig.routingKey = m_settings->value("RabbitMQConfig/routingKey").toString(); m_config.rabbitMQConfig.reconnectInterval = m_settings->value("RabbitMQConfig/reconnectInterval").toInt(); m_config.rabbitMQConfig.heartbeat = m_settings->value("RabbitMQConfig/heartbeat").toInt(); m_config.rabbitMQConfig.autoAck = m_settings->value("RabbitMQConfig/autoAck").toBool(); //HistoricalDataConfig m_config.historicalConfig.baseUrl = m_settings->value("HistoricalDataConfig/baseUrl").toString(); m_config.historicalConfig.timeout = m_settings->value("HistoricalDataConfig/timeout").toInt(); m_config.historicalConfig.maxRetries = m_settings->value("HistoricalDataConfig/maxRetries").toInt(); m_config.historicalConfig.retryInterval = m_settings->value("HistoricalDataConfig/retryInterval").toInt(); m_serviceStatus = ServiceStatus::Disconnected; } else m_isVaildSettingsFile = false; } void AlarmEventDataService::start() { if(m_serviceStatus != ServiceStatus::Disconnected) return; m_serviceStatus = ServiceStatus::Connecting; //启动实时数据服务 startRealTimeDataService(); } void AlarmEventDataService::stop() { cancelReconnect(); cleanupRabbitMQConnection(); m_serviceStatus = ServiceStatus::Disconnected; } void AlarmEventDataService::queryHistoricalEvents(const QDateTime& startTime, const QDateTime& endTime) { QUrl url = bulidHistoricalQueryUrl(startTime, endTime); //QUrl url = m_config.historicalConfig.baseUrl; if(!url.isValid()) return; //创建网络请求 QNetworkRequest request(url); request.setTransferTimeout(m_config.historicalConfig.timeout); //发送请求 QNetworkReply* reply = m_networkManager->get(request); connect(reply, &QNetworkReply::finished, this, [this, reply](){ onHistoricalRequestFinished(reply); }); } void AlarmEventDataService::startRealTimeDataService() { cleanupRabbitMQConnection(); m_amqpClient = new QAmqpClient(this); /*connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected); connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected); connect(m_amqpClient, QOverload::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);*/ //采用QT4的连接方式,新的连接方式无法识别 connect(m_amqpClient, SIGNAL(connected()), this, SLOT(onRabbitMQConnected())); connect(m_amqpClient, SIGNAL(disconnected()), this, SLOT(onRabbitMQDisconnected())); connect(m_amqpClient, SIGNAL(error(QAMQP::Error)), this, SLOT(onRabbitMQError(QAMQP::Error))); connect(m_amqpClient, SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)), this, SLOT(onRabbitMQSocketError(QAbstractSocket::SocketError))); m_amqpClient->setHost(m_config.rabbitMQConfig.host); m_amqpClient->setPort(m_config.rabbitMQConfig.port); m_amqpClient->setUsername(m_config.rabbitMQConfig.username); m_amqpClient->setPassword(m_config.rabbitMQConfig.password); m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost); m_amqpClient->connectToHost(); } void AlarmEventDataService::cleanupRabbitMQConnection() { if(m_amqpQueue) { m_amqpQueue->deleteLater(); m_amqpQueue = nullptr; } if(m_amqpExchange) { m_amqpExchange->deleteLater(); m_amqpExchange = nullptr; } if(m_amqpClient) { m_amqpClient->disconnectFromHost(); m_amqpClient->deleteLater(); m_amqpClient = nullptr; } } void AlarmEventDataService::scheduleReconnect() { if (m_reconnectAttempts < m_maxReconnectAttempts) { int delay = m_config.rabbitMQConfig.reconnectInterval * (1 /*<< m_reconnectAttempts*/); // 指数退避,<start(delay); m_reconnectAttempts++; } else qInfo() << "Maximum reconnection attempts reached"; } void AlarmEventDataService::cancelReconnect() { if(m_reconnectTimer->isActive()) m_reconnectTimer->stop(); m_reconnectAttempts = 0; } MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message) { QString errorString; EventData event = parseEventFromMessage(message.payload(), errorString); if(!errorString.isEmpty()) return MessageHandleResult::ParseError; if(!validateEvent(event)) return MessageHandleResult::ValidationError; emit realTimeEventReceived(event); return MessageHandleResult::Success; } EventData AlarmEventDataService::parseEventFromMessage(const QByteArray& data, QString& errorString) { QJsonParseError parseError; QJsonDocument doc = QJsonDocument::fromJson(data, &parseError); if(parseError.error != QJsonParseError::NoError) { errorString = QString("JSON parse error: %1 at offset %2") .arg(parseError.errorString()) .arg(parseError.offset); return EventData(); } if(!doc.isObject()) { errorString = "JSON document is not an object"; return EventData(); } QJsonObject obj = doc.object(); EventData event = EventData::fromJson(obj); return event; } bool AlarmEventDataService::validateEvent(const EventData& event) { if(event.id.isEmpty()) return false; QDateTime eventTime = QDateTime::fromMSecsSinceEpoch(event.timestamp); if(!eventTime.isValid()) return false; return true; } QUrl AlarmEventDataService::bulidHistoricalQueryUrl(const QDateTime& startTime, const QDateTime& endTime) { QUrl url = m_config.historicalConfig.baseUrl; QUrlQuery query; query.addQueryItem("begin", QString::number(startTime.toMSecsSinceEpoch())); query.addQueryItem("end", QString::number(endTime.toMSecsSinceEpoch())); url.setQuery(query); return url; } void AlarmEventDataService::processHistoricalResponse(const QByteArray& data) { QJsonParseError parseError; QJsonDocument doc = QJsonDocument::fromJson(data, &parseError); if(parseError.error != QJsonParseError::NoError) { QString errorMsg = QString("Historical Events JSON parse error: %1 at offset %2") .arg(parseError.errorString()) .arg(parseError.offset); emit historicalQueryError(errorMsg); } if(!doc.isObject()) { QString errorMsg = "Historical Events JSON document is not an object"; emit historicalQueryError(errorMsg); return ; } QJsonObject obj = doc.object(); QJsonValue dataValue = obj.value("data"); if(dataValue.isArray()) { QList historicalEvents; QJsonArray eventArray = dataValue.toArray(); int size = eventArray.size(); for(int i = 0; i < size; ++i) { QJsonValue eventValue = eventArray.at(i); if(!eventValue.isObject()) continue; QJsonObject eventObj = eventValue.toObject(); EventData event = EventData::fromJson(eventObj); if(validateEvent(event)) historicalEvents.append(event); } emit historicalQueryData(historicalEvents); } } // ==================== 槽函数 ==================== void AlarmEventDataService::onRabbitMQConnected() { qInfo() << "RabbitMQ connectecd"; m_serviceStatus = ServiceStatus::Connected; //Exchange m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName); m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable); //Queue m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName); connect(m_amqpQueue, SIGNAL(opened()), this, SLOT(onAmqpQueueOpend())); } void AlarmEventDataService::onRabbitMQDisconnected() { qWarning() << "Disconnected to RabbitMQ"; m_serviceStatus = ServiceStatus::Disconnected; scheduleReconnect(); } void AlarmEventDataService::onRabbitMQError(QAMQP::Error error) { qWarning() << m_amqpClient->errorString(); m_serviceStatus = ServiceStatus::Error; scheduleReconnect(); } void AlarmEventDataService::onRabbitMQSocketError(QAbstractSocket::SocketError error) { qWarning() << "RabbitMQ connection socket error: " << error; m_serviceStatus = ServiceStatus::Error; scheduleReconnect(); } void AlarmEventDataService::onAmqpQueueOpend() { qInfo() << "AmqpQueue opend"; m_amqpQueue->declare(QAmqpQueue::AutoDelete); m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机,消息会发送给所有与之绑定的队列,所以此时routingKey无所谓 connect(m_amqpQueue, SIGNAL(messageReceived()), this, SLOT(onMessageReceived())); if(m_config.rabbitMQConfig.autoAck) m_amqpQueue->consume(QAmqpQueue::coNoAck); else m_amqpQueue->consume(0); //0表示不适用任何特殊选项,即手动确认 } void AlarmEventDataService::onMessageReceived() { QAmqpMessage message = m_amqpQueue->dequeue(); //qDebug() << message.payload(); MessageHandleResult result = processMessage(message); //手动确认消息 if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success) m_amqpQueue->ack(message); } void AlarmEventDataService::onReconnectTimeout() { if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting) return; m_serviceStatus = ServiceStatus::Reconnecting; startRealTimeDataService(); } void AlarmEventDataService::onHistoricalRequestFinished(QNetworkReply* reply) { if(reply->error() == QNetworkReply::NoError) { QByteArray data = reply->readAll(); processHistoricalResponse(data); } else { QString errorMsg = QString("Request HistoricalEvents error: %1").arg(reply->errorString()); emit historicalQueryError(errorMsg); } reply->deleteLater(); }