From 0962645144f78b24faa3ae05f07499e2827fcc29 Mon Sep 17 00:00:00 2001 From: duanshengchao <519970194@qq.com> Date: Fri, 7 Nov 2025 17:33:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=AE=8C=E6=88=90=E5=AF=B9RabbitMQ?= =?UTF-8?q?=E7=9A=84=E8=87=AA=E5=8A=A8=E9=87=8D=E8=BF=9E=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/alarmEventDataService.h | 9 +++++- include/alarmEventUtils.h | 4 +-- source/alarmEventDataService.cpp | 55 +++++++++++++++++++++++++++++--- 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/include/alarmEventDataService.h b/include/alarmEventDataService.h index 550576d..b07d58b 100644 --- a/include/alarmEventDataService.h +++ b/include/alarmEventDataService.h @@ -10,6 +10,7 @@ class QAmqpClient; class QAmqpQueue; class QAmqpExchange; +class QTimer; class AlarmEventDataService : public QObject { @@ -29,6 +30,7 @@ private slots: void onRabbitMQDisconnected(); void onRabbitMQError(QAMQP::Error error); void onMessageReceived(); + void onReconnectTimeout(); private: explicit AlarmEventDataService(); @@ -37,6 +39,8 @@ private: //连接管理 void startRealTimeDataService(); void cleanupRabbitMQConnection(); + void scheduleReconnect(); + void cancelReconnect(); //实时信息处理 MessageHandleResult processMessage(const QAmqpMessage& message); EventData parseEventFromMessage(const QByteArray& data, QString& errorString); @@ -52,7 +56,10 @@ private: QAmqpQueue* m_amqpQueue; QAmqpExchange* m_amqpExchange; - + //重连相关 + QTimer* m_reconnectTimer; + int m_reconnectAttempts; + int m_maxReconnectAttempts; }; #endif diff --git a/include/alarmEventUtils.h b/include/alarmEventUtils.h index dc41629..94b0ad1 100644 --- a/include/alarmEventUtils.h +++ b/include/alarmEventUtils.h @@ -22,7 +22,7 @@ struct RabbitMQConfig RabbitMQConfig() : port(5672) - , reconnectInterval(5000) + , reconnectInterval(3000) , heartbeat(60) , autoAck(true) {} @@ -166,7 +166,7 @@ struct ServiceConfig enum class ServiceStatus { Uninitialized,//未初始化 - Initialized, //已初始化 + //Initialized, //已初始化 Disconnected, //未连接 Connecting, //连接中 Connected, //已连接 diff --git a/source/alarmEventDataService.cpp b/source/alarmEventDataService.cpp index a32ea0e..35e1956 100644 --- a/source/alarmEventDataService.cpp +++ b/source/alarmEventDataService.cpp @@ -1,4 +1,5 @@ #include "alarmEventDataService.h" +#include #include #include #include @@ -17,7 +18,13 @@ AlarmEventDataService::AlarmEventDataService() , m_amqpClient(nullptr) , m_amqpQueue(nullptr) , m_amqpExchange(nullptr) -{} + , m_reconnectAttempts(0) + , m_maxReconnectAttempts(10) +{ + m_reconnectTimer = new QTimer(this); + m_reconnectTimer->setSingleShot(true); + connect(m_reconnectTimer, &QTimer::timeout, this, &AlarmEventDataService::onReconnectTimeout); +} AlarmEventDataService::~AlarmEventDataService() {} @@ -31,21 +38,27 @@ bool AlarmEventDataService::initialize(const ServiceConfig& config) return false; m_config = config; - + m_serviceStatus = ServiceStatus::Disconnected; return true; } void AlarmEventDataService::start() { - if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Connecting) + if(m_serviceStatus != ServiceStatus::Disconnected) return; m_serviceStatus = ServiceStatus::Connecting; + //启动实时数据服务 startRealTimeDataService(); } void AlarmEventDataService::stop() -{} +{ + cancelReconnect(); + cleanupRabbitMQConnection(); + + m_serviceStatus = ServiceStatus::Disconnected; +} void AlarmEventDataService::startRealTimeDataService() { @@ -85,6 +98,24 @@ void AlarmEventDataService::cleanupRabbitMQConnection() } } +void AlarmEventDataService::scheduleReconnect() +{ + if (m_reconnectAttempts < m_maxReconnectAttempts) + { + int delay = m_config.rabbitMQConfig.reconnectInterval * (1 << m_reconnectAttempts); // 指数退避,<start(delay); + m_reconnectAttempts++; + } + else + qInfo() << "Maximum reconnection attempts reached"; +} + +void AlarmEventDataService::cancelReconnect() +{ + m_reconnectTimer->stop(); + m_reconnectAttempts = 0; +} + MessageHandleResult AlarmEventDataService::processMessage(const QAmqpMessage& message) { QString errorString; @@ -154,12 +185,17 @@ void AlarmEventDataService::onRabbitMQConnected() } void AlarmEventDataService::onRabbitMQDisconnected() -{} +{ + qWarning() << "Disconnected to RabbitMQ"; + m_serviceStatus = ServiceStatus::Disconnected; + scheduleReconnect(); +} void AlarmEventDataService::onRabbitMQError(QAMQP::Error error) { qWarning() << m_amqpClient->errorString(); m_serviceStatus = ServiceStatus::Error; + scheduleReconnect(); } void AlarmEventDataService::onMessageReceived() @@ -171,3 +207,12 @@ void AlarmEventDataService::onMessageReceived() if(!m_config.rabbitMQConfig.autoAck && result == MessageHandleResult::Success) m_amqpQueue->ack(message); } + +void AlarmEventDataService::onReconnectTimeout() +{ + if(m_serviceStatus == ServiceStatus::Connected || m_serviceStatus == ServiceStatus::Reconnecting) + return; + + m_serviceStatus = ServiceStatus::Reconnecting; + startRealTimeDataService(); +}