From 71ebf6b9381c35e6f020c99cfb7e2fa4a37e9819 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Thu, 6 Nov 2025 21:09:50 +0800 Subject: [PATCH] add api and update files --- configs/mongo.json | 8 +- configs/redis.json | 4 +- data/data.go | 2 + data/influx/common.go | 4 +- data/influx/influx.go | 74 +++------- data/influx/ssu_point.go | 147 ++++++++++++++----- data/mongo/alarm.go | 44 ++++-- data/mongo/event.go | 41 ++++-- data/mongo/mongo.go | 4 + data/postgres/measurement.go | 21 +-- data/postgres/postgres.go | 2 +- data/publish_event.go | 13 +- data/rabbit/client.go | 18 ++- data/rabbit/consume.go | 11 +- data/rabbit/{management.go => manage.go} | 10 +- data/rabbit/publish.go | 13 +- data/rabbit/rabbit.go | 10 +- data/update_phasor.go | 61 ++++---- route/admin/admin.go | 3 + route/admin/command.go | 57 ++++++++ route/api/alarm.go | 32 +++- route/api/event.go | 179 +++++++++++++++++++++++ route/api/point.go | 119 +++++++++++++++ route/route.go | 8 + util/util.go | 2 +- 25 files changed, 685 insertions(+), 202 deletions(-) rename data/rabbit/{management.go => manage.go} (78%) create mode 100644 route/admin/admin.go create mode 100644 route/admin/command.go diff --git a/configs/mongo.json b/configs/mongo.json index fb758a4..55487c7 100644 --- a/configs/mongo.json +++ b/configs/mongo.json @@ -1,9 +1,9 @@ { "default":{ - "addrs":["192.168.46.100:27017"], - "username":"mongo", - "password":"123RTYjkl", - "authsource":"events", + "addrs":["127.0.0.1:27017"], + "username":"admin", + "password":"password", + "authsource":"admin", "authmechanism":"SCRAM-SHA-256" } } \ No newline at end of file diff --git a/configs/redis.json b/configs/redis.json index 2d326ed..37d682f 100644 --- a/configs/redis.json +++ b/configs/redis.json @@ -6,8 +6,8 @@ "db":0, "protocol":3, "dialtimeout":50, - "readtimeout":200, - "writetimeout":200, + "readtimeout":250, + "writetimeout":250, "poolsize":20 } } \ No newline at end of file diff --git a/data/data.go b/data/data.go index 424bf61..3b2127b 100644 --- a/data/data.go +++ b/data/data.go @@ -3,6 +3,7 @@ package data import ( "context" "datart/data/influx" + "datart/data/postgres" "github.com/redis/go-redis/v9" ) @@ -18,6 +19,7 @@ func NewProcess() *Process { func (p *Process) StartDataProcessing() { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel + postgres.GenSSU2ChannelSizes(ctx, 500) updatingRedisPhasor(ctx) } diff --git a/data/influx/common.go b/data/influx/common.go index e930c31..e2bf66f 100644 --- a/data/influx/common.go +++ b/data/influx/common.go @@ -264,7 +264,7 @@ func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) { func convertCsvToTVs(data [][]string) ([]TV, error) { ret := make([]TV, 0, len(data)) - for _, row := range data { + for _, row := range data[1:] { if len(row) > 3 { ns, err := strconv.ParseInt(row[2], 10, 64) if err != nil { @@ -288,7 +288,7 @@ func convertCsvToTVs(data [][]string) ([]TV, error) { func convertCsvToF2TVs(data [][]string) (map[string][]TV, error) { f2tvs := make(map[string][]TV) - for _, row := range data { + for _, row := range data[1:] { if len(row) > 3 { ns, err := strconv.ParseInt(row[2], 10, 64) if err != nil { diff --git a/data/influx/influx.go b/data/influx/influx.go index 736fc49..2ee0d0d 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -16,25 +16,6 @@ type influxClient struct { org string } -type Request struct { - RespType string - Bucket string - Measure string - Station string - MainPos string - SubPos string // separate whith ',' - Begin int64 - End int64 - Operate string - Step string - Default string -} - -const ( - PhasorBucket = "influxBucket" - SampleBucket = "influxBucket" -) - var client *influxClient func init() { @@ -71,19 +52,19 @@ func NewInfluxClient(cli *http.Client, url, org, token string) *influxClient { } } -func GetBucket(tp string) (string, error) { +func GetDB(tp string) (string, error) { switch tp { case "phasor": - return PhasorBucket, nil + return dbphasor, nil case "sample": - return SampleBucket, nil + return dbsample, nil } return "", errors.New("invalid type") } // serverConf -func GetMeasurement(tp string, mainPos string) (string, error) { +func GetTable(tp string, mainPos string) (string, error) { switch tp { case "phasor": ssu2Type := config.Conf().ServerConf().GetSSUType() @@ -102,39 +83,26 @@ func GetMeasurement(tp string, mainPos string) (string, error) { return "", errors.New("invalid type") } -func WriteLinesData(ctx context.Context, bucket string, data []byte) error { - return client.WriteLinesData(ctx, bucket, data) +func WriteLinesData(ctx context.Context, db string, data []byte) error { + return client.WriteLinesData(ctx, db, data) +} + +type Request struct { + DB string + Table string + + Type string + Station string + MainPos string + SubPos string // separate whith ',' + Begin int64 + End int64 + Operate string + Step string + Default string } type TV struct { Time int64 `json:"time"` Value float64 `json:"value"` } - -func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+20) - return client.GetSSUPointLastLimit(ctx, req, limit) -} - -func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+20) - return client.GetSSUPointsLastLimit(ctx, req, limit) -} - -func GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { - return client.GetSSUPointDurationData(ctx, req) -} - -func GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) { - return client.GetSSUPointsDurationData(ctx, req) -} - -func GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { - req.End = req.Begin + int64(limit*20+20) - return client.GetSSUPointsAfterLimit(ctx, req, limit) -} - -func GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { - req.Begin = req.End - int64(limit*20+20) - return client.GetSSUPointsBeforeLimit(ctx, req, limit) -} diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 75d8681..9571e68 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -5,11 +5,18 @@ import ( "fmt" "net/url" "strings" + "time" ) const ( - FieldCPrifix string = "c" - FieldIPrefix string = "i" + dbphasor = "influxBucket" + dbsample = "influxBucket" +) + +// keep consistent with telegraf +const ( + FieldYCPrefix string = "tm" + FieldYXPrefix string = "ts" // FieldP string = "p" // FieldQ string = "q" @@ -28,20 +35,48 @@ const ( FieldSuffixRMS = "_rms" ) +func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { + req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + return client.GetSSUPointLastLimit(ctx, req, limit) +} + +func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + return client.GetSSUPointsLastLimit(ctx, req, limit) +} + +func GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { + return client.GetSSUPointDurationData(ctx, req) +} + +func GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) { + return client.GetSSUPointsDurationData(ctx, req) +} + +func GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + req.End = req.Begin + int64(limit*20+20) + return client.GetSSUPointsAfterLimit(ctx, req, limit) +} + +func GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + req.Begin = req.End - int64(limit*20+20) + return client.GetSSUPointsBeforeLimit(ctx, req, limit) +} + func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';", - req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos) + req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos) if limit > 1 { - sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms order by time desc limit %d;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;", + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) } reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getTVsResp(ctx, reqData, req.RespType) + return client.getTVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { @@ -52,40 +87,50 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ fields[i] = "last(" + field + ") as " + field } sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s';", - strings.Join(fields, ","), req.Measure, req.Station, req.MainPos) + strings.Join(fields, ","), req.Table, req.Station, req.MainPos) } else { - sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms order by time desc limit %d;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;", + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) } reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getF2TVsResp(ctx, reqData, req.RespType) + f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") + if err != nil { + return f2tvs, nil + } + + ret := make(map[string][]TV, len(f2tvs)) + for f, tvs := range f2tvs { + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + } + + return ret, nil } func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { - sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End) + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms;", + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End) if req.Operate != "" && req.Step != "" && req.Default != "" { - sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);", - req.Operate, req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) + sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);", + req.Operate, req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) } reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getTVsResp(ctx, reqData, req.RespType) + return client.getTVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) { - sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End) + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms;", + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End) if req.Operate != "" && req.Step != "" && req.Default != "" { subPoss := strings.Split(req.SubPos, ",") @@ -93,52 +138,82 @@ func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *R for i, subPos := range subPoss { selectSections[i] = req.Operate + "(" + subPos + ")" + " as " + subPos } - sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);", - strings.Join(selectSections, ", "), req.Measure, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);", + strings.Join(selectSections, ", "), req.Table, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) } reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getF2TVsResp(ctx, reqData, req.RespType) + f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") + if err != nil { + return f2tvs, nil + } + + ret := make(map[string][]TV, len(f2tvs)) + for f, tvs := range f2tvs { + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + } + + return ret, nil } func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit) + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit) reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getTVsResp(ctx, reqData, req.RespType) + return client.getTVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit) + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit) reqData := url.Values{ - "db": {req.Bucket}, + "db": {req.DB}, "q": {sql}, } - return client.getF2TVsResp(ctx, reqData, req.RespType) + f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") + if err != nil { + return f2tvs, nil + } + + ret := make(map[string][]TV, len(f2tvs)) + for f, tvs := range f2tvs { + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + } + + return ret, nil } func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { reqData := url.Values{ - "db": {req.Bucket}, - "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit %d;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit)}, // begin = req.End-20-20 + "db": {req.DB}, + "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms order by time desc limit %d;", + req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)}, // begin = req.End-20-20 } - return client.getF2TVsResp(ctx, reqData, req.RespType) + f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") + if err != nil { + return f2tvs, nil + } + + ret := make(map[string][]TV, len(f2tvs)) + for f, tvs := range f2tvs { + ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple + } + + return ret, nil } -func (client *influxClient) WriteLinesData(ctx context.Context, bucket string, data []byte) error { - return client.writeLinesData(ctx, bucket, data, true) +func (client *influxClient) WriteLinesData(ctx context.Context, db string, data []byte) error { + return client.writeLinesData(ctx, db, data, true) } diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go index 73035a0..540de1e 100644 --- a/data/mongo/alarm.go +++ b/data/mongo/alarm.go @@ -11,10 +11,10 @@ const ( almCodeCommmExcept // 通信异常 almCodeADFault // AD故障 almCodePPSExcept // 同步秒脉冲异常 - almCodeBackup // 备用 + almCodeReserve1 // 备用 almCodeUnitInit // 单元初始化 almCodeReadParamErr // 读参数错 - almCodeReserve // 备用 + almCodeReserve2 // 备用 almCodeStartSample // 启动采样-内部转换信号 almCodeOverSample // 秒内采样点数过量 almCodeUnderSample // 秒内采样点数欠量 @@ -28,14 +28,14 @@ type Alarm struct { AlarmStatus int `bson:"alarm_status" json:"alarm_status"` // 0 "复位", 1 "动作/产生/告警" } -var almCode2Name = map[int]string{ +var almCode2Name = []string{ almCodeCommmExcept: "通信异常", almCodeADFault: "AD故障", almCodePPSExcept: "同步秒脉冲异常", - almCodeBackup: "备用", + almCodeReserve1: "备用", almCodeUnitInit: "单元初始化", almCodeReadParamErr: "读参数错", - almCodeReserve: "备用", + almCodeReserve2: "备用", almCodeStartSample: "启动采样-内部转换信号", almCodeOverSample: "秒内采样点数过量", almCodeUnderSample: "秒内采样点数欠量", @@ -45,20 +45,46 @@ func (a *Alarm) GetName() string { return almCode2Name[a.AlarmCode] } +func (a *Alarm) GetType() int { + switch a.AlarmCode { + case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample: + return genEventType(0, 0) + case almCodeOverSample, almCodeUnderSample: + return genEventType(0, 1) + case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr: + return genEventType(0, 2) + } + + return -1 +} + +func (a *Alarm) GetPriority() int { + switch a.AlarmCode { + case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample: + return 1 + case almCodeOverSample, almCodeUnderSample: + return 4 + case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr: + return 7 + } + + return -1 +} + func (a *Alarm) ConvertToEvent(ip string) *Event { e := new(Event) if a != nil { e.Event = a.GetName() e.EventUUID = uuid.NewString() - e.Type = genEventType(0, 2) - e.Priority = 5 + e.Type = a.GetType() + e.Priority = a.GetPriority() e.Status = EventStatusHappen - e.Timestamp = a.AlarmTime / 1e3 // TODO ms ? + e.Timestamp = a.AlarmTime e.From = "station" e.Operations = append(e.Operations, &operation{ Action: EventActionHappened, // TODO OP: ip, - TS: a.AlarmTime / 1e3, + TS: a.AlarmTime, }) e.Alarm = a } diff --git a/data/mongo/event.go b/data/mongo/event.go index 33263f3..6532b27 100644 --- a/data/mongo/event.go +++ b/data/mongo/event.go @@ -9,8 +9,8 @@ import ( ) const ( - dbevent string = "event" - tbevent string = "event" + dbevent string = "cl" + tbevent string = "events" ) const ( @@ -59,31 +59,29 @@ func InsertEvents(ctx context.Context, docs []*Event) error { return err } -func DeleteOneEvent[T bson.M | *bson.D](ctx context.Context, filter T) error { +func DeleteOneEvent[T bson.M | bson.D](ctx context.Context, filter T) error { _, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter) return err } -func DeleteEvents[T bson.M | *bson.D](ctx context.Context, filter T) error { +func DeleteEvents[T bson.M | bson.D](ctx context.Context, filter T) error { _, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter) return err } -// insert if not update -func UpdateOneEvent[T bson.M | *bson.D](ctx context.Context, filter T, update T) error { +func UpsertOneEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error { opts := options.UpdateOne().SetUpsert(true) _, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts) return err } -// insert if not update -func UpdateEvents[T bson.M | *bson.D](ctx context.Context, filter T, update T) error { +func UpsertEvents[T bson.M | bson.D](ctx context.Context, filter T, update T) error { opts := options.UpdateMany().SetUpsert(true) _, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts) return err } -func FindOneEvent[T bson.M | *bson.D](ctx context.Context, filter T) (*Event, error) { +func FindOneEvent[T bson.M | bson.D](ctx context.Context, filter T) (*Event, error) { doc := new(Event) err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc) // if errors.Is(err, mongo.ErrNoDocuments) { @@ -96,7 +94,7 @@ func FindOneEvent[T bson.M | *bson.D](ctx context.Context, filter T) (*Event, er return doc, nil } -func FindEvents[T bson.M | *bson.D](ctx context.Context, filter T) ([]*Event, error) { +func FindEvents[T bson.M | bson.D](ctx context.Context, filter T) ([]*Event, error) { cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter) if err != nil { return nil, err @@ -111,7 +109,7 @@ func FindEvents[T bson.M | *bson.D](ctx context.Context, filter T) ([]*Event, er return docs, nil } -func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, +func FindEventsInBatch[T bson.M | bson.D](ctx context.Context, filter T, batchSize int32) ([]*Event, error) { opt := options.Find().SetBatchSize(batchSize) @@ -129,15 +127,23 @@ func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, } docs = append(docs, doc) } + if err := cursor.Err(); err != nil { + return docs, err + } return docs, nil } -func FindEventsWithPageLimit[T bson.M | *bson.D](ctx context.Context, filter T, +func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T, sort int, page int64, limit int64) ([]*Event, error) { - // TODO - opt := options.Find().SetSort(bson.D{{Key: "timestamp", Value: sort}}). - SetSkip(limit * (page - 1)).SetLimit(limit) + + opt := options.Find() + if sort == 1 || sort == -1 { + opt.SetSort(bson.D{{Key: "timestamp", Value: sort}}) + } + if page > 0 && limit > 0 { + opt.SetSkip(limit * (page - 1)).SetLimit(limit) + } cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt) if err != nil { @@ -153,13 +159,16 @@ func FindEventsWithPageLimit[T bson.M | *bson.D](ctx context.Context, filter T, } docs = append(docs, doc) } + if err := cursor.Err(); err != nil { + return docs, err + } return docs, nil } // sys: 0-hard/1-platform/2-application // -// level:1-info/2-warn/3-error +// level:0-info/1-warn/2-error func genEventType(sys int, level int) int { return sys + level*3 } diff --git a/data/mongo/mongo.go b/data/mongo/mongo.go index 82c15ac..b5ab0de 100644 --- a/data/mongo/mongo.go +++ b/data/mongo/mongo.go @@ -45,6 +45,10 @@ func Disconnect(ctx context.Context) error { return client.Disconnect(ctx) } +func GetSession() (*mongo.Session, error) { + return client.StartSession() +} + func getCollection(db string, tb string) *mongo.Collection { return client.Database(db).Collection(tb) } diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index c5ae795..69fdefc 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -13,15 +13,15 @@ const ( ) const ( - ChannelCPrefix string = "TM" - ChannelIPrefix string = "TS" - ChannelP string = "P" - ChannelQ string = "Q" - ChannelS string = "S" - ChannelPF string = "PF" - ChannelF string = "F" - ChannelDF string = "deltaF" - ChannelUPrefix string = "U" + ChannelYCPrefix string = "TM" + ChannelYXPrefix string = "TS" + ChannelP string = "P" + ChannelQ string = "Q" + ChannelS string = "S" + ChannelPF string = "PF" + ChannelF string = "F" + ChannelDF string = "dF" + ChannelUPrefix string = "U" ) type dataSource struct { @@ -114,6 +114,7 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) var records []*measurement result := client.WithContext(ctx).Table(tbmeasurement).Where("id > ?", id). Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { return totalRecords, result.Error } @@ -137,9 +138,11 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error { var records []*measurement result := client.WithContext(ctx).Table(tbmeasurement). Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { return result.Error } + length := len(records) if length <= 0 { break diff --git a/data/postgres/postgres.go b/data/postgres/postgres.go index 3eca0f9..02ffc98 100644 --- a/data/postgres/postgres.go +++ b/data/postgres/postgres.go @@ -16,7 +16,7 @@ func init() { postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(), postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(), postgresConfig.GetTimeZone()) - db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{SkipDefaultTransaction: true}) if err != nil { panic(err) } diff --git a/data/publish_event.go b/data/publish_event.go index ec5c5bb..b889d60 100644 --- a/data/publish_event.go +++ b/data/publish_event.go @@ -10,16 +10,15 @@ import ( rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -var eventXQR = rabbit.XQR{ - ExchangeName: "event_produce_exchange", - QueueName: "event_produce_queue", - RoutingKey: "event_produce_route", +var eventXQK = rabbit.XQK{ + Exchange: "event_produce_exchange", + // Key: "event_produce_key", } var eventPublisher *rmq.Publisher func init() { - publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQR) + publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQK) if err != nil { panic(err) } @@ -27,10 +26,6 @@ func init() { } func PublishEvent(ctx context.Context, event *mongo.Event) error { - if err := mongo.InsertOneEvent(ctx, event); err != nil { - return err - } - data, err := event.Marshall() if err != nil { return err diff --git a/data/rabbit/client.go b/data/rabbit/client.go index 9c2c5d2..04599d5 100644 --- a/data/rabbit/client.go +++ b/data/rabbit/client.go @@ -11,18 +11,22 @@ type rabbitClient struct { conn *rmq.AmqpConnection } -func NewClient(ctx context.Context, endpoints []rmq.Endpoint) (*rabbitClient, error) { +func NewClient(ctx context.Context, tag string) (*rabbitClient, error) { - env := rmq.NewClusterEnvironment(endpoints) - conn, err := client.env.NewConnection(ctx) + endpoints, err := genEndpoints(tag) if err != nil { return nil, err } - return &rabbitClient{ - env: env, - conn: conn, - }, nil + cli := new(rabbitClient) + cli.env = rmq.NewClusterEnvironment(endpoints) + conn, err := cli.env.NewConnection(context.Background()) + if err != nil { + return nil, err + } + client.conn = conn + + return cli, nil } func (c *rabbitClient) Management() *rmq.AmqpManagement { diff --git a/data/rabbit/consume.go b/data/rabbit/consume.go index 9882013..b4e0c6f 100644 --- a/data/rabbit/consume.go +++ b/data/rabbit/consume.go @@ -7,20 +7,17 @@ import ( rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -func NewConsumer(ctx context.Context, tag string, xqr *XQR) (*rmq.Consumer, error) { +func NewConsumer(ctx context.Context, tag string, xqk *XQK) (*rmq.Consumer, error) { cli := client if tag != "default" { - endpoints, err := genEndpoints(tag) - if err != nil { - return nil, err - } - cli, err = NewClient(ctx, endpoints) + var err error + cli, err = NewClient(ctx, tag) if err != nil { return nil, err } } - return cli.conn.NewConsumer(ctx, xqr.QueueName, nil) + return cli.conn.NewConsumer(ctx, xqk.Queue, nil) } func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) { diff --git a/data/rabbit/management.go b/data/rabbit/manage.go similarity index 78% rename from data/rabbit/management.go rename to data/rabbit/manage.go index 05b3f5c..291c5bc 100644 --- a/data/rabbit/management.go +++ b/data/rabbit/manage.go @@ -6,7 +6,7 @@ import ( rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -type Management struct { +type Manage struct { m *rmq.AmqpManagement xName string x rmq.IExchangeSpecification @@ -16,7 +16,7 @@ type Management struct { b rmq.IBindingSpecification } -func (m *Management) Init(ctx context.Context, rm *rmq.AmqpManagement, +func (m *Manage) Init(ctx context.Context, rm *rmq.AmqpManagement, rx rmq.IExchangeSpecification, rq rmq.IQueueSpecification, rb rmq.IBindingSpecification) { @@ -26,7 +26,7 @@ func (m *Management) Init(ctx context.Context, rm *rmq.AmqpManagement, m.b = rb } -func (m *Management) DeclareExchangeQueue(ctx context.Context) error { +func (m *Manage) DeclareExchangeQueue(ctx context.Context) error { _, err := m.m.DeclareExchange(ctx, m.x) if err != nil { return err @@ -40,7 +40,7 @@ func (m *Management) DeclareExchangeQueue(ctx context.Context) error { return nil } -func (m *Management) DeclareAndBind(ctx context.Context) error { +func (m *Manage) DeclareAndBind(ctx context.Context) error { xinfo, err := m.m.DeclareExchange(ctx, m.x) if err != nil { return err @@ -62,7 +62,7 @@ func (m *Management) DeclareAndBind(ctx context.Context) error { return nil } -func (m *Management) UnbindAndDelete(ctx context.Context) (purged int, err error) { +func (m *Manage) UnbindAndDelete(ctx context.Context) (purged int, err error) { err = m.m.Unbind(ctx, m.bPath) if err != nil { return diff --git a/data/rabbit/publish.go b/data/rabbit/publish.go index 2395f03..5ced8b4 100644 --- a/data/rabbit/publish.go +++ b/data/rabbit/publish.go @@ -8,21 +8,18 @@ import ( rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -func NewPublisher(ctx context.Context, tag string, xqr *XQR) (*rmq.Publisher, error) { +func NewPublisher(ctx context.Context, tag string, xqk *XQK) (*rmq.Publisher, error) { cli := client if tag != "default" { - endpoints, err := genEndpoints(tag) - if err != nil { - return nil, err - } - cli, err = NewClient(ctx, endpoints) + var err error + cli, err = NewClient(ctx, tag) if err != nil { return nil, err } } return cli.conn.NewPublisher(context.Background(), &rmq.ExchangeAddress{ - Exchange: xqr.ExchangeName, - Key: xqr.RoutingKey, + Exchange: xqk.Exchange, + Key: xqk.Key, }, nil) } diff --git a/data/rabbit/rabbit.go b/data/rabbit/rabbit.go index 805edbc..04877c8 100644 --- a/data/rabbit/rabbit.go +++ b/data/rabbit/rabbit.go @@ -8,11 +8,11 @@ import ( rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -type XQR struct { - ExchangeName string `json:"exchangename" yaml:"exchangename"` - QueueName string `json:"queuename" yaml:"queuename"` - RoutingKey string `json:"routingkey" yaml:"routingkey"` - QueueLength int64 `json:"queuelength" yaml:"queuelength"` +type XQK struct { + Exchange string `json:"exchange" yaml:"exchange"` + Queue string `json:"queue" yaml:"queue"` + Key string `json:"key" yaml:"key"` + QueueCap int64 `json:"queuecap" yaml:"queuecap"` } var client *rabbitClient diff --git a/data/update_phasor.go b/data/update_phasor.go index 547a93f..6d99b16 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -84,22 +84,23 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan func queryInfluxPhasor(ctx context.Context, station string, device string, fileds []string, size int) (map[string][]influx.TV, error) { - measure, err := influx.GetMeasurement("phasor", device) + bucket, err := influx.GetDB("phasor") if err != nil { return nil, err } - bucket, err := influx.GetBucket("phasor") + + measure, err := influx.GetTable("phasor", device) if err != nil { return nil, err } req := &influx.Request{ - RespType: "csv", - Bucket: bucket, - Measure: measure, - Station: station, - MainPos: device, - SubPos: strings.Join(fileds, ","), + DB: bucket, + Table: measure, + Type: "phasor", + Station: station, + MainPos: device, + SubPos: strings.Join(fileds, ","), } return influx.GetSSUPointsLastLimit(ctx, req, size) } @@ -110,27 +111,35 @@ func updateZUnitToRedis(ctx context.Context, unit zUnit) error { func genPhasorFields(channel string) []string { fields := make([]string, 0, 3) + switch { - case strings.HasPrefix(channel, postgres.ChannelCPrefix): - if after, ok := strings.CutPrefix(channel, postgres.ChannelCPrefix); ok { - fields = append(fields, - influx.FieldCPrifix+after+influx.FieldSuffixAMP, - influx.FieldCPrifix+after+influx.FieldSuffixPA, - influx.FieldCPrifix+after+influx.FieldSuffixRMS) - } - case strings.HasPrefix(channel, postgres.ChannelIPrefix): - if after, ok := strings.CutPrefix(channel, postgres.ChannelCPrefix); ok { - fields = append(fields, influx.FieldIPrefix+after) - } + case strings.HasPrefix(channel, postgres.ChannelYCPrefix): + + fieldPrefix := strings.ToLower(channel) + fields = append(fields, + fieldPrefix+influx.FieldSuffixAMP, + fieldPrefix+influx.FieldSuffixPA, + fieldPrefix+influx.FieldSuffixRMS) + + case strings.HasPrefix(channel, postgres.ChannelYXPrefix): + + fields = append(fields, strings.ToLower(channel)) + case strings.HasPrefix(channel, postgres.ChannelUPrefix): + fieldUPrefix := strings.ToLower(channel) - fields = append(fields, fieldUPrefix+influx.FieldSuffixAMP, - fieldUPrefix+influx.FieldSuffixPA, fieldUPrefix+influx.FieldSuffixRMS) - case channel == postgres.ChannelDF: - fields = append(fields, influx.FieldDF) - case channel == postgres.ChannelP, channel == postgres.ChannelQ, - channel == postgres.ChannelS, channel == postgres.ChannelPF, - channel == postgres.ChannelF: + fields = append(fields, + fieldUPrefix+influx.FieldSuffixAMP, + fieldUPrefix+influx.FieldSuffixPA, + fieldUPrefix+influx.FieldSuffixRMS) + + case channel == postgres.ChannelP, + channel == postgres.ChannelQ, + channel == postgres.ChannelS, + channel == postgres.ChannelPF, + channel == postgres.ChannelF, + channel == postgres.ChannelDF: + fields = append(fields, strings.ToLower(channel)) } diff --git a/route/admin/admin.go b/route/admin/admin.go new file mode 100644 index 0000000..c79646c --- /dev/null +++ b/route/admin/admin.go @@ -0,0 +1,3 @@ +package admin + +type Admin struct{} diff --git a/route/admin/command.go b/route/admin/command.go new file mode 100644 index 0000000..d114cab --- /dev/null +++ b/route/admin/command.go @@ -0,0 +1,57 @@ +package admin + +import ( + "datart/data/postgres" + "datart/log" + "errors" + "fmt" + + "github.com/gin-gonic/gin" +) + +type command struct { + Function string `json:"function"` + Timeout int64 `json:"timeout"` + Args []any `json:"args"` +} + +func (a *Admin) PostExecuteCommand(ctx *gin.Context) { + req, err := a.checkAndGenExecuteCommandRequest(ctx) + if err != nil { + log.Error(err) + ctx.JSON(200, gin.H{ + "code": 1, + "msg": err.Error(), + }) + return + } + + err = postgres.GenSSU2ChannelSizes(ctx.Request.Context(), 500) + if err != nil { + log.Error(err, fmt.Sprintf(" params: %v", req)) + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "success", + }) +} + +func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) { + req := new(command) + err := ctx.ShouldBindJSON(req) + if err != nil { + return req, errors.New("invalid body param") + } + + if req.Function != "GenSSU2ChannelSizes" { + return nil, errors.New("invalid function") + } + + return req, nil +} diff --git a/route/api/alarm.go b/route/api/alarm.go index 931648f..328da88 100644 --- a/route/api/alarm.go +++ b/route/api/alarm.go @@ -1,9 +1,11 @@ package api import ( + "datart/data" "datart/data/mongo" "datart/log" "errors" + "fmt" "regexp" "github.com/gin-gonic/gin" @@ -12,7 +14,9 @@ import ( func (a *Api) PostInsertAlarm(ctx *gin.Context) { alarm, ip, err := a.checkAndGenInsertAlarmRequest(ctx) if err != nil { + log.Error(err) + ctx.JSON(200, gin.H{ "code": 1, "msg": "invalid param", @@ -20,8 +24,30 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { return } - _ = alarm - _ = ip + event := alarm.ConvertToEvent(ip) + err = mongo.InsertOneEvent(ctx.Request.Context(), event) + if err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", event)) + + ctx.JSON(200, gin.H{ + "code": 2, + "msg": "insert error", + }) + return + } + + err = data.PublishEvent(ctx.Request.Context(), event) + if err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", event)) + + ctx.JSON(200, gin.H{ + "code": 3, + "msg": "publish error", + }) + return + } ctx.JSON(200, gin.H{ "code": 0, @@ -31,6 +57,7 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { func (a *Api) checkAndGenInsertAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) { alarm := new(mongo.Alarm) + err := ctx.ShouldBindJSON(alarm) if err != nil { return nil, "", err @@ -40,6 +67,7 @@ func (a *Api) checkAndGenInsertAlarmRequest(ctx *gin.Context) (*mongo.Alarm, str if err != nil { return nil, "", err } + if !ok { return nil, "", errors.New("invalid device_no") } diff --git a/route/api/event.go b/route/api/event.go index 778f64e..c385f72 100644 --- a/route/api/event.go +++ b/route/api/event.go @@ -1 +1,180 @@ package api + +import ( + "datart/data/mongo" + "datart/log" + "errors" + "fmt" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/v2/bson" +) + +const ( + pageSizeLimit = 100 +) + +func (a *Api) GetEvents(ctx *gin.Context) { + filter, sort, pageNo, pageSize, err := a.checkAndGenGetEventsRequest(ctx) + if err != nil { + + log.Error(err) + + ctx.JSON(200, gin.H{ + "code": 1, + "msg": err.Error(), + }) + return + } + + events, err := mongo.FindEventsWithPageLimit(ctx.Request.Context(), filter, sort, pageNo, pageSize) + if err != nil { + + log.Error(err, fmt.Sprintf(" params: %v, %d, %d, %d", filter, sort, pageNo, pageSize)) + + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "success", + "data": events, + }) +} + +func (a *Api) PostUpsertEvents(ctx *gin.Context) { + filter, update, err := a.checkAndGenUpsertEventsRequest(ctx) + if err != nil { + log.Error(err) + ctx.JSON(200, gin.H{ + "code": 1, + "msg": err.Error(), + }) + return + } + + if err = mongo.UpsertOneEvent(ctx.Request.Context(), filter, update); err != nil { + log.Error(err, fmt.Sprintf(" params: %v, %v", filter, update)) + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "success", + }) +} + +func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) { + uuidStr := ctx.Query("uuid") + if len(uuidStr) > 0 { + + if uuid.Validate(uuidStr) != nil { + return nil, 0, -1, -1, errors.New("invalid uuid") + } + + return bson.M{"event_uuid": uuidStr}, 0, -1, -1, nil + } + + filter := bson.M{} + + var err error + begin, end := int64(-1), int64(-1) + beginStr := ctx.Query("begin") + if len(beginStr) > 0 { + if begin, err = strconv.ParseInt(beginStr, 10, 64); err != nil { + return nil, 0, -1, -1, err + } + } + + endStr := ctx.Query("end") + if len(endStr) > 0 { + if end, err = strconv.ParseInt(endStr, 10, 64); err != nil { + return nil, 0, -1, -1, err + } + } + + if begin > 0 && end > 0 && begin > end { + return nil, 0, -1, -1, errors.New("invalid time") + } + + switch { + case begin > 0 && end < 0: + filter["timestamp"] = bson.M{"$gte": begin} + case begin < 0 && end > 0: + filter["timestamp"] = bson.M{"$lte": end} + case begin > 0 && end > 0: + filter["timestamp"] = bson.M{"$gte": begin, "$lte": end} + } + + var sort int + sortStr := ctx.Query("sort") + if len(sortStr) > 0 { + s, err := strconv.Atoi(sortStr) + if err != nil { + return nil, 0, -1, -1, err + } + + if s != 1 && s != -1 { + return nil, 0, -1, -1, errors.New("invalid sort") + } + sort = s + } + + pageNo, pageSize := -1, -1 + pageNoStr := ctx.Query("page_no") + pageSizeStr := ctx.Query("page_size") + if len(pageNoStr) > 0 && len(pageSizeStr) > 0 { + pageNo, err = strconv.Atoi(pageNoStr) + if err != nil { + return nil, 0, -1, -1, err + } + + pageSize, err = strconv.Atoi(pageSizeStr) + if err != nil { + return nil, 0, -1, -1, err + } + + if pageNo <= 0 || pageSize <= 0 { + return nil, 0, -1, -1, errors.New("invalid page param") + } + + if pageSize > pageSizeLimit { + pageSize = pageSizeLimit + } + } + + return filter, sort, int64(pageNo), int64(pageSize), nil +} + +func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bson.M, bson.M, error) { + e := map[string]any{} + err := ctx.ShouldBindJSON(&e) + if err != nil { + return nil, nil, errors.New("invalid body param") + } + + eventUUID := "" + if eu, ok := e["event_uuid"]; ok { + if eUUID, ok := eu.(string); ok { + if uuid.Validate(eUUID) != nil { + return nil, nil, errors.New("invalid param") + } else { + eventUUID = eUUID + } + } + } else { + return nil, nil, errors.New("no uuid") + } + + return bson.M{"event_uuid": eventUUID}, bson.M{"$set": bson.M(e)}, nil +} diff --git a/route/api/point.go b/route/api/point.go index 778f64e..41249d3 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -1 +1,120 @@ package api + +import ( + "datart/data/influx" + "datart/log" + "datart/util" + "errors" + "fmt" + + "github.com/gin-gonic/gin" +) + +func (a *Api) GetPointData(ctx *gin.Context) { + request, err := a.checkAndGenGetPointRequest(ctx) + if err != nil { + + log.Error(err) + + ctx.JSON(200, gin.H{ + "code": 1, + "msg": err.Error(), + }) + return + } + + var data map[string][]influx.TV + switch { + case request.Begin > 0 && request.End > 0: + data, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) + + case request.Begin > 0 && request.End < 0: + data, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) + + case request.Begin < 0 && request.End > 0: + data, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) + + default: + data, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + } + + if err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", request)) + + ctx.JSON(200, gin.H{ + "code": 2, + "msg": "query database error", + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "success", + "data": data, + }) +} + +func (a *Api) checkAndGenGetPointRequest(ctx *gin.Context) (*influx.Request, error) { + + typeStr := ctx.DefaultQuery("type", "") + if len(typeStr) <= 0 { + return nil, errors.New("invalid type") + } + + // tag TODO + + station := ctx.DefaultQuery("station", "") + if len(station) <= 0 { + return nil, errors.New("invalid station") + } + + mainPos := ctx.DefaultQuery("main_pos", "") + if len(mainPos) <= 0 { + return nil, errors.New("invalid main_pos") + } + + subPos := ctx.DefaultQuery("sub_pos", "") + if len(subPos) <= 0 { + return nil, errors.New("invalid sub_pos") + } + + beginStr := ctx.DefaultQuery("begin", "") + + endStr := ctx.DefaultQuery("end", "") + + operate := ctx.DefaultQuery("operate", "") + + step := ctx.DefaultQuery("step", "") + + defaultV := ctx.DefaultQuery("default", "") + + begin := util.ConvertToInt64Default(beginStr, -1) + + end := util.ConvertToInt64Default(endStr, -1) + + bucket, err := influx.GetDB(typeStr) + if err != nil { + return nil, err + } + + measure, err := influx.GetTable(typeStr, mainPos) + if err != nil { + return nil, err + } + + return &influx.Request{ + DB: bucket, + Table: measure, + Type: typeStr, + Station: station, + MainPos: mainPos, + SubPos: subPos, + Begin: begin, + End: end, + Operate: operate, + Step: step, + Default: defaultV, + }, nil +} diff --git a/route/route.go b/route/route.go index 5055d46..996e057 100644 --- a/route/route.go +++ b/route/route.go @@ -1,6 +1,7 @@ package route import ( + "datart/route/admin" "datart/route/api" "github.com/gin-gonic/gin" @@ -12,4 +13,11 @@ func LoadRoute(engine *gin.Engine) { a := new(api.Api) ga := engine.Group("api") ga.POST("/alarm", a.PostInsertAlarm) + ga.GET("/points", a.GetPointData) + ga.GET("/events", a.GetEvents) + ga.POST("/events", a.PostUpsertEvents) + + d := new(admin.Admin) + gd := engine.Group("admin") + gd.POST("/command", d.PostExecuteCommand) } diff --git a/util/util.go b/util/util.go index 834b3a7..1763d6b 100644 --- a/util/util.go +++ b/util/util.go @@ -4,7 +4,7 @@ import ( "strconv" ) -func ConvertToTimestampDefault(tsStr string, defaultTS int64) int64 { +func ConvertToInt64Default(tsStr string, defaultTS int64) int64 { ts, err := strconv.ParseInt(tsStr, 10, 64) if err != nil { return defaultTS