diff --git a/config/config.go b/config/config.go index d2ea6c5..2b4d14e 100644 --- a/config/config.go +++ b/config/config.go @@ -17,40 +17,40 @@ type config struct { } var conf *config -var confPath string +var confDir string func init() { - flag.StringVar(&confPath, "conf_path", "./configs", "conf path") + flag.StringVar(&confDir, "conf_dir", "./configs", "conf dir") flag.Parse() conf = new(config) conf.serverConf = new(serverConfig) - serverConf := confPath + string(os.PathSeparator) + serverConfigName() + serverConf := confDir + string(os.PathSeparator) + serverConfigName() conf.unmarshalJsonFile(serverConf, conf.serverConf) conf.logConf = new(logConfig) - logConf := confPath + string(os.PathSeparator) + logConfigName() + logConf := confDir + string(os.PathSeparator) + logConfigName() conf.unmarshalJsonFile(logConf, conf.logConf) conf.postgresConf = make(map[string]*postgresConfig) - postgresConf := confPath + string(os.PathSeparator) + postgresConfigName() + postgresConf := confDir + string(os.PathSeparator) + postgresConfigName() conf.unmarshalJsonFile(postgresConf, &conf.postgresConf) conf.influxConf = make(map[string]*influxConfig) - influxConf := confPath + string(os.PathSeparator) + influxConfigName() + influxConf := confDir + string(os.PathSeparator) + influxConfigName() conf.unmarshalJsonFile(influxConf, &conf.influxConf) conf.redisConf = make(map[string]*redisConfig) - redisConf := confPath + string(os.PathSeparator) + redisConfigName() + redisConf := confDir + string(os.PathSeparator) + redisConfigName() conf.unmarshalJsonFile(redisConf, &conf.redisConf) conf.mongoConf = make(map[string]*mongoConfig) - mongoConf := confPath + string(os.PathSeparator) + mongoConfigName() + mongoConf := confDir + string(os.PathSeparator) + mongoConfigName() conf.unmarshalJsonFile(mongoConf, &conf.mongoConf) conf.rabbitConf = make(map[string][]*rabbitConfig) - rabbitConf := confPath + string(os.PathSeparator) + rabbitConfigName() + rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName() conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf) } diff --git a/config/rabbit.go b/config/rabbit.go index 04b19f8..44ef6b0 100644 --- a/config/rabbit.go +++ b/config/rabbit.go @@ -16,9 +16,6 @@ func (conf *rabbitConfig) GenAddress(tls bool) string { } address := "amqp://" - if tls { - address = "amqps://" - } if conf.GetUsername() != "" && conf.GetPassword() != "" { address += conf.GetUsername() + ":" + conf.GetPassword() + "@" } diff --git a/configs/log.json b/configs/log.json index 92fbf19..8f0a813 100644 --- a/configs/log.json +++ b/configs/log.json @@ -1,6 +1,6 @@ { "filename": "./logs/datart.log", - "maxsize": 100, + "maxsize": 128, "maxage": 7, "maxbackups": 20, "localtime": true, diff --git a/data/data.go b/data/data.go index 9fcf129..b5ce372 100644 --- a/data/data.go +++ b/data/data.go @@ -2,24 +2,38 @@ package data import ( "context" + "datart/data/influx" + "datart/data/mongo" "datart/data/postgres" + "datart/data/rabbit" + "datart/data/redis" ) -type Process struct { +type Processes struct { cancel context.CancelFunc } -func NewProcess() *Process { - return new(Process) +func NewProcesses() *Processes { + return new(Processes) } -func (p *Process) StartDataProcessing() { +func (p *Processes) StartDataProcessing() { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel + postgres.GenSSU2ChannelSizes(ctx, 500) + updatingRedisPhasor(ctx) } -func (p *Process) Cancel() { +func (p *Processes) Cancel(ctx context.Context) { p.cancel() + + eventNotifyPublisher.Close(ctx) + + influx.CloseDefault() + mongo.CloseDefault(ctx) + postgres.CloseDefault() + rabbit.CloseDefault(ctx) + redis.CloseDefault() } diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index ea2a8fa..a1dd82a 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -9,8 +9,8 @@ import ( ) const ( - dbphasor = "influxBucket" - dbsample = "influxBucket" + dbphasor = "ssuBucket" + dbsample = "ssuBucket" ) // keep consistent with telegraf @@ -35,13 +35,15 @@ const ( FieldSuffixRMS = "_rms" ) +const adaptedms = 5000 + func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+10000) + req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms) return client.GetSSUPointLastLimit(ctx, req, limit) } func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+10000) + req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms) return client.GetSSUPointsLastLimit(ctx, req, limit) } @@ -68,7 +70,7 @@ func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Reque req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos) if limit > 1 { sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;", - req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) } reqData := url.Values{ @@ -90,7 +92,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ strings.Join(fields, ","), req.Table, req.Station, req.MainPos) } else { sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;", - req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) } reqData := url.Values{ @@ -105,7 +107,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ ret := make(map[string][]TV, len(f2tvs)) for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs } return ret, nil @@ -154,7 +156,7 @@ func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *R ret := make(map[string][]TV, len(f2tvs)) for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs } return ret, nil @@ -188,7 +190,7 @@ func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Req ret := make(map[string][]TV, len(f2tvs)) for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs } return ret, nil @@ -198,7 +200,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re reqData := url.Values{ "db": {req.DB}, "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms order by time desc limit %d;", - req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)}, // begin = req.End-20-20 + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)}, } f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") @@ -208,7 +210,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re ret := make(map[string][]TV, len(f2tvs)) for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs } return ret, nil diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go index ab40a4f..111dd87 100644 --- a/data/mongo/alarm.go +++ b/data/mongo/alarm.go @@ -7,17 +7,17 @@ import ( ) const ( - _ = iota - almCodeCommmExcept // 通信异常 - almCodeADFault // AD故障 - almCodePPSExcept // 同步秒脉冲异常 - almCodeReserve1 // 备用 - almCodeUnitInit // 单元初始化 - almCodeReadParamErr // 读参数错 - almCodeReserve2 // 备用 - almCodeStartSample // 启动采样-内部转换信号 - almCodeOverSample // 秒内采样点数过量 - almCodeUnderSample // 秒内采样点数欠量 + _ = iota + almCodeCommmExcept + almCodeADFault + almCodePPSExcept + almCodeReserve1 + almCodeUnitInit + almCodeReadParamErr + almCodeReserve2 + almCodeStartSample + almCodeOverSample + almCodeUnderSample ) type Alarm struct { @@ -47,11 +47,20 @@ func (a *Alarm) GetName() string { func (a *Alarm) GetType() int { switch a.AlarmCode { - case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample: + case almCodeReserve1, + almCodeReserve2, + almCodeUnitInit, + almCodeStartSample: return genEventType(0, 0) - case almCodeOverSample, almCodeUnderSample: + + case almCodeOverSample, + almCodeUnderSample: return genEventType(0, 1) - case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr: + + case almCodeCommmExcept, + almCodeADFault, + almCodePPSExcept, + almCodeReadParamErr: return genEventType(0, 2) } @@ -60,11 +69,20 @@ func (a *Alarm) GetType() int { func (a *Alarm) GetPriority() int { switch a.AlarmCode { - case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample: + case almCodeReserve1, + almCodeReserve2, + almCodeUnitInit, + almCodeStartSample: return 1 - case almCodeOverSample, almCodeUnderSample: + + case almCodeOverSample, + almCodeUnderSample: return 4 - case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr: + + case almCodeCommmExcept, + almCodeADFault, + almCodePPSExcept, + almCodeReadParamErr: return 7 } @@ -96,7 +114,7 @@ func (a *Alarm) ConvertToEvent(ip string) (*Event, error) { e.Timestamp = a.AlarmTime e.From = "station" e.Operations = append(e.Operations, &operation{ - Action: EventActionHappened, + Action: EventActionHappen, OP: ip, TS: a.AlarmTime, }) diff --git a/data/mongo/event.go b/data/mongo/event.go index e4cd131..0048d80 100644 --- a/data/mongo/event.go +++ b/data/mongo/event.go @@ -3,6 +3,7 @@ package mongo import ( "context" "encoding/json" + "time" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -23,15 +24,37 @@ const ( ) const ( - EventActionHappened = "happened" + EventActionHappen = "happen" + EventActionDataAt = "data_attach" + EventActionReport = "report" + EventActionConfirm = "confirm" + EventActionPersist = "persist" + EventActionClose = "close" ) +var EventStatusAction = []string{ + EventStatusHappen: EventActionHappen, + EventStatusDataAt: EventActionDataAt, + EventStatusReport: EventActionReport, + EventStatusConfirm: EventActionConfirm, + EventStatusPersist: EventActionPersist, + EventStatusClose: EventActionClose, +} + type operation struct { Action string `bson:"action" json:"action"` OP string `bson:"op" json:"op"` TS int64 `bson:"ts" json:"ts"` } +func GenOperation(action, op string) operation { + return operation{ + Action: action, + OP: op, + TS: time.Now().UnixMilli(), + } +} + type Event struct { Event string `bson:"event" json:"event"` EventUUID string `bson:"event_uuid" json:"event_uuid"` @@ -54,7 +77,7 @@ func InsertOneEvent(ctx context.Context, doc any) error { return err } -func InsertEvents(ctx context.Context, docs []*Event) error { +func InsertEvents(ctx context.Context, docs any) error { _, err := getCollection(dbevent, tbevent).InsertMany(ctx, docs) return err } @@ -69,71 +92,16 @@ func DeleteEvents[T bson.M | bson.D](ctx context.Context, filter T) error { return err } -func UpsertOneEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error { - opts := options.UpdateOne().SetUpsert(true) - _, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts) +func UpdateOneEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error { + _, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update) return err } -func UpsertEvents[T bson.M | bson.D](ctx context.Context, filter T, update T) error { - opts := options.UpdateMany().SetUpsert(true) - _, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts) +func UpdateEvents[T bson.M | bson.D](ctx context.Context, filter T, update T) error { + _, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update) return err } -func FindOneEvent[T bson.M | bson.D](ctx context.Context, filter T) (*Event, error) { - doc := new(Event) - err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc) - // if errors.Is(err, mongo.ErrNoDocuments) { - // return nil, nil - // } - if err != nil { - return nil, err - } - - return doc, nil -} - -func FindEvents[T bson.M | bson.D](ctx context.Context, filter T) ([]*Event, error) { - cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter) - if err != nil { - return nil, err - } - defer cursor.Close(ctx) - - var docs []*Event - if err = cursor.All(ctx, &docs); err != nil { - return nil, err - } - - return docs, nil -} - -func FindEventsInBatch[T bson.M | bson.D](ctx context.Context, filter T, - batchSize int32) ([]*Event, error) { - - opt := options.Find().SetBatchSize(batchSize) - cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt) - if err != nil { - return nil, err - } - defer cursor.Close(ctx) - - var docs []*Event - for cursor.Next(ctx) { - doc := new(Event) - if err = cursor.Decode(doc); err != nil { - return nil, err - } - docs = append(docs, doc) - } - if err := cursor.Err(); err != nil { - return docs, err - } - - return docs, nil -} - func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T, sort int, page int64, limit int64) ([]*Event, error) { @@ -143,6 +111,7 @@ func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T, } else { opt.SetSort(bson.D{{Key: "_id", Value: 1}}) } + if page > 0 && limit > 0 { opt.SetSkip(limit * (page - 1)).SetLimit(limit) } diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index accffaf..d6b3619 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -213,6 +213,8 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, return errors.New("invalid io_address") } + case 2: + default: return errors.New("invalid data_source.type") } diff --git a/data/publish_event.go b/data/publish_event.go index e192f0d..11ddf8e 100644 --- a/data/publish_event.go +++ b/data/publish_event.go @@ -30,7 +30,9 @@ func init() { m.Init(ctx, rm, rx, nil, nil) - m.DeclareExchange(ctx) + if err := m.DeclareExchange(ctx); err != nil { + panic(err) + } publisher, err := rabbit.NewPublisher(ctx, "default", &eventNotifyXQK) if err != nil { diff --git a/data/update_phasor.go b/data/update_phasor.go index 15b7fd4..3fa455f 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -12,7 +12,7 @@ import ( ) const ( - duration time.Duration = 5 * time.Second + updatePhasorDuration time.Duration = 5 * time.Second ) func updatingRedisPhasor(ctx context.Context) { @@ -30,7 +30,7 @@ func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) ssuType := config.Conf().ServerConf().GetSSUType() for ssu := range ssuType { go func(ssu string) { - timer := time.Tick(duration) + timer := time.Tick(updatePhasorDuration) for { select { case <-timer: @@ -73,7 +73,7 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan } // if len(f2tvs) <= 0 { - // log.Info(channelSize.Station, " ", channelSize.Device, " ", + // log.Debug(channelSize.Station, " ", channelSize.Device, " ", // fields, " query none of ", channelSize.Size) // } diff --git a/log/log.go b/log/log.go index 602d022..bac7b2e 100644 --- a/log/log.go +++ b/log/log.go @@ -27,7 +27,7 @@ func init() { // ErrorLevel, 2, logs are high-priority. If an application is running smoothly, // it shouldn't generate any error-level logs. - // DPanicLevel, 3, logs are particularly important errors. In development the + // PanicLevel, 3, logs are particularly important errors. In development the // logger panics after writing the message. // PanicLevel, 4, logs a message, then panics. diff --git a/main.go b/main.go index 0deb7e8..624c02c 100644 --- a/main.go +++ b/main.go @@ -1,25 +1,44 @@ package main import ( + "context" "datart/config" "datart/data" "datart/route" + "os" + "os/signal" "strconv" + "syscall" + "time" "github.com/gin-gonic/gin" ) func main() { - gin.SetMode(gin.ReleaseMode) - engine := gin.New() + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + // gin.SetMode(gin.ReleaseMode) + + engine := gin.Default() + // engine := gin.New() route.LoadRoute(engine) - process := data.NewProcess() - process.StartDataProcessing() + processes := data.NewProcesses() + processes.StartDataProcessing() - port := strconv.Itoa(config.Conf().ServerConf().GetPort()) - if err := engine.Run(":" + port); err != nil { - panic(err) - } + go func() { + port := strconv.Itoa(config.Conf().ServerConf().GetPort()) + if err := engine.Run(":" + port); err != nil { + panic(err) + } + }() + + <-signalChan + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + processes.Cancel(ctx) } diff --git a/route/admin/command.go b/route/admin/command.go index e736662..002399a 100644 --- a/route/admin/command.go +++ b/route/admin/command.go @@ -50,7 +50,7 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, er } if req.Command != "GenSSU2ChannelSizes" { - return nil, errors.New("invalid function") + return nil, errors.New("invalid command") } return req, nil diff --git a/route/api/alarm.go b/route/api/alarm.go index a301461..9f9b57b 100644 --- a/route/api/alarm.go +++ b/route/api/alarm.go @@ -1,6 +1,7 @@ package api import ( + "context" "datart/data" "datart/data/mongo" "datart/log" @@ -11,8 +12,8 @@ import ( "github.com/gin-gonic/gin" ) -func (a *Api) PostInsertAlarm(ctx *gin.Context) { - alarm, ip, err := a.checkAndGenInsertAlarmRequest(ctx) +func (a *Api) PostUploadAlarm(ctx *gin.Context) { + alarm, ip, err := a.checkAndGenUploadAlarmRequest(ctx) if err != nil { log.Error(err) @@ -48,17 +49,12 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { return } - err = data.PublishEvent(ctx.Request.Context(), event) - if err != nil { + go func(e *mongo.Event) { + if err := data.PublishEvent(context.Background(), e); err != nil { - log.Error(err, fmt.Sprintf(" params: %v", event)) - - ctx.JSON(200, gin.H{ - "code": 4, - "msg": "publish error", - }) - return - } + log.Error(err, fmt.Sprintf(" params: %v", e)) + } + }(event) ctx.JSON(200, gin.H{ "code": 0, @@ -66,7 +62,7 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { }) } -func (a *Api) checkAndGenInsertAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) { +func (a *Api) checkAndGenUploadAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) { alarm := new(mongo.Alarm) err := ctx.ShouldBindJSON(alarm) diff --git a/route/api/event.go b/route/api/event.go index a36df53..1e24258 100644 --- a/route/api/event.go +++ b/route/api/event.go @@ -1,12 +1,15 @@ package api import ( + "context" + "datart/data" "datart/data/mongo" "datart/log" "errors" "fmt" "strconv" "strings" + "sync" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -50,7 +53,7 @@ func (a *Api) GetEvents(ctx *gin.Context) { } func (a *Api) PostUpsertEvents(ctx *gin.Context) { - noUUID, eventUUID, upsert, err := a.checkAndGenUpsertEventsRequest(ctx) + uuids, update, events, err := a.checkAndGenUpsertEventsRequest(ctx) if err != nil { log.Error(err) ctx.JSON(200, gin.H{ @@ -60,29 +63,58 @@ func (a *Api) PostUpsertEvents(ctx *gin.Context) { return } - if noUUID { - if err = mongo.InsertOneEvent(ctx.Request.Context(), upsert); err != nil { - - log.Error(err, fmt.Sprintf(" params: %v", upsert)) - + if len(uuids) > 0 { + operation := mongo.GenOperation(mongo.EventStatusAction[update.Status], ctx.RemoteIP()) + err := mongo.UpdateEvents(ctx.Request.Context(), + bson.M{"event_uuid": bson.M{"$in": uuids}}, + bson.M{"$set": bson.M{"status": update.Status}, "$push": bson.M{"operations": operation}}) + if err != nil { + log.Error(err, fmt.Sprintf(" params:%v %v", update, uuids)) ctx.JSON(200, gin.H{ "code": 2, "msg": err.Error(), }) return } - } else if err = mongo.UpsertOneEvent(ctx.Request.Context(), bson.M{"event_uuid": eventUUID}, - bson.M{"$set": bson.M(upsert)}); err != nil { - log.Error(err, fmt.Sprintf(" params: %v", upsert)) - - ctx.JSON(200, gin.H{ - "code": 3, - "msg": err.Error(), - }) - return + events = make([]map[string]any, len(uuids)) + for i := range events { + events[i] = map[string]any{"event_uuid": uuids[i], "status": update.Status} + } + } else { + err := mongo.InsertEvents(ctx.Request.Context(), events) + if err != nil { + log.Error(err, fmt.Sprintf(" params: %v", events)) + ctx.JSON(200, gin.H{ + "code": 3, + "msg": err.Error(), + }) + return + } } + go func(evts []map[string]any) { + workers := 5 + ch := make(chan map[string]any, len(evts)) + var wg sync.WaitGroup + for range workers { + wg.Add(1) + go func() { + defer wg.Done() + for e := range ch { + if err := data.PublishEvent(context.Background(), e); err != nil { + log.Error(err, fmt.Sprintf("publish event failed: %v", e)) + } + } + }() + } + for _, e := range evts { + ch <- e + } + close(ch) + wg.Wait() + }(events) + ctx.JSON(200, gin.H{ "code": 0, "msg": "success", @@ -178,40 +210,78 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, } if pageSize > pageSizeLimit { - pageSize = pageSizeLimit + return nil, 0, -1, -1, fmt.Errorf("too many events, max %d", pageSizeLimit) } } return filter, sort, int64(pageNo), int64(pageSize), nil } -func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bool, string, map[string]any, error) { - e := map[string]any{} - err := ctx.ShouldBindJSON(&e) - if err != nil { - return false, "", nil, errors.New("invalid body param") +func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) ([]string, *mongo.Event, []map[string]any, error) { + insert := true + update := &mongo.Event{} + statusStr := ctx.Query("status") + if len(statusStr) > 0 { + insert = false + status, err := strconv.Atoi(statusStr) + if err != nil { + return nil, nil, nil, err + } + update.Status = status } - eventUUID := "" + if !insert { + uuids := []string{} + err := ctx.ShouldBindJSON(&uuids) + if err != nil { + return nil, nil, nil, err + } + if len(uuids) == 0 { + return nil, nil, nil, errors.New("no uuid") + } + if len(uuids) > pageSizeLimit { + return nil, nil, nil, fmt.Errorf("too many uuids, max %d", pageSizeLimit) + } + + return uuids, update, nil, nil + } + + events := []map[string]any{} + err := ctx.ShouldBindJSON(&events) + if err != nil { + return nil, nil, nil, err + } + if len(events) == 0 { + return nil, nil, nil, errors.New("no event") + } + if len(events) > pageSizeLimit { + return nil, nil, nil, fmt.Errorf("too many events, max %d", pageSizeLimit) + } + + return nil, nil, events, nil +} + +func validateEventUpsert(event map[string]any) error { noUUID := true - if eu, ok := e["event_uuid"]; ok { - if eUUID, ok := eu.(string); ok { - if uuid.Validate(eUUID) == nil { - eventUUID = eUUID + if eu, ok := event["event_uuid"]; ok { + if eID, ok := eu.(string); ok { + if uuid.Validate(eID) == nil { noUUID = false } } } if noUUID { - noUUID = true if uid, err := uuid.NewV7(); err != nil { - return false, "", nil, err + return err } else { - eventUUID = uid.String() - e["event_uuid"] = eventUUID + event["event_uuid"] = uid.String() } } - return noUUID, eventUUID, e, nil + if len(event) < 2 { + return errors.New("invalid event") + } + + return nil } diff --git a/route/api/point.go b/route/api/point.go index c9afdc7..a8e4b9b 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -13,8 +13,8 @@ import ( "github.com/gin-gonic/gin" ) -func (a *Api) GetPointData(ctx *gin.Context) { - request, err := a.checkAndGenGetPointRequest(ctx) +func (a *Api) GetPoints(ctx *gin.Context) { + request, err := a.checkAndGenGetPointsRequest(ctx) if err != nil { log.Error(err) @@ -59,7 +59,7 @@ func (a *Api) GetPointData(ctx *gin.Context) { }) } -func (a *Api) checkAndGenGetPointRequest(ctx *gin.Context) (*influx.Request, error) { +func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, error) { typeStr := ctx.DefaultQuery("type", "") if len(typeStr) <= 0 { diff --git a/route/route.go b/route/route.go index ac6992c..29bfa54 100644 --- a/route/route.go +++ b/route/route.go @@ -12,8 +12,8 @@ func LoadRoute(engine *gin.Engine) { a := new(api.Api) ga := engine.Group("api") - ga.POST("/alarm", a.PostInsertAlarm) - ga.GET("/points", a.GetPointData) + ga.POST("/alarm", a.PostUploadAlarm) + ga.GET("/points", a.GetPoints) ga.GET("/events", a.GetEvents) ga.POST("/events", a.PostUpsertEvents)