From fdc164ed5ed38f36cdfd12faac8b2f49c36e59e7 Mon Sep 17 00:00:00 2001 From: duanshengchao <519970194@qq.com> Date: Thu, 13 Nov 2025 18:14:24 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=E4=BF=AE=E6=94=B9=E9=93=BE=E6=8E=A5Ra?= =?UTF-8?q?bbitMQ=E7=9A=84=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=AF=B9=E5=AE=9E=E9=99=85=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E7=9A=84=E6=95=B0=E6=8D=AE=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alarmEventService_config.ini | 14 ++++----- include/alarmEventDataService.h | 2 ++ source/alarmEventDataService.cpp | 52 +++++++++++++++++++++++--------- source/alarmEventDataView.cpp | 10 +++--- 4 files changed, 52 insertions(+), 26 deletions(-) diff --git a/alarmEventService_config.ini b/alarmEventService_config.ini index bc80b69..cb67fd4 100644 --- a/alarmEventService_config.ini +++ b/alarmEventService_config.ini @@ -1,13 +1,13 @@ [RabbitMQConfig] -host=127.0.0.1 +host=192.168.46.30 port=5672 -username=alarm_service -password=123456 -virtualHost=/alarm +username=rabbitmq +password=password +virtualHost=/ exchangeName=event_notify_fanout -queueName=event_nofify_queue -routingKey=key -reconnectInterval=3000 +queueName= +routingKey=event_notify_key +reconnectInterval=2000 heartbeat=60 autoAck=1 diff --git a/include/alarmEventDataService.h b/include/alarmEventDataService.h index 3119c6e..98c3b11 100644 --- a/include/alarmEventDataService.h +++ b/include/alarmEventDataService.h @@ -36,6 +36,8 @@ private slots: void onRabbitMQConnected(); void onRabbitMQDisconnected(); void onRabbitMQError(QAMQP::Error error); + void onRabbitMQSocketErrot(QAbstractSocket::SocketError error); + void onAmqpQueueOpend(); void onMessageReceived(); void onReconnectTimeout(); void onHistoricalRequestFinished(QNetworkReply* reply); diff --git a/source/alarmEventDataService.cpp b/source/alarmEventDataService.cpp index 03de838..7b0af69 100644 --- a/source/alarmEventDataService.cpp +++ b/source/alarmEventDataService.cpp @@ -38,7 +38,9 @@ AlarmEventDataService::AlarmEventDataService() } AlarmEventDataService::~AlarmEventDataService() -{} +{ + stop(); +} void AlarmEventDataService::initialize() { @@ -114,15 +116,21 @@ void AlarmEventDataService::startRealTimeDataService() cleanupRabbitMQConnection(); m_amqpClient = new QAmqpClient(this); + /*connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected); + connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected); + connect(m_amqpClient, QOverload::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);*/ + //采用QT4的连接方式,新的连接方式无法识别 + connect(m_amqpClient, SIGNAL(connected()), this, SLOT(onRabbitMQConnected())); + connect(m_amqpClient, SIGNAL(disconnected()), this, SLOT(onRabbitMQDisconnected())); + connect(m_amqpClient, SIGNAL(error(QAMQP::Error)), this, SLOT(onRabbitMQError(QAMQP::Error))); + connect(m_amqpClient, SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)), this, SLOT(onRabbitMQSocketErrot(QAbstractSocket::SocketError))); + m_amqpClient->setHost(m_config.rabbitMQConfig.host); m_amqpClient->setPort(m_config.rabbitMQConfig.port); m_amqpClient->setUsername(m_config.rabbitMQConfig.username); m_amqpClient->setPassword(m_config.rabbitMQConfig.password); m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost); - - connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected); - connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected); - connect(m_amqpClient, QOverload::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError); + m_amqpClient->connectToHost(); } void AlarmEventDataService::cleanupRabbitMQConnection() @@ -151,7 +159,7 @@ void AlarmEventDataService::scheduleReconnect() { if (m_reconnectAttempts < m_maxReconnectAttempts) { - int delay = m_config.rabbitMQConfig.reconnectInterval * (1 << m_reconnectAttempts); // 指数退避,<start(delay); m_reconnectAttempts++; } @@ -161,7 +169,8 @@ void AlarmEventDataService::scheduleReconnect() void AlarmEventDataService::cancelReconnect() { - m_reconnectTimer->stop(); + if(m_reconnectTimer->isActive()) + m_reconnectTimer->stop(); m_reconnectAttempts = 0; } @@ -270,19 +279,14 @@ void AlarmEventDataService::processHistoricalResponse(const QByteArray& data) // ==================== 槽函数 ==================== void AlarmEventDataService::onRabbitMQConnected() { + qInfo() << "RabbitMQ connectecd"; m_serviceStatus = ServiceStatus::Connected; //Exchange m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName); m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable); //Queue m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName); - m_amqpQueue->declare(QAmqpQueue::AutoDelete); - m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机,消息会发送给所有与之绑定的队列,所以此时routingKey无所谓 - connect(m_amqpQueue, &QAmqpQueue::messageReceived, this, &AlarmEventDataService::onMessageReceived); - if(m_config.rabbitMQConfig.autoAck) - m_amqpQueue->consume(QAmqpQueue::coNoAck); - else - m_amqpQueue->consume(0); //0表示不适用任何特殊选项,即手动确认 + connect(m_amqpQueue, SIGNAL(opened()), this, SLOT(onAmqpQueueOpend())); } void AlarmEventDataService::onRabbitMQDisconnected() @@ -299,9 +303,29 @@ void AlarmEventDataService::onRabbitMQError(QAMQP::Error error) scheduleReconnect(); } +void AlarmEventDataService::onRabbitMQSocketErrot(QAbstractSocket::SocketError error) +{ + qWarning() << "RabbitMQ connection socket error: " << error; + m_serviceStatus = ServiceStatus::Error; + scheduleReconnect(); +} + +void AlarmEventDataService::onAmqpQueueOpend() +{ + qInfo() << "AmqpQueue opend"; + m_amqpQueue->declare(QAmqpQueue::AutoDelete); + m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机,消息会发送给所有与之绑定的队列,所以此时routingKey无所谓 + connect(m_amqpQueue, SIGNAL(messageReceived()), this, SLOT(onMessageReceived())); + if(m_config.rabbitMQConfig.autoAck) + m_amqpQueue->consume(QAmqpQueue::coNoAck); + else + m_amqpQueue->consume(0); //0表示不适用任何特殊选项,即手动确认 +} + void AlarmEventDataService::onMessageReceived() { QAmqpMessage message = m_amqpQueue->dequeue(); + //qDebug() << message.payload(); MessageHandleResult result = processMessage(message); //手动确认消息 diff --git a/source/alarmEventDataView.cpp b/source/alarmEventDataView.cpp index 3bc1440..48780f9 100644 --- a/source/alarmEventDataView.cpp +++ b/source/alarmEventDataView.cpp @@ -32,10 +32,13 @@ AlarmEventDataModel::AlarmEventDataModel(AlarmDataMode mode, QObject* parent) connect(AlarmEventDataService::instance(), &AlarmEventDataService::realTimeEventReceived, this, &AlarmEventDataModel::onRealTimeEventReceived); connect(AlarmEventDataService::instance(), &AlarmEventDataService::historicalQueryData, this, &AlarmEventDataModel::onHistoricalEventsReceived); + AlarmEventDataService::instance()->start(); + + //实时数据测试 - m_simulatedDataTimer = new QTimer(this); + /*m_simulatedDataTimer = new QTimer(this); connect(m_simulatedDataTimer, &QTimer::timeout, this, &AlarmEventDataModel::onTimeoutSimulateData); - m_simulatedDataTimer->start(3000); + m_simulatedDataTimer->start(3000);*/ } AlarmEventDataModel::~AlarmEventDataModel() @@ -339,13 +342,10 @@ void AlarmEventDataModel::onRealTimeEventReceived(const EventData& event) int insertPosition = 0; for(; insertPosition < m_displayEvents.size(); ++insertPosition) { - qDebug() << "event.timestamp: " << event.timestamp << " m_displayEvents.at(n): " << m_displayEvents.at(insertPosition).timestamp; if(event.timestamp > m_displayEvents.at(insertPosition).timestamp) break; } - qDebug() << insertPosition; - if(insertPosition < m_displayEvents.size()) m_displayEvents.insert(insertPosition, event); else