From 345f6d4a3d46c831ad8c1ca7b891077415fbba6c Mon Sep 17 00:00:00 2001 From: zhuxu Date: Fri, 20 Mar 2026 17:37:39 +0800 Subject: [PATCH] query and update 104 --- data/data.go | 1 + data/influx/104_point.go | 61 ++++++++++++ data/influx/common.go | 2 +- data/influx/influx.go | 4 + data/influx/ssu_point.go | 4 +- data/postgres/measurement.go | 54 ++++++++++- data/update_104.go | 124 +++++++++++++++++++++++++ data/update_phasor.go | 8 +- route/{ws => api}/104up.go | 40 +++++--- route/{admin/command.go => api/cmd.go} | 6 +- route/api/point.go | 40 +++++--- route/route.go | 12 +-- route/ws/ws.go | 19 ---- 13 files changed, 306 insertions(+), 69 deletions(-) create mode 100644 data/influx/104_point.go create mode 100644 data/update_104.go rename route/{ws => api}/104up.go (81%) rename route/{admin/command.go => api/cmd.go} (84%) delete mode 100644 route/ws/ws.go diff --git a/data/data.go b/data/data.go index 2b1437d..49c643b 100644 --- a/data/data.go +++ b/data/data.go @@ -28,6 +28,7 @@ func (p *Processes) StartDataProcessing() { } updatingRedisPhasor(ctx) + updatingRedisCL104(ctx) cl104.ConnectCLs() } diff --git a/data/influx/104_point.go b/data/influx/104_point.go new file mode 100644 index 0000000..8269060 --- /dev/null +++ b/data/influx/104_point.go @@ -0,0 +1,61 @@ +package influx + +import ( + "context" + "fmt" + "net/url" + "time" +) + +const ( + db104 = "influxBucket" + tb104 = "cl104" +) + +const ( + mainPos104Key = "ca" + subPos104Key = "ioa" + val104Key = "val" +) + +func Get104PointLast(ctx context.Context, req *Request) ([]TV, error) { + req.Begin = time.Now().UnixMilli() - int64(15*60*1000) + return client.Get104PointLastLimit(ctx, req, 1) +} + +func Get104PointDuration(ctx context.Context, req *Request) ([]TV, error) { + return client.Get104PointDurationData(ctx, req) +} + +func (client *influxClient) Get104PointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { + sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' 
and %s='%s' and %s='%s';", + val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos) + if limit > 1 { + sql = fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms order by time desc limit %d;", + val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, limit) + } + + reqData := url.Values{ + "db": {req.DB}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, "csv") +} + +func (client *influxClient) Get104PointDurationData(ctx context.Context, req *Request) ([]TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms;", + val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End) + + if req.Operate != "" && req.Step != "" && req.Default != "" { + sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);", + req.Operate, val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End, req.Step, req.Default) + } + + reqData := url.Values{ + "db": {req.DB}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, "csv") +} diff --git a/data/influx/common.go b/data/influx/common.go index a49df92..fbb2314 100644 --- a/data/influx/common.go +++ b/data/influx/common.go @@ -258,7 +258,7 @@ func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) { func convertCsvToTVs(data [][]string) ([]TV, error) { ret := make([]TV, 0, len(data)) - for _, row := range data[1:] { + for _, row := range data { if len(row) > 3 { ns, err := strconv.ParseInt(row[2], 10, 64) if err != nil { diff --git a/data/influx/influx.go b/data/influx/influx.go index 43e0d8a..ee79f02 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -58,6 +58,8 @@ func GetDB(tp string) 
(string, error) { return dbphasor, nil case "sample": return dbsample, nil + case "104": + return db104, nil } return "", errors.New("invalid type") @@ -78,6 +80,8 @@ func GetTable(tp string, mainPos string) (string, error) { } case "sample": return "sample", nil + case "104": + return tb104, nil } return "", errors.New("invalid type") diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 95b57c1..80b1e5a 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -9,8 +9,8 @@ import ( ) const ( - dbphasor = "ssuBucket" - dbsample = "ssuBucket" + dbphasor = "influxBucket" + dbsample = "influxBucket" ) // keep consistent with telegraf diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index bf67c9f..b9423d4 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "encoding/json" "errors" + "strconv" "sync" "sync/atomic" ) @@ -83,8 +84,8 @@ type measurement struct { type ChannelSize struct { Type int Station string - Device string - Channel string + MainPos string + SubPos string Size int IRatio float64 IPolar int @@ -234,8 +235,8 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, channelSize := ChannelSize{ Type: 1, Station: station, - Device: device, - Channel: channel, + MainPos: device, + SubPos: channel, Size: record.Size, IRatio: 1, IPolar: 1, @@ -262,7 +263,52 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, } case 2: + if rawAddr, ok := record.DataSource.Addr.(map[string]any); ok { + station, ok := rawAddr["station"].(string) + if !ok { + return errors.New("invalid station") + } + pack, ok := rawAddr["packet"].(float64) + if !ok { + return errors.New("invalid packet") + } + offset, ok := rawAddr["offset"].(float64) + if !ok { + return errors.New("invalid offset") + } + mainPos := strconv.Itoa(int(pack)) + subPos := strconv.Itoa(int(offset)) + + if _, ok := ChannelSizes.Load(record.Tag); !ok { + 
channelSize := ChannelSize{ + Type: 2, + Station: station, + MainPos: mainPos, + SubPos: subPos, + Size: record.Size, + IRatio: 1, + IPolar: 1, + URatio: 1, + UPolar: 1, + } + + if record.Binding != nil { + if record.Binding.CT.Index > 0 { // depends on value of index + channelSize.IRatio = record.Binding.CT.Ratio + channelSize.IPolar = record.Binding.CT.Polarity + } + if record.Binding.PT.Index > 0 { // depends on value of index + channelSize.URatio = record.Binding.PT.Ratio + channelSize.UPolar = record.Binding.PT.Polarity + } + } + + ChannelSizes.Store(record.Tag, channelSize) + } + } else { + return errors.New("invalid io_address") + } default: return errors.New("invalid data_source.type") } diff --git a/data/update_104.go b/data/update_104.go new file mode 100644 index 0000000..24db88b --- /dev/null +++ b/data/update_104.go @@ -0,0 +1,124 @@ +package data + +import ( + "context" + "datart/data/influx" + "datart/data/postgres" + "datart/data/redis" + "datart/log" + "sync" + "time" +) + +const ( + workerNum = 5 +) + +const ( + update104Duration time.Duration = time.Second * 1 +) + +type cl104Q struct { + Station string + CA string + IOA string + // Begin int64 + // End int64 +} + +type workerPool struct { + workerNum int + wg sync.WaitGroup + qChan chan cl104Q +} + +var cl104Pool *workerPool + +func init() { + cl104Pool = newWorkerPool(workerNum) +} + +func newWorkerPool(workerNum int) *workerPool { + return &workerPool{ + workerNum: workerNum, + qChan: make(chan cl104Q, 128), + } +} + +func updatingRedisCL104(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(update104Duration): + postgres.ChannelSizes.Range(func(key, value any) bool { + if channelSize, ok := value.(postgres.ChannelSize); ok && channelSize.Type == 2 { + cl104Pool.qChan <- cl104Q{ + Station: channelSize.Station, + CA: channelSize.MainPos, + IOA: channelSize.SubPos, + } + } + return true + }) + } + } + }() + + for range workerNum { + go 
func() { + for { + select { + case <-ctx.Done(): + return + default: + q := <-cl104Pool.qChan + + tvs, err := cl104Pool.queryInflux104(ctx, q) + if err != nil { + log.Error(err) + } else { + cl104Pool.refreshRedis(ctx, q, tvs) + } + } + } + }() + } +} + +func (wp *workerPool) queryInflux104(ctx context.Context, q cl104Q) ([]influx.TV, error) { + db, err := influx.GetDB("104") + if err != nil { + return nil, err + } + tb, err := influx.GetTable("104", q.CA) + if err != nil { + return nil, err + } + tvs, err := influx.Get104PointLast(ctx, &influx.Request{ + DB: db, + Table: tb, + Type: "104", + Station: q.Station, + MainPos: q.CA, + SubPos: q.IOA, + }) + if err != nil { + return nil, err + } + + return tvs, nil +} + +func (wp *workerPool) refreshRedis(ctx context.Context, q cl104Q, tvs []influx.TV) { + key := genRedis104Key(q.Station, q.CA, q.IOA) + + members := convertTVsToMenmbers(tvs) + + redis.ZAtomicReplace(ctx, key, members) +} + +func genRedis104Key(station string, ca string, ioa string) string { + return station + ":104:" + ca + ":" + ioa +} diff --git a/data/update_phasor.go b/data/update_phasor.go index a5d6828..d918bc1 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -19,7 +19,7 @@ func updatingRedisPhasor(ctx context.Context) { ssuType := config.Conf().ServerConf().GetSSUType() ssuChans := make(map[string]chan zUnit, len(ssuType)) for ssu := range ssuType { - ssuChans[ssu] = make(chan zUnit, 32) + ssuChans[ssu] = make(chan zUnit, 64) } go queringSSUInfluxPhasor(ctx, ssuChans) @@ -65,8 +65,8 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) } func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { - fields := GenPhasorFields(channelSize.Channel) - f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, + fields := GenPhasorFields(channelSize.SubPos) + f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.MainPos, fields, 
channelSize.Size) if err != nil { log.Error(err) @@ -78,7 +78,7 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan // } for f, tvs := range f2tvs { - key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f) + key := genRedisPhasorKey(channelSize.Station, channelSize.MainPos, f) for i := range tvs { tvs[i].Value = ToPrimary(f, tvs[i].Value, diff --git a/route/ws/104up.go b/route/api/104up.go similarity index 81% rename from route/ws/104up.go rename to route/api/104up.go index b6f436d..e585742 100644 --- a/route/ws/104up.go +++ b/route/api/104up.go @@ -1,4 +1,4 @@ -package ws +package api import ( "context" @@ -33,9 +33,19 @@ type wsMessage struct { data []byte } +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + WriteBufferPool: &sync.Pool{ + New: func() any { + return make([]byte, 1024) + }, + }, +} + var upConnNum int64 -func (w *Ws) Cl104Up(ctx *gin.Context) { +func (a *Api) Cl104Up(ctx *gin.Context) { if atomic.SwapInt64(&upConnNum, 1) > 0 { ctx.JSON(http.StatusConflict, nil) return @@ -58,15 +68,15 @@ func (w *Ws) Cl104Up(ctx *gin.Context) { ctrlCh: make(chan wsMessage, 16), } - if err := setUpSessionConn104(session); err != nil { + if err := setUpSessionConn104Up(session); err != nil { log.Error(err) return } - startUpWorkers(session) + start104UpWorkers(session) } -func writeControl104(session *upSession, mt int, payload []byte) error { +func writeControl104Up(session *upSession, mt int, payload []byte) error { select { case <-session.ctx.Done(): return session.ctx.Err() @@ -77,14 +87,14 @@ func writeControl104(session *upSession, mt int, payload []byte) error { } } -func setUpSessionConn104(session *upSession) error { +func setUpSessionConn104Up(session *upSession) error { session.conn.SetPongHandler(func(appData string) error { session.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) session.conn.SetPingHandler(func(appData string) error { - if err := 
writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil { + if err := writeControl104Up(session, websocket.PongMessage, []byte(appData)); err != nil { session.cancel() return err } @@ -99,32 +109,32 @@ func setUpSessionConn104(session *upSession) error { return session.conn.SetReadDeadline(time.Now().Add(pongWait)) } -func startUpWorkers(session *upSession) { +func start104UpWorkers(session *upSession) { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - monitorUpWrite(session) + monitor104UpWrite(session) }() wg.Add(1) go func() { defer wg.Done() - monitorUpRead(session) + monitor104UpRead(session) }() wg.Add(1) go func() { defer wg.Done() - sendPing104(session) + sendPing104Up(session) }() wg.Wait() atomic.SwapInt64(&upConnNum, 0) } -func monitorUpWrite(session *upSession) { +func monitor104UpWrite(session *upSession) { var err error for { select { @@ -156,7 +166,7 @@ func monitorUpWrite(session *upSession) { } } -func monitorUpRead(session *upSession) { +func monitor104UpRead(session *upSession) { for { select { case <-session.ctx.Done(): @@ -211,7 +221,7 @@ func monitorUpRead(session *upSession) { } } -func sendPing104(session *upSession) { +func sendPing104Up(session *upSession) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() @@ -221,7 +231,7 @@ func sendPing104(session *upSession) { case <-session.ctx.Done(): return case <-ticker.C: - err := writeControl104(session, websocket.PingMessage, nil) + err := writeControl104Up(session, websocket.PingMessage, nil) if err != nil { session.cancel() return diff --git a/route/admin/command.go b/route/api/cmd.go similarity index 84% rename from route/admin/command.go rename to route/api/cmd.go index dd650df..c204f04 100644 --- a/route/admin/command.go +++ b/route/api/cmd.go @@ -1,4 +1,4 @@ -package admin +package api import ( "datart/data/postgres" @@ -15,7 +15,7 @@ type command struct { Args []any `json:"args"` } -func (a *Admin) PostExecuteCommand(ctx *gin.Context) { +func (a 
*Api) PostExecuteCommand(ctx *gin.Context) { req, err := a.checkAndGenExecuteCommandRequest(ctx) if err != nil { log.Error(err) @@ -42,7 +42,7 @@ func (a *Admin) PostExecuteCommand(ctx *gin.Context) { }) } -func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) { +func (a *Api) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) { req := new(command) err := ctx.ShouldBindJSON(req) if err != nil { diff --git a/route/api/point.go b/route/api/point.go index 598b119..5a93385 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -27,18 +27,30 @@ func (a *Api) GetPoints(ctx *gin.Context) { } var ftvs map[string][]influx.TV - switch { - case request.Begin > 0 && request.End > 0: - ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) + switch request.Type { + case "104": + var tvs []influx.TV + switch { + case request.Begin > 0 && request.End > 0: + tvs, err = influx.Get104PointDuration(ctx.Request.Context(), request) + default: + tvs, err = influx.Get104PointLast(ctx.Request.Context(), request) + } + ftvs = map[string][]influx.TV{strings.Join(request.Tags, "."): tvs} + case "phasor": + switch { + case request.Begin > 0 && request.End > 0: + ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) - case request.Begin > 0 && request.End < 0: - ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) + case request.Begin > 0 && request.End < 0: + ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) - case request.Begin < 0 && request.End > 0: - ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) + case request.Begin < 0 && request.End > 0: + ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) - default: - ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + default: + ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + } } if err != nil 
{ @@ -83,10 +95,16 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1]) if ok { if channelSize, ok := v.(postgres.ChannelSize); ok { - fields := data.GenPhasorFields(channelSize.Channel) + var fields []string + if typeStr == "phasor" { + fields = data.GenPhasorFields(channelSize.SubPos) + } + if typeStr == "104" { + fields = []string{channelSize.SubPos} + } station = channelSize.Station - mainPos = channelSize.Device + mainPos = channelSize.MainPos subPos = strings.Join(fields, ",") } } else { diff --git a/route/route.go b/route/route.go index e889770..110ef12 100644 --- a/route/route.go +++ b/route/route.go @@ -1,9 +1,7 @@ package route import ( - "datart/route/admin" "datart/route/api" - "datart/route/ws" "github.com/gin-gonic/gin" ) @@ -17,12 +15,6 @@ func LoadRoute(engine *gin.Engine) { ga.GET("/points", a.GetPoints) ga.GET("/events", a.GetEvents) ga.POST("/events", a.PostUpsertEvents) - - d := new(admin.Admin) - gd := engine.Group("admin") - gd.POST("/command", d.PostExecuteCommand) - - w := new(ws.Ws) - gw := engine.Group("ws") - gw.GET("/104up", w.Cl104Up) + ga.POST("/command", a.PostExecuteCommand) + ga.GET("/104up", a.Cl104Up) } diff --git a/route/ws/ws.go b/route/ws/ws.go deleted file mode 100644 index 80a0903..0000000 --- a/route/ws/ws.go +++ /dev/null @@ -1,19 +0,0 @@ -package ws - -import ( - "sync" - - "github.com/gorilla/websocket" -) - -type Ws struct{} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 4096, - WriteBufferPool: &sync.Pool{ - New: func() any { - return make([]byte, 4096) - }, - }, -}