// Package mq provides read or write access to message queue services package mq import ( "context" "encoding/json" "time" "modelRT/constants" "modelRT/logger" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) // MessageMsgChan buffers message records to be published to the message exchange asynchronously var MessageMsgChan chan *EventMessage func init() { MessageMsgChan = make(chan *EventMessage, 10000) } func initMessageChannel(ctx context.Context) (*amqp.Channel, error) { channel, err := GetConn().Channel() if err != nil { logger.Error(ctx, "open rabbitMQ server channel failed", "error", err) return nil, err } err = channel.ExchangeDeclare(constants.MessageDeadExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare message dead letter exchange failed", "error", err) return nil, err } _, err = channel.QueueDeclare(constants.MessageDeadQueueName, true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare message dead letter queue failed", "error", err) return nil, err } err = channel.QueueBind(constants.MessageDeadQueueName, constants.MessageDeadRoutingKey, constants.MessageDeadExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind message dead letter queue failed", "error", err) return nil, err } err = channel.ExchangeDeclare(constants.MessageExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare message exchange failed", "error", err) return nil, err } args := amqp.Table{ "x-dead-letter-exchange": constants.MessageDeadExchangeName, "x-dead-letter-routing-key": constants.MessageDeadRoutingKey, } _, err = channel.QueueDeclare(constants.MessageQueueName, true, false, false, false, args) if err != nil { logger.Error(ctx, "declare message queue failed", "error", err) return nil, err } err = channel.QueueBind(constants.MessageQueueName, constants.MessageRoutingKey, constants.MessageExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind message queue failed", "error", err) return nil, err } if err := channel.Confirm(false); err != nil { logger.Error(ctx, "channel could not be put into confirm mode", "error", err) return nil, err } return channel, nil } // PushMessageToRabbitMQ publishes message records from msgChan to the message exchange. // The category of each record is used as the routing key so consumers can bind selectively. func PushMessageToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) { channel, err := initMessageChannel(ctx) if err != nil { logger.Error(ctx, "initializing message rabbitMQ channel failed", "error", err) return } confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100)) go func() { for { select { case confirm, ok := <-confirms: if !ok { return } if !confirm.Ack { logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag) } case <-ctx.Done(): return } } }() for { select { case <-ctx.Done(): logger.Info(ctx, "push message to rabbitMQ stopped by context cancel") channel.Close() return case msg, ok := <-msgChan: if !ok { logger.Info(ctx, "push message to rabbitMQ stopped by msgChan closed") channel.Close() return } record := msg.Record recordBytes, err := json.Marshal(record) if err != nil { logger.Error(ctx, "marshal message record failed", "event_uuid", record.EventUUID, "error", err) continue } msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) headers := amqp.Table{} otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers)) routingKey := record.Category pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err = channel.PublishWithContext(pubCtx, constants.MessageExchangeName, routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: recordBytes, Headers: headers, }) cancel() if err != nil { logger.Error(ctx, "publish message to rabbitMQ failed", "routing_key", routingKey, "error", err) } } } }