// Package event define real time data evnet operation functions package event import ( "context" "encoding/json" "eventRT/constants" "eventRT/database" "eventRT/logger" "eventRT/mq" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) // amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier for trace context extraction. type amqpHeaderCarrier amqp.Table func (c amqpHeaderCarrier) Get(key string) string { val, ok := amqp.Table(c)[key] if !ok { return "" } str, _ := val.(string) return str } func (c amqpHeaderCarrier) Set(key, value string) { amqp.Table(c)[key] = value } func (c amqpHeaderCarrier) Keys() []string { keys := make([]string, 0, len(c)) for k := range c { keys = append(keys, k) } return keys } var _ propagation.TextMapCarrier = amqpHeaderCarrier{} const ( queueName = "event-up-down-queue" exchangeName = "event-exchange" ) // ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm func ReceiptUpDownLimitAlarm(ctx context.Context) { conn := mq.GetConn() if conn == nil { logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed") return } channel, err := conn.Channel() if err != nil { logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err) return } defer channel.Close() uiChannel, err := initUIEventChannel(ctx) if err != nil { logger.Error(ctx, "init UI event channel failed", "error", err) return } defer uiChannel.Close() err = channel.QueueBind( queueName, // 队列名 "event.#.updown.*", exchangeName, // 交换机名 false, nil, ) if err != nil { logger.Error(ctx, "channel bind queue and exchange failed", "error", err) return } err = channel.Qos(1, 0, false) if err != nil { logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err) } // registered consumer msgs, err := channel.Consume( queueName, // queue "", // consumer tag false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { logger.Error(ctx, "failed to register a consumer", "error", err) return } logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ") for { select { case <-ctx.Done(): logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel") return case msg, ok := <-msgs: if !ok { logger.Error(ctx, "message channel closed, exiting consumer loop") return } processAlarmEventMessage(ctx, msg, uiChannel) } } } func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) { // extract upstream trace context injected by modelRT into AMQP headers ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers)) ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage") defer span.End() logger.Info(ctx, "received event alarm from modelRT up and down limit compute process") mongodbClient := database.GetMongoClient() var alarmEvent EventRecord if err := json.Unmarshal(msg.Body, &alarmEvent); err != nil { logger.Error(ctx, "unmarshal alarm event message failed", "error", err) return } alarmEvent.IsPersisted = true alarmEvent.Status = constants.EventStatusReported result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent) if err != nil { logger.Error(ctx, "failed to insert alarm event into database", "error", err) return } logger.Info(ctx, "alarm event inserted into database", "result", result) if err := msg.Ack(false); err != nil { logger.Error(ctx, "ack alarm event message failed", "error", err) return } if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil { logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err) } }