130 lines
3.6 KiB
Go
130 lines
3.6 KiB
Go
// Package mq provides read and write access to message queue services.
|
|
package mq
|
|
|
|
import (
	"context"
	"fmt"
	"time"

	"modelRT/logger"

	amqp "github.com/rabbitmq/amqp091-go"
)
|
|
|
|
// Names of the primary RabbitMQ topology that carries event alarm messages:
// the exchange they are published to, the queue that stores them, and the
// routing key that binds the two.
const (
	exchangeName = "event-alarm-exchange"
	queueName    = "event-alarm-queue"
	routingKey   = "event-alarm-routing-key"
)

// Names of the dead-letter topology. The primary queue is declared with
// these as its dead-letter exchange and dead-letter routing key.
const (
	deadExchangeName = "event-alarm-dead-letter-exchange"
	deadQueueName    = "event-alarm-dead-letter-queue"
	deadRoutingKey   = "event-alarm-dead-letter-routing-key"
)
|
|
|
|
func initChannel(ctx context.Context) (*amqp.Channel, error) {
|
|
var channel *amqp.Channel
|
|
var err error
|
|
|
|
channel, err = GetConn().Channel()
|
|
if err != nil {
|
|
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
|
}
|
|
|
|
err = channel.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
|
}
|
|
|
|
_, err = channel.QueueDeclare(deadQueueName, true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
|
}
|
|
|
|
err = channel.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
|
}
|
|
|
|
err = channel.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare event exchange failed", "error", err)
|
|
}
|
|
|
|
args := amqp.Table{
|
|
// messages that accumulate to the maximum number will be automatically transferred to the dead letter queue
|
|
"x-max-length": int32(50),
|
|
"x-dead-letter-exchange": deadExchangeName,
|
|
"x-dead-letter-routing-key": deadRoutingKey,
|
|
}
|
|
_, err = channel.QueueDeclare(queueName, true, false, false, false, args)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare event queue failed", "error", err)
|
|
}
|
|
|
|
err = channel.QueueBind(queueName, routingKey, exchangeName, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err)
|
|
}
|
|
|
|
if err := channel.Confirm(false); err != nil {
|
|
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
|
}
|
|
return channel, nil
|
|
}
|
|
|
|
func pushEventToRabbitMQ(ctx context.Context, msgChan chan string) {
|
|
channel, err := initChannel(ctx)
|
|
if err != nil {
|
|
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
|
return
|
|
}
|
|
// TODO 使用配置修改确认模式通道参数
|
|
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case confirm, ok := <-confirms:
|
|
if !ok {
|
|
return
|
|
}
|
|
if !confirm.Ack {
|
|
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
|
channel.Close()
|
|
return
|
|
case msg, ok := <-msgChan:
|
|
if !ok {
|
|
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
|
channel.Close()
|
|
return
|
|
}
|
|
|
|
// send event alarm message to rabbitMQ queue
|
|
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
err = channel.PublishWithContext(pubCtx,
|
|
exchangeName, // exchange
|
|
routingKey, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: []byte(msg),
|
|
})
|
|
cancel()
|
|
|
|
if err != nil {
|
|
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", msg, "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|