diff --git a/data/data.go b/data/data.go index 2b1437d..49c643b 100644 --- a/data/data.go +++ b/data/data.go @@ -28,6 +28,7 @@ func (p *Processes) StartDataProcessing() { } updatingRedisPhasor(ctx) + updatingRedisCL104(ctx) cl104.ConnectCLs() } diff --git a/data/influx/104_point.go b/data/influx/104_point.go new file mode 100644 index 0000000..8269060 --- /dev/null +++ b/data/influx/104_point.go @@ -0,0 +1,61 @@ +package influx + +import ( + "context" + "fmt" + "net/url" + "time" +) + +const ( + db104 = "influxBucket" + tb104 = "cl104" +) + +const ( + mainPos104Key = "ca" + subPos104Key = "ioa" + val104Key = "val" +) + +func Get104PointLast(ctx context.Context, req *Request) ([]TV, error) { + req.Begin = time.Now().UnixMilli() - int64(15*60*1000) + return client.Get104PointLastLimit(ctx, req, 1) +} + +func Get104PointDuration(ctx context.Context, req *Request) ([]TV, error) { + return client.Get104PointDurationData(ctx, req) +} + +func (client *influxClient) Get104PointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { + sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and %s='%s' and %s='%s';", + val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos) + if limit > 1 { + sql = fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms order by time desc limit %d;", + val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, limit) + } + + reqData := url.Values{ + "db": {req.DB}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, "csv") +} + +func (client *influxClient) Get104PointDurationData(ctx context.Context, req *Request) ([]TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms;", + val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End) + + if req.Operate != "" && 
req.Step != "" && req.Default != "" { + sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);", + req.Operate, val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End, req.Step, req.Default) + } + + reqData := url.Values{ + "db": {req.DB}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, "csv") +} diff --git a/data/influx/influx.go b/data/influx/influx.go index 43e0d8a..ee79f02 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -58,6 +58,8 @@ func GetDB(tp string) (string, error) { return dbphasor, nil case "sample": return dbsample, nil + case "104": + return db104, nil } return "", errors.New("invalid type") @@ -78,6 +80,8 @@ func GetTable(tp string, mainPos string) (string, error) { } case "sample": return "sample", nil + case "104": + return tb104, nil } return "", errors.New("invalid type") diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 95b57c1..80b1e5a 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -9,8 +9,8 @@ import ( ) const ( - dbphasor = "ssuBucket" - dbsample = "ssuBucket" + dbphasor = "influxBucket" + dbsample = "influxBucket" ) // keep consistent with telegraf diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index bf67c9f..d0ed171 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "encoding/json" "errors" + "strconv" "sync" "sync/atomic" ) @@ -83,8 +84,8 @@ type measurement struct { type ChannelSize struct { Type int Station string - Device string - Channel string + MainPos string + SubPos string Size int IRatio float64 IPolar int @@ -234,8 +235,8 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, channelSize := ChannelSize{ Type: 1, Station: station, - Device: device, - Channel: channel, + MainPos: device, 
+ SubPos: channel, Size: record.Size, IRatio: 1, IPolar: 1, @@ -262,7 +263,52 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, } case 2: + if rawAddr, ok := record.DataSource.Addr.(map[string]any); ok { + station, ok := rawAddr["station"].(string) + if !ok { + return errors.New("invalid station") + } + pack, ok := rawAddr["package"].(float64) + if !ok { + return errors.New("invalid package") + } + offset, ok := rawAddr["offset"].(float64) + if !ok { + return errors.New("invalid offset") + } + mainPos := strconv.Itoa(int(pack)) + subPos := strconv.Itoa(int(offset)) + + if _, ok := ChannelSizes.Load(record.Tag); !ok { + channelSize := ChannelSize{ + Type: 2, + Station: station, + MainPos: mainPos, + SubPos: subPos, + Size: record.Size, + IRatio: 1, + IPolar: 1, + URatio: 1, + UPolar: 1, + } + + if record.Binding != nil { + if record.Binding.CT.Index > 0 { // depends on value of index + channelSize.IRatio = record.Binding.CT.Ratio + channelSize.IPolar = record.Binding.CT.Polarity + } + if record.Binding.PT.Index > 0 { // depends on value of index + channelSize.URatio = record.Binding.PT.Ratio + channelSize.UPolar = record.Binding.PT.Polarity + } + } + + ChannelSizes.Store(record.Tag, channelSize) + } + } else { + return errors.New("invalid io_address") + } default: return errors.New("invalid data_source.type") } diff --git a/data/update_104.go b/data/update_104.go new file mode 100644 index 0000000..a3de2d8 --- /dev/null +++ b/data/update_104.go @@ -0,0 +1,114 @@ +package data + +import ( + "context" + "datart/data/influx" + "datart/data/postgres" + "datart/data/redis" + "datart/log" + "sync" + "time" +) + +const ( + workerNum = 5 +) + +const ( + update104Duration time.Duration = time.Second * 1 +) + +type cl104Q struct { + Station string + CA string + IOA string + // Begin int64 + // End int64 +} + +type workerPool struct { + workerNum int + wg sync.WaitGroup + qChan chan cl104Q +} + +var cl104Pool *workerPool + +func init() { + cl104Pool = 
newWorkerPool(workerNum) +} + +func newWorkerPool(workerNum int) *workerPool { + return &workerPool{ + workerNum: workerNum, + qChan: make(chan cl104Q, 128), + } +} + +func updatingRedisCL104(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(update104Duration): + postgres.ChannelSizes.Range(func(key, value any) bool { + if channelSize, ok := value.(postgres.ChannelSize); ok && channelSize.Type == 2 { + cl104Pool.qChan <- cl104Q{ + Station: channelSize.Station, + CA: channelSize.MainPos, + IOA: channelSize.SubPos, + } + } + return true + }) + } + } + }() + + for range workerNum { + go func() { + for { + select { + case <-ctx.Done(): + return + case q := <-cl104Pool.qChan: + // receive inside select so cancellation is observed while idle + + tvs, err := cl104Pool.queryInflux104(ctx, q) + if err != nil { + log.Error(err) + } else { + cl104Pool.refreshRedis(ctx, q, tvs) + } + } + } + }() + } +} + +func (wp *workerPool) queryInflux104(ctx context.Context, q cl104Q) ([]influx.TV, error) { + tvs, err := influx.Get104PointLast(ctx, &influx.Request{ + Type: "104", + Station: q.Station, + MainPos: q.CA, + SubPos: q.IOA, + }) + if err != nil { + return nil, err + } + + return tvs, nil +} + +func (wp *workerPool) refreshRedis(ctx context.Context, q cl104Q, tvs []influx.TV) { + key := genRedis104Key(q.Station, q.CA, q.IOA) + + members := convertTVsToMenmbers(tvs) + + redis.ZAtomicReplace(ctx, key, members) +} + +func genRedis104Key(station string, ca string, ioa string) string { + return station + ":104:" + ca + ":" + ioa +} diff --git a/data/update_phasor.go b/data/update_phasor.go index a5d6828..d918bc1 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -19,7 +19,7 @@ func updatingRedisPhasor(ctx context.Context) { ssuType := config.Conf().ServerConf().GetSSUType() ssuChans := make(map[string]chan zUnit, len(ssuType)) for ssu := range ssuType { - ssuChans[ssu] = make(chan zUnit, 32) + ssuChans[ssu] = make(chan zUnit, 64) } go queringSSUInfluxPhasor(ctx, ssuChans) @@ 
-65,8 +65,8 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) } func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { - fields := GenPhasorFields(channelSize.Channel) - f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, + fields := GenPhasorFields(channelSize.SubPos) + f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.MainPos, fields, channelSize.Size) if err != nil { log.Error(err) @@ -78,7 +78,7 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan // } for f, tvs := range f2tvs { - key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f) + key := genRedisPhasorKey(channelSize.Station, channelSize.MainPos, f) for i := range tvs { tvs[i].Value = ToPrimary(f, tvs[i].Value, diff --git a/route/api/point.go b/route/api/point.go index 598b119..247f847 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -27,18 +27,30 @@ func (a *Api) GetPoints(ctx *gin.Context) { } var ftvs map[string][]influx.TV - switch { - case request.Begin > 0 && request.End > 0: - ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) + switch request.Type { + case "104": + var tvs []influx.TV + switch { + case request.Begin > 0 && request.End > 0: + tvs, err = influx.Get104PointDuration(ctx.Request.Context(), request) + default: + tvs, err = influx.Get104PointLast(ctx.Request.Context(), request) + } + ftvs = map[string][]influx.TV{strings.Join(request.Tags, "."): tvs} + case "phasor": + switch { + case request.Begin > 0 && request.End > 0: + ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) - case request.Begin > 0 && request.End < 0: - ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) + case request.Begin > 0 && request.End < 0: + ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) - case request.Begin < 0 && request.End > 0: - 
ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) + case request.Begin < 0 && request.End > 0: + ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) - default: - ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + default: + ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + } } if err != nil { @@ -83,10 +95,10 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1]) if ok { if channelSize, ok := v.(postgres.ChannelSize); ok { - fields := data.GenPhasorFields(channelSize.Channel) + fields := data.GenPhasorFields(channelSize.SubPos) station = channelSize.Station - mainPos = channelSize.Device + mainPos = channelSize.MainPos subPos = strings.Join(fields, ",") } } else { diff --git a/route/ws/ws.go b/route/ws/ws.go index 80a0903..ed339a5 100644 --- a/route/ws/ws.go +++ b/route/ws/ws.go @@ -10,10 +10,10 @@ type Ws struct{} var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, - WriteBufferSize: 4096, + WriteBufferSize: 1024, WriteBufferPool: &sync.Pool{ New: func() any { - return make([]byte, 4096) + return make([]byte, 1024) }, }, }