refactor:修改链接RabbitMQ的相关逻辑,完成对实际服务器的数据测试

This commit is contained in:
duanshengchao 2025-11-13 18:14:24 +08:00
parent 728e7c7383
commit fdc164ed5e
4 changed files with 52 additions and 26 deletions

View File

@ -1,13 +1,13 @@
[RabbitMQConfig] [RabbitMQConfig]
host=127.0.0.1 host=192.168.46.30
port=5672 port=5672
username=alarm_service username=rabbitmq
password=123456 password=password
virtualHost=/alarm virtualHost=/
exchangeName=event_notify_fanout exchangeName=event_notify_fanout
queueName=event_nofify_queue queueName=
routingKey=key routingKey=event_notify_key
reconnectInterval=3000 reconnectInterval=2000
heartbeat=60 heartbeat=60
autoAck=1 autoAck=1

View File

@ -36,6 +36,8 @@ private slots:
void onRabbitMQConnected(); void onRabbitMQConnected();
void onRabbitMQDisconnected(); void onRabbitMQDisconnected();
void onRabbitMQError(QAMQP::Error error); void onRabbitMQError(QAMQP::Error error);
void onRabbitMQSocketErrot(QAbstractSocket::SocketError error);
void onAmqpQueueOpend();
void onMessageReceived(); void onMessageReceived();
void onReconnectTimeout(); void onReconnectTimeout();
void onHistoricalRequestFinished(QNetworkReply* reply); void onHistoricalRequestFinished(QNetworkReply* reply);

View File

@ -38,7 +38,9 @@ AlarmEventDataService::AlarmEventDataService()
} }
AlarmEventDataService::~AlarmEventDataService() AlarmEventDataService::~AlarmEventDataService()
{} {
stop();
}
void AlarmEventDataService::initialize() void AlarmEventDataService::initialize()
{ {
@ -114,15 +116,21 @@ void AlarmEventDataService::startRealTimeDataService()
cleanupRabbitMQConnection(); cleanupRabbitMQConnection();
m_amqpClient = new QAmqpClient(this); m_amqpClient = new QAmqpClient(this);
/*connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected);
connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected);
connect(m_amqpClient, QOverload<QAMQP::Error>::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);*/
//采用QT4的连接方式新的连接方式无法识别
connect(m_amqpClient, SIGNAL(connected()), this, SLOT(onRabbitMQConnected()));
connect(m_amqpClient, SIGNAL(disconnected()), this, SLOT(onRabbitMQDisconnected()));
connect(m_amqpClient, SIGNAL(error(QAMQP::Error)), this, SLOT(onRabbitMQError(QAMQP::Error)));
connect(m_amqpClient, SIGNAL(socketErrorOccurred(QAbstractSocket::SocketError)), this, SLOT(onRabbitMQSocketErrot(QAbstractSocket::SocketError)));
m_amqpClient->setHost(m_config.rabbitMQConfig.host); m_amqpClient->setHost(m_config.rabbitMQConfig.host);
m_amqpClient->setPort(m_config.rabbitMQConfig.port); m_amqpClient->setPort(m_config.rabbitMQConfig.port);
m_amqpClient->setUsername(m_config.rabbitMQConfig.username); m_amqpClient->setUsername(m_config.rabbitMQConfig.username);
m_amqpClient->setPassword(m_config.rabbitMQConfig.password); m_amqpClient->setPassword(m_config.rabbitMQConfig.password);
m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost); m_amqpClient->setVirtualHost(m_config.rabbitMQConfig.virtualHost);
m_amqpClient->connectToHost();
connect(m_amqpClient, &QAmqpClient::connected, this, &AlarmEventDataService::onRabbitMQConnected);
connect(m_amqpClient, &QAmqpClient::disconnected, this, &AlarmEventDataService::onRabbitMQDisconnected);
connect(m_amqpClient, QOverload<QAMQP::Error>::of(&QAmqpClient::error), this, &AlarmEventDataService::onRabbitMQError);
} }
void AlarmEventDataService::cleanupRabbitMQConnection() void AlarmEventDataService::cleanupRabbitMQConnection()
@ -151,7 +159,7 @@ void AlarmEventDataService::scheduleReconnect()
{ {
if (m_reconnectAttempts < m_maxReconnectAttempts) if (m_reconnectAttempts < m_maxReconnectAttempts)
{ {
int delay = m_config.rabbitMQConfig.reconnectInterval * (1 << m_reconnectAttempts); // 指数退避,<<n相当于乘以2的n次方 int delay = m_config.rabbitMQConfig.reconnectInterval * (1 /*<< m_reconnectAttempts*/); // 指数退避,<<n相当于乘以2的n次方
m_reconnectTimer->start(delay); m_reconnectTimer->start(delay);
m_reconnectAttempts++; m_reconnectAttempts++;
} }
@ -161,7 +169,8 @@ void AlarmEventDataService::scheduleReconnect()
void AlarmEventDataService::cancelReconnect() void AlarmEventDataService::cancelReconnect()
{ {
m_reconnectTimer->stop(); if(m_reconnectTimer->isActive())
m_reconnectTimer->stop();
m_reconnectAttempts = 0; m_reconnectAttempts = 0;
} }
@ -270,19 +279,14 @@ void AlarmEventDataService::processHistoricalResponse(const QByteArray& data)
// ==================== 槽函数 ==================== // ==================== 槽函数 ====================
void AlarmEventDataService::onRabbitMQConnected() void AlarmEventDataService::onRabbitMQConnected()
{ {
qInfo() << "RabbitMQ connectecd";
m_serviceStatus = ServiceStatus::Connected; m_serviceStatus = ServiceStatus::Connected;
//Exchange //Exchange
m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName); m_amqpExchange = m_amqpClient->createExchange(m_config.rabbitMQConfig.exchangeName);
m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable); m_amqpExchange->declare(QAmqpExchange::FanOut, QAmqpExchange::Durable);
//Queue //Queue
m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName); m_amqpQueue = m_amqpClient->createQueue(m_config.rabbitMQConfig.queueName);
m_amqpQueue->declare(QAmqpQueue::AutoDelete); connect(m_amqpQueue, SIGNAL(opened()), this, SLOT(onAmqpQueueOpend()));
m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机消息会发送给所有与之绑定的队列所以此时routingKey无所谓
connect(m_amqpQueue, &QAmqpQueue::messageReceived, this, &AlarmEventDataService::onMessageReceived);
if(m_config.rabbitMQConfig.autoAck)
m_amqpQueue->consume(QAmqpQueue::coNoAck);
else
m_amqpQueue->consume(0); //0表示不适用任何特殊选项即手动确认
} }
void AlarmEventDataService::onRabbitMQDisconnected() void AlarmEventDataService::onRabbitMQDisconnected()
@ -299,9 +303,29 @@ void AlarmEventDataService::onRabbitMQError(QAMQP::Error error)
scheduleReconnect(); scheduleReconnect();
} }
void AlarmEventDataService::onRabbitMQSocketErrot(QAbstractSocket::SocketError error)
{
qWarning() << "RabbitMQ connection socket error: " << error;
m_serviceStatus = ServiceStatus::Error;
scheduleReconnect();
}
void AlarmEventDataService::onAmqpQueueOpend()
{
qInfo() << "AmqpQueue opend";
m_amqpQueue->declare(QAmqpQueue::AutoDelete);
m_amqpQueue->bind(m_amqpExchange, m_config.rabbitMQConfig.routingKey); //对于广播类型的交换机消息会发送给所有与之绑定的队列所以此时routingKey无所谓
connect(m_amqpQueue, SIGNAL(messageReceived()), this, SLOT(onMessageReceived()));
if(m_config.rabbitMQConfig.autoAck)
m_amqpQueue->consume(QAmqpQueue::coNoAck);
else
m_amqpQueue->consume(0); //0表示不适用任何特殊选项即手动确认
}
void AlarmEventDataService::onMessageReceived() void AlarmEventDataService::onMessageReceived()
{ {
QAmqpMessage message = m_amqpQueue->dequeue(); QAmqpMessage message = m_amqpQueue->dequeue();
//qDebug() << message.payload();
MessageHandleResult result = processMessage(message); MessageHandleResult result = processMessage(message);
//手动确认消息 //手动确认消息

View File

@ -32,10 +32,13 @@ AlarmEventDataModel::AlarmEventDataModel(AlarmDataMode mode, QObject* parent)
connect(AlarmEventDataService::instance(), &AlarmEventDataService::realTimeEventReceived, this, &AlarmEventDataModel::onRealTimeEventReceived); connect(AlarmEventDataService::instance(), &AlarmEventDataService::realTimeEventReceived, this, &AlarmEventDataModel::onRealTimeEventReceived);
connect(AlarmEventDataService::instance(), &AlarmEventDataService::historicalQueryData, this, &AlarmEventDataModel::onHistoricalEventsReceived); connect(AlarmEventDataService::instance(), &AlarmEventDataService::historicalQueryData, this, &AlarmEventDataModel::onHistoricalEventsReceived);
AlarmEventDataService::instance()->start();
//实时数据测试 //实时数据测试
m_simulatedDataTimer = new QTimer(this); /*m_simulatedDataTimer = new QTimer(this);
connect(m_simulatedDataTimer, &QTimer::timeout, this, &AlarmEventDataModel::onTimeoutSimulateData); connect(m_simulatedDataTimer, &QTimer::timeout, this, &AlarmEventDataModel::onTimeoutSimulateData);
m_simulatedDataTimer->start(3000); m_simulatedDataTimer->start(3000);*/
} }
AlarmEventDataModel::~AlarmEventDataModel() AlarmEventDataModel::~AlarmEventDataModel()
@ -339,13 +342,10 @@ void AlarmEventDataModel::onRealTimeEventReceived(const EventData& event)
int insertPosition = 0; int insertPosition = 0;
for(; insertPosition < m_displayEvents.size(); ++insertPosition) for(; insertPosition < m_displayEvents.size(); ++insertPosition)
{ {
qDebug() << "event.timestamp: " << event.timestamp << " m_displayEvents.at(n): " << m_displayEvents.at(insertPosition).timestamp;
if(event.timestamp > m_displayEvents.at(insertPosition).timestamp) if(event.timestamp > m_displayEvents.at(insertPosition).timestamp)
break; break;
} }
qDebug() << insertPosition;
if(insertPosition < m_displayEvents.size()) if(insertPosition < m_displayEvents.size())
m_displayEvents.insert(insertPosition, event); m_displayEvents.insert(insertPosition, event);
else else