150 lines
4.2 KiB
Go
150 lines
4.2 KiB
Go
// Package mq provides read or write access to message queue services
|
|
package mq
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"modelRT/constants"
|
|
"modelRT/logger"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
)
|
|
|
|
// MessageMsgChan buffers message records to be published to the message exchange asynchronously
|
|
var MessageMsgChan chan *EventMessage
|
|
|
|
func init() {
|
|
MessageMsgChan = make(chan *EventMessage, 10000)
|
|
}
|
|
|
|
func initMessageChannel(ctx context.Context) (*amqp.Channel, error) {
|
|
channel, err := GetConn().Channel()
|
|
if err != nil {
|
|
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
err = channel.ExchangeDeclare(constants.MessageDeadExchangeName, "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare message dead letter exchange failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
_, err = channel.QueueDeclare(constants.MessageDeadQueueName, true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare message dead letter queue failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
err = channel.QueueBind(constants.MessageDeadQueueName, constants.MessageDeadRoutingKey, constants.MessageDeadExchangeName, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "bind message dead letter queue failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
err = channel.ExchangeDeclare(constants.MessageExchangeName, "topic", true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare message exchange failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
args := amqp.Table{
|
|
"x-dead-letter-exchange": constants.MessageDeadExchangeName,
|
|
"x-dead-letter-routing-key": constants.MessageDeadRoutingKey,
|
|
}
|
|
_, err = channel.QueueDeclare(constants.MessageQueueName, true, false, false, false, args)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare message queue failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
err = channel.QueueBind(constants.MessageQueueName, constants.MessageRoutingKey, constants.MessageExchangeName, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "bind message queue failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
if err := channel.Confirm(false); err != nil {
|
|
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
|
return nil, err
|
|
}
|
|
return channel, nil
|
|
}
|
|
|
|
// PushMessageToRabbitMQ publishes message records from msgChan to the message exchange.
|
|
// The category of each record is used as the routing key so consumers can bind selectively.
|
|
func PushMessageToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) {
|
|
channel, err := initMessageChannel(ctx)
|
|
if err != nil {
|
|
logger.Error(ctx, "initializing message rabbitMQ channel failed", "error", err)
|
|
return
|
|
}
|
|
|
|
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case confirm, ok := <-confirms:
|
|
if !ok {
|
|
return
|
|
}
|
|
if !confirm.Ack {
|
|
logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info(ctx, "push message to rabbitMQ stopped by context cancel")
|
|
channel.Close()
|
|
return
|
|
case msg, ok := <-msgChan:
|
|
if !ok {
|
|
logger.Info(ctx, "push message to rabbitMQ stopped by msgChan closed")
|
|
channel.Close()
|
|
return
|
|
}
|
|
|
|
record := msg.Record
|
|
|
|
recordBytes, err := json.Marshal(record)
|
|
if err != nil {
|
|
logger.Error(ctx, "marshal message record failed", "event_uuid", record.EventUUID, "error", err)
|
|
continue
|
|
}
|
|
|
|
msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
|
|
headers := amqp.Table{}
|
|
otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers))
|
|
|
|
routingKey := record.Category
|
|
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
err = channel.PublishWithContext(pubCtx,
|
|
constants.MessageExchangeName,
|
|
routingKey,
|
|
false,
|
|
false,
|
|
amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: recordBytes,
|
|
Headers: headers,
|
|
})
|
|
cancel()
|
|
|
|
if err != nil {
|
|
logger.Error(ctx, "publish message to rabbitMQ failed", "routing_key", routingKey, "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|