147 lines
4.3 KiB
Go
147 lines
4.3 KiB
Go
// Package mq provides read and write access to message queue services.
|
||
package mq
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"time"
|
||
|
||
"modelRT/constants"
|
||
"modelRT/logger"
|
||
"modelRT/mq/event"
|
||
|
||
amqp "github.com/rabbitmq/amqp091-go"
|
||
)
|
||
|
||
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
|
||
var MsgChan chan *event.EventRecord
|
||
|
||
func init() {
|
||
MsgChan = make(chan *event.EventRecord, 10000)
|
||
}
|
||
|
||
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
||
var channel *amqp.Channel
|
||
var err error
|
||
|
||
channel, err = GetConn().Channel()
|
||
if err != nil {
|
||
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
|
||
if err != nil {
|
||
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
|
||
if err != nil {
|
||
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil)
|
||
if err != nil {
|
||
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
|
||
if err != nil {
|
||
logger.Error(ctx, "declare event exchange failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
args := amqp.Table{
|
||
"x-max-length": int32(50),
|
||
"x-dead-letter-exchange": constants.EventDeadExchangeName,
|
||
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
|
||
}
|
||
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
|
||
if err != nil {
|
||
logger.Error(ctx, "declare event queue failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
|
||
if err != nil {
|
||
logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
if err := channel.Confirm(false); err != nil {
|
||
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
||
return nil, err
|
||
}
|
||
return channel, nil
|
||
}
|
||
|
||
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
|
||
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) {
|
||
channel, err := initUpDownLimitEventChannel(ctx)
|
||
if err != nil {
|
||
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
||
return
|
||
}
|
||
// TODO 使用配置修改确认模式通道参数
|
||
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
|
||
|
||
go func() {
|
||
for {
|
||
select {
|
||
case confirm, ok := <-confirms:
|
||
if !ok {
|
||
return
|
||
}
|
||
if !confirm.Ack {
|
||
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
|
||
}
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
||
channel.Close()
|
||
return
|
||
case eventRecord, ok := <-msgChan:
|
||
if !ok {
|
||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
||
channel.Close()
|
||
return
|
||
}
|
||
|
||
// TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key
|
||
recordBytes, err := json.Marshal(eventRecord)
|
||
if err != nil {
|
||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
||
continue
|
||
}
|
||
|
||
// send event alarm message to rabbitMQ queue
|
||
routingKey := eventRecord.Category
|
||
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||
err = channel.PublishWithContext(pubCtx,
|
||
constants.EventExchangeName, // exchange
|
||
routingKey, // routing key
|
||
false, // mandatory
|
||
false, // immediate
|
||
amqp.Publishing{
|
||
ContentType: "text/plain",
|
||
Body: recordBytes,
|
||
})
|
||
cancel()
|
||
|
||
if err != nil {
|
||
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err)
|
||
}
|
||
}
|
||
}
|
||
}
|