diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..6740251 --- /dev/null +++ b/event/event.go @@ -0,0 +1,39 @@ +// Package event define real time data evnet operation functions +package event + +// EventRecord define struct for CIM event record +type EventRecord struct { + // 事件名称 + EventName string `json:"event"` + // 事件唯一标识符 + EventUUID string `json:"event_uuid"` + // 事件类型 + Type int `json:"type"` + // 事件优先级 (0-9) + Priority int `json:"priority"` + // 事件状态 + Status int `json:"status"` + // 可选模板参数 + Category string `json:"category,omitempty"` + // 毫秒级时间戳 (Unix epoch) + Timestamp int64 `json:"timestamp"` + // 事件来源 (station, platform, msa) + From string `json:"from"` + // 事件场景描述对象 (如阈值、当前值) + Condition map[string]any `json:"condition"` + // 与事件相关的订阅信息 + AttachedSubscriptions []any `json:"attached_subscriptions"` + // 事件分析结果对象 + Result map[string]any `json:"result,omitempty"` + // 操作历史记录 (CIM ActivityRecord) + Operations []OperationRecord `json:"operations"` + // 子站告警原始数据 (CIM Alarm 数据) + Origin map[string]any `json:"origin,omitempty"` +} + +// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等 +type OperationRecord struct { + Action string `json:"action"` // 执行的动作,如 "acknowledgment" + Op string `json:"op"` // 操作人/操作账号标识 + TS int64 `json:"ts"` // 操作发生的毫秒时间戳 +} diff --git a/event/up_down_limit_alarm.go b/event/up_down_limit_alarm.go new file mode 100644 index 0000000..844c7c5 --- /dev/null +++ b/event/up_down_limit_alarm.go @@ -0,0 +1,102 @@ +// Package event define real time data evnet operation functions +package event + +import ( + "context" + "encoding/json" + + "eventRT/database" + "eventRT/logger" + "eventRT/mq" + + "go.mongodb.org/mongo-driver/bson" +) + +const ( + queueName = "event-alarm-queue" +) + +// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm +func ReceiptUpDownLimitAlarm(ctx context.Context) { + conn := mq.GetConn() + if conn == nil { + logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed") + return + } + + channel, err := conn.Channel() + if err != nil { + logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err) + return + } + defer channel.Close() + + err = channel.Qos(1, 0, false) + if err != nil { + logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err) + } + + // registered consumer + msgs, err := channel.Consume( + queueName, // queue + "", // consumer tag + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + logger.Error(ctx, "failed to register a consumer", "error", err) + return + } + + logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ") + + for { + select { + case <-ctx.Done(): + logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel") + return + case msg, ok := <-msgs: + if !ok { + logger.Error(ctx, "message channel closed, exiting consumer loop") + return + } + processAlarmMessage(ctx, msg.Body) + } + } +} + +func processAlarmMessage(ctx context.Context, body []byte) { + logger.Info(ctx, "received event alarm from modelRT up and down limit compute process") + mongodbClient := database.GetMongoClient() + + var alarmEvent EventRecord + err := json.Unmarshal(body, &alarmEvent) + if err != nil { + logger.Error(ctx, "unmarshal alarm event message failed", "error", err) + return + } + + // TODO: 根据业务逻辑进行进一步处理,如写入数据库、推送前端等 + logger.Info(ctx, "alarm message processed successfully", "data_len", len(body)) + _, err = mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, alarmEvent) + if err != nil { + logger.Error(ctx, "failed to insert alarm event into database", "error", err) + return + } + + // TODO 尝试跳过中间结构体的方式插入到 mongoDB 中 + var doc bson.D + err = bson.UnmarshalExtJSON(body, true, &doc) + if err != nil { + logger.Error(ctx, "unmarshal alarm event message failed", "error", err) + } + result, err := mongodbClient.Database("eventdb").Collection("alarms").InsertOne(ctx, doc) + if err != nil { + logger.Error(ctx, "failed to insert alarm event into database", "error", err) + return + } + logger.Info(ctx, "alarm event inserted into database", "result", result) +} diff --git a/go.mod b/go.mod index 3050ae4..5dd96f6 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 github.com/spf13/viper v1.21.0 github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 - go.mongodb.org/mongo-driver v1.17.7 + go.mongodb.org/mongo-driver v1.17.9 go.uber.org/zap v1.27.1 ) diff --git a/go.sum b/go.sum index ac4444e..d66495d 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,8 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.mongodb.org/mongo-driver v1.17.7 h1:a9w+U3Vt67eYzcfq3k/OAv284/uUUkL0uP75VE5rCOU= -go.mongodb.org/mongo-driver v1.17.7/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= +go.mongodb.org/mongo-driver v1.17.9 h1:IexDdCuuNJ3BHrELgBlyaH9p60JXAvdzWR128q+U5tU= +go.mongodb.org/mongo-driver v1.17.9/go.mod h1:LlOhpH5NUEfhxcAwG0UEkMqwYcc4JU18gtCdGudk/tQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= diff --git a/main.go b/main.go index 7833fbd..fbfd51d 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "eventRT/config" "eventRT/constants" "eventRT/database" + "eventRT/event" "eventRT/logger" "eventRT/mq" "eventRT/util" @@ -55,10 +56,10 @@ func main() { // init logger instance logger.InitLoggerInstance(eventRTConfig.LoggerConfig) - // init MongoDB client notifyCtx, stop := signal.NotifyContext(context.TODO(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) defer stop() + // init MongoDB client client := database.InitMongoInstance(notifyCtx, eventRTConfig) defer func() { disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -74,6 +75,8 @@ func main() { mq.InitRabbitProxy(notifyCtx, eventRTConfig.RabbitMQConfig) defer mq.GetConn().Close() + go event.ReceiptUpDownLimitAlarm(notifyCtx) + // use release mode in production if eventRTConfig.DeployEnv == constants.ProductionDeployMode { gin.SetMode(gin.ReleaseMode)