// Package mq provides read or write access to message queue services package mq import ( "context" "encoding/json" "time" "modelRT/constants" "modelRT/logger" "modelRT/mq/event" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) // amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier for trace context injection. type amqpHeaderCarrier amqp.Table func (c amqpHeaderCarrier) Get(key string) string { val, ok := amqp.Table(c)[key] if !ok { return "" } str, _ := val.(string) return str } func (c amqpHeaderCarrier) Set(key, value string) { amqp.Table(c)[key] = value } func (c amqpHeaderCarrier) Keys() []string { keys := make([]string, 0, len(c)) for k := range c { keys = append(keys, k) } return keys } var _ propagation.TextMapCarrier = amqpHeaderCarrier{} // EventMessage wraps an EventRecord with the trace context of the computation cycle that produced it. type EventMessage struct { Record *event.EventRecord TraceCarrier map[string]string } // MsgChan define variable of channel to store messages that need to be sent to rabbitMQ var MsgChan chan *EventMessage func init() { MsgChan = make(chan *EventMessage, 10000) } func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { var channel *amqp.Channel var err error channel, err = GetConn().Channel() if err != nil { logger.Error(ctx, "open rabbitMQ server channel failed", "error", err) return nil, err } err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter exchange failed", "error", err) return nil, err } _, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter queue failed", "error", err) return nil, err } err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err) return nil, err } err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event exchange failed", "error", err) return nil, err } args := amqp.Table{ "x-max-length": int32(50), "x-dead-letter-exchange": constants.EventDeadExchangeName, "x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey, } _, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args) if err != nil { logger.Error(ctx, "declare event queue failed", "error", err) return nil, err } err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err) return nil, err } if err := channel.Confirm(false); err != nil { logger.Error(ctx, "channel could not be put into confirm mode", "error", err) return nil, err } return channel, nil } // PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) { channel, err := initUpDownLimitEventChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) return } // TODO 使用配置修改确认模式通道参数 confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100)) go func() { for { select { case confirm, ok := <-confirms: if !ok { return } if !confirm.Ack { logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag) } case <-ctx.Done(): return } } }() for { select { case <-ctx.Done(): logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel") channel.Close() return case msg, ok := <-msgChan: if !ok { logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop") channel.Close() return } eventRecord := msg.Record // TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key recordBytes, err := json.Marshal(eventRecord) if err != nil { logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) continue } // restore computation cycle trace context so the AMQP message carries the correct parent span msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier)) headers := amqp.Table{} otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers)) // send event alarm message to rabbitMQ queue routingKey := eventRecord.Category pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err = channel.PublishWithContext(pubCtx, constants.EventExchangeName, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: recordBytes, Headers: headers, }) cancel() if err != nil { logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err) } } } }