From 07b1958d4d8ea08e5f34d990f47356613cc38d16 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Thu, 25 Jun 2026 14:45:57 +0800 Subject: [PATCH] feat: add upload alarm --- event/upload_event.go | 56 +++++++++++++++ go.mod | 1 + go.sum | 2 + handler/alarm_upload.go | 156 ++++++++++++++++++++++++++++++++++++++++ main.go | 5 ++ 5 files changed, 220 insertions(+) create mode 100644 event/upload_event.go create mode 100644 handler/alarm_upload.go diff --git a/event/upload_event.go b/event/upload_event.go new file mode 100644 index 0000000..42d2beb --- /dev/null +++ b/event/upload_event.go @@ -0,0 +1,56 @@ +package event + +import ( + "context" + "eventRT/constants" + "eventRT/database" + "eventRT/logger" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" +) + +func PersistAndPublishEvent(ctx context.Context, record *EventRecord) error { + ctx, span := otel.Tracer("eventRT/event").Start(ctx, "PersistAndPublishEvent") + defer span.End() + + span.SetAttributes( + attribute.String("event_uuid", record.EventUUID), + ) + + record.IsPersisted = true + record.Status = constants.EventStatusReported + record.Operations = append(record.Operations, OperationRecord{ + Action: "report", + Op: "eventrt", // TODO + TS: time.Now().UnixMilli(), + }) + + dbCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, err := database.GetMongoClient().Database(constants.EventDBName). + Collection(constants.EventCollectionName). + InsertOne(dbCtx, record) + if err != nil { + logger.Error(ctx, "insert event into database failed", + "error", err, "event", record) + return err + } + + uiCh, err := initUIEventChannel(ctx) + if err != nil { + logger.Error(ctx, "init UI event channel failed", "error", err) + return nil + } + defer uiCh.Close() + + if err := PublishEventToUI(ctx, uiCh, record); err != nil { + logger.Error(ctx, "publish event to UI failed", + "event_uuid", record.EventUUID, "error", err) + return nil + } + + return nil +} diff --git a/go.mod b/go.mod index ab36524..c008d24 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.26.3 require ( github.com/gin-gonic/gin v1.11.0 + github.com/gofrs/uuid v4.4.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible github.com/rabbitmq/amqp091-go v1.10.0 github.com/spf13/viper v1.21.0 diff --git a/go.sum b/go.sum index 7c1fd03..b0b6c48 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= diff --git a/handler/alarm_upload.go b/handler/alarm_upload.go new file mode 100644 index 0000000..09faecd --- /dev/null +++ b/handler/alarm_upload.go @@ -0,0 +1,156 @@ +// Package handler define HTTP handler functions for eventRT service +package handler + +import ( + "errors" + "eventRT/constants" + "eventRT/event" + "eventRT/logger" + "net/http" + "regexp" + + "github.com/gin-gonic/gin" + "github.com/gofrs/uuid" +) + +func PostAlarmHandler(c *gin.Context) { + ctx := c.Request.Context() + + record, err := checkAndGenPostAlarmParam(c) + if err != nil { + logger.Error(ctx, "check and gen param failed", "error", err) + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "msg": "check and gen param failed", + }) + return + } + + if err := event.PersistAndPublishEvent(ctx, record); err != nil { + logger.Error(ctx, "persist and publish event failed", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 2, + "msg": "persist and publish event failed", + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "msg": "success", + }) +} + +func checkAndGenPostAlarmParam(c *gin.Context) (*event.EventRecord, error) { + a := new(alarm) + if err := c.ShouldBindJSON(a); err != nil { + return nil, err + } + + ok, err := regexp.MatchString(`ssu\d{3}`, a.DeviceNo) + if err != nil { + return nil, err + } + if !ok { + return nil, errors.New("invalid device_no") + } + + if a.AlarmCode < 1 || a.AlarmCode > 10 { + return nil, errors.New("invalid alarm_code") + } + + if a.AlarmStatus < 0 || a.AlarmStatus > 1 { + return nil, errors.New("invalid alarm_status") + } + + event, err := a.ConvertToEvent(c.RemoteIP()) // TODO + if err != nil { + return nil, err + } + + return event, nil +} + +var almCode2Name = []string{ + 0: "", // 占位 + 1: "通信异常", + 2: "AD故障", + 3: "同步秒脉冲异常", + 4: "备用", + 5: "单元初始化", + 6: "读参数错", + 7: "备用", + 8: "启动采样-内部转换信号", + 9: "秒内采样点数过量", + 10: "秒内采样点数欠量", +} + +type alarm struct { + DriverName string `bson:"driver_name" json:"driver_name"` + DeviceNo string `bson:"device_no" json:"device_no"` + AlarmCode int `bson:"alarm_code" json:"alarm_code"` + AlarmTime int64 `bson:"alarm_time" json:"alarm_time"` + AlarmStatus int `bson:"alarm_status" json:"alarm_status"` +} + +func (a *alarm) ConvertToEvent(op string) (*event.EventRecord, error) { + e := new(event.EventRecord) + uid, err := uuid.NewV4() + if err != nil { + return nil, err + } + + if a != nil { + e.EventName = almCode2Name[a.AlarmCode] + e.EventUUID = uid.String() + e.Type = a.GetType() + e.Priority = a.GetPriority() + e.Status = constants.EventStatusHappened + e.Timestamp = a.AlarmTime + e.From = "station" + e.Operations = append(e.Operations, event.OperationRecord{ + Action: "happen", + Op: op, + TS: a.AlarmTime, + }) + e.Origin = map[string]any{ + "driver_name": a.DriverName, + "device_no": a.DeviceNo, + "alarm_code": a.AlarmCode, + "alarm_time": a.AlarmTime, + "alarm_status": a.AlarmStatus, + } + } + + return e, nil +} + +func (a *alarm) GetType() int { + switch a.AlarmCode { + case 4, 5, 7, 8: + return 0 + + case 9, 10: + return 3 + + case 1, 2, 3, 6: + return 6 + } + + return -1 +} + +func (a *alarm) GetPriority() int { + switch a.AlarmCode { + case 4, 5, 7, 8: + return 1 + + case 9, 10: + return 4 + + case 1, 2, 3, 6: + return 7 + } + + return -1 +} diff --git a/main.go b/main.go index 8e71591..e2062c7 100644 --- a/main.go +++ b/main.go @@ -116,6 +116,11 @@ func main() { events.PATCH("/:event_uuid/close", handler.CloseEventHandler) } + api := engine.Group("/api") + { + api.POST("/alarm", handler.PostAlarmHandler) + } + server := http.Server{ Addr: eventRTConfig.ServiceAddr, Handler: engine,