From ebd375d6c7762749b27360750998527542211532 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Wed, 4 Mar 2026 09:42:18 +0800 Subject: [PATCH] add i/u ratio --- configs/postgres.json | 8 +++--- data/common.go | 16 ++++++++++++ data/influx/common.go | 6 ----- data/influx/influx.go | 1 + data/influx/ssu_point.go | 48 +++--------------------------------- data/mongo/alarm.go | 2 +- data/mongo/event.go | 2 +- data/postgres/measurement.go | 48 ++++++++++++++++++++++++++++++++++++ data/update_phasor.go | 10 ++++---- route/admin/command.go | 8 +++--- route/api/point.go | 48 ++++++++++++++++++------------------ 11 files changed, 108 insertions(+), 89 deletions(-) diff --git a/configs/postgres.json b/configs/postgres.json index 06b783f..6b50ff4 100644 --- a/configs/postgres.json +++ b/configs/postgres.json @@ -1,10 +1,10 @@ { "default":{ - "host":"127.0.0.1", - "port":5432, + "host":"192.168.46.100", + "port":9432, "user":"postgres", - "password":"password", - "dbname":"postgres", + "password":"123RTYjkl", + "dbname":"metamodule", "timezone":"Asia/Shanghai" } } \ No newline at end of file diff --git a/data/common.go b/data/common.go index 8ce7f6c..a00cad7 100644 --- a/data/common.go +++ b/data/common.go @@ -3,6 +3,7 @@ package data import ( "datart/data/influx" "datart/data/postgres" + "math" "strings" "github.com/redis/go-redis/v9" @@ -45,6 +46,21 @@ func GenPhasorFields(channel string) []string { return fields } +func ToPrimary(field string, value, iRatio, uRatio float64, iPolar int) float64 { + switch { + case field == "p", field == "q": + return value * iRatio * uRatio * float64(iPolar) + case strings.HasSuffix(field, "_pa"): + if iPolar < 0 { + return value + math.Pi + } + default: + return value * iRatio * float64(iPolar) + } + + return value +} + type zUnit struct { Key string Members []redis.Z diff --git a/data/influx/common.go b/data/influx/common.go index e2bf66f..a49df92 100644 --- a/data/influx/common.go +++ b/data/influx/common.go @@ -77,9 +77,6 @@ type fields struct { Values [][]any `json:"values"` } -// respType json/csv -// json_time:"2024-12-18T08:12:21.4735154Z" -// csv_time:"1734572793695885000" func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values, respType string) ([]TV, error) { request, err := http.NewRequestWithContext(ctx, http.MethodGet, @@ -112,9 +109,6 @@ func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values, return respDataToTVs(respData, respType) } -// respType json/csv -// json_time:"2024-12-18T08:12:21.4735154Z" -// csv_time:"1734572793695885000" func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values, respType string) (map[string][]TV, error) { request, err := http.NewRequestWithContext(ctx, http.MethodGet, diff --git a/data/influx/influx.go b/data/influx/influx.go index e1500f5..43e0d8a 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -90,6 +90,7 @@ func WriteLinesData(ctx context.Context, db string, data []byte) error { type Request struct { DB string Table string + Tags []string Type string Station string diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 9c0946e..95b57c1 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -100,17 +100,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ "q": {sql}, } - f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") - if err != nil { - return f2tvs, nil - } - - ret := make(map[string][]TV, len(f2tvs)) - for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs - } - - return ret, nil + return client.getF2TVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { @@ -149,17 +139,7 @@ func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *R "q": {sql}, } - f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") - if err != nil { - return f2tvs, nil - } - - ret := make(map[string][]TV, len(f2tvs)) - for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs - } - - return ret, nil + return client.getF2TVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { @@ -183,17 +163,7 @@ func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Req "q": {sql}, } - f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") - if err != nil { - return f2tvs, nil - } - - ret := make(map[string][]TV, len(f2tvs)) - for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs - } - - return ret, nil + return client.getF2TVsResp(ctx, reqData, "csv") } func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { @@ -203,17 +173,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)}, } - f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv") - if err != nil { - return f2tvs, nil - } - - ret := make(map[string][]TV, len(f2tvs)) - for f, tvs := range f2tvs { - ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs - } - - return ret, nil + return client.getF2TVsResp(ctx, reqData, "csv") } func (client *influxClient) WriteLinesData(ctx context.Context, db string, data []byte) error { diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go index 111dd87..3231938 100644 --- a/data/mongo/alarm.go +++ b/data/mongo/alarm.go @@ -118,7 +118,7 @@ func (a *Alarm) ConvertToEvent(ip string) (*Event, error) { OP: ip, TS: a.AlarmTime, }) - e.Alarm = a + e.Origin = a } return e, nil diff --git a/data/mongo/event.go b/data/mongo/event.go index 33432fa..51e0bb0 100644 --- a/data/mongo/event.go +++ b/data/mongo/event.go @@ -68,7 +68,7 @@ type Event struct { From string `bson:"from" json:"from"` Operations []*operation `bson:"operations" json:"operations"` // others - Alarm *Alarm `bson:"alarm" json:"alarm"` + Origin *Alarm `bson:"origin" json:"origin"` } func (e *Event) Marshall() ([]byte, error) { diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index d6b3619..bf67c9f 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -45,19 +45,51 @@ func (ds *dataSource) Value() (driver.Value, error) { return json.Marshal(ds) } +type bind struct { + Index int `json:"index"` + Ratio float64 `json:"ratio"` + Polarity int `json:"polarity"` +} + +type binding struct { + CT bind `json:"ct"` + PT bind `json:"pt"` +} + +func (b *binding) Scan(value any) error { + if value == nil { + return nil + } + bytes, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + return json.Unmarshal(bytes, b) +} + +func (b *binding) Value() (driver.Value, error) { + return json.Marshal(b) +} + type measurement struct { ID int64 `gorm:"colunmn:id"` Tag string `gorm:"column:tag"` Size int `gorm:"column:size"` DataSource *dataSource `gorm:"column:data_source;type:jsonb"` + Binding *binding `gorm:"column:binding;type:josnb"` // others } type ChannelSize struct { + Type int Station string Device string Channel string Size int + IRatio float64 + IPolar int + URatio float64 + UPolar int } // channel is original @@ -200,10 +232,26 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, } channelSize := ChannelSize{ + Type: 1, Station: station, Device: device, Channel: channel, Size: record.Size, + IRatio: 1, + IPolar: 1, + URatio: 1, + UPolar: 1, + } + + if record.Binding != nil { + if record.Binding.CT.Index > 0 { + channelSize.IRatio = record.Binding.CT.Ratio + channelSize.IPolar = record.Binding.CT.Polarity + } + if record.Binding.PT.Index > 0 { + channelSize.URatio = record.Binding.PT.Ratio + channelSize.UPolar = record.Binding.PT.Polarity + } } ChannelSizes.Store(record.Tag, channelSize) diff --git a/data/update_phasor.go b/data/update_phasor.go index b2e3470..a5d6828 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -78,13 +78,13 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan // } for f, tvs := range f2tvs { - sdf := strings.Split(f, ".") - if len(sdf) != 3 { - log.Error("invalid influx field") - return + key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f) + + for i := range tvs { + tvs[i].Value = ToPrimary(f, tvs[i].Value, + channelSize.IRatio, channelSize.URatio, channelSize.IPolar) } - key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2]) members := convertTVsToMenmbers(tvs) ssuChan <- zUnit{ Key: key, diff --git a/route/admin/command.go b/route/admin/command.go index 002399a..dd650df 100644 --- a/route/admin/command.go +++ b/route/admin/command.go @@ -10,9 +10,9 @@ import ( ) type command struct { - Command string `json:"command"` - Timeout int64 `json:"timeout"` - Args []any `json:"args"` + Command int `json:"command"` + Timeout int64 `json:"timeout"` + Args []any `json:"args"` } func (a *Admin) PostExecuteCommand(ctx *gin.Context) { @@ -49,7 +49,7 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, er return req, errors.New("invalid body param") } - if req.Command != "GenSSU2ChannelSizes" { + if req.Command != 1 { return nil, errors.New("invalid command") } diff --git a/route/api/point.go b/route/api/point.go index a8e4b9b..598b119 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -26,19 +26,19 @@ func (a *Api) GetPoints(ctx *gin.Context) { return } - var data map[string][]influx.TV + var ftvs map[string][]influx.TV switch { case request.Begin > 0 && request.End > 0: - data, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) + ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) case request.Begin > 0 && request.End < 0: - data, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) + ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) case request.Begin < 0 && request.End > 0: - data, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) + ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) default: - data, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) + ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) } if err != nil { @@ -52,10 +52,21 @@ func (a *Api) GetPoints(ctx *gin.Context) { return } + // handle key value + v, _ := postgres.ChannelSizes.Load(request.Tags[len(request.Tags)-1]) + if channelSize, ok := v.(postgres.ChannelSize); ok { + for f, tvs := range ftvs { + for i := range tvs { + tvs[i].Value = data.ToPrimary(f, tvs[i].Value, + channelSize.IRatio, channelSize.URatio, channelSize.IPolar) + } + } + } + ctx.JSON(200, gin.H{ "code": 0, "msg": "success", - "data": data, + "data": ftvs, }) } @@ -68,8 +79,8 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er station, mainPos, subPos := "", "", "" - mtag := ctx.DefaultQuery("mtag", "") - v, ok := postgres.ChannelSizes.Load(mtag) + tags := strings.Split(ctx.DefaultQuery("mtag", ""), ".") + v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1]) if ok { if channelSize, ok := v.(postgres.ChannelSize); ok { fields := data.GenPhasorFields(channelSize.Channel) @@ -79,20 +90,7 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er subPos = strings.Join(fields, ",") } } else { - station = ctx.DefaultQuery("station", "") - if len(station) <= 0 { - return nil, errors.New("invalid station") - } - - mainPos = ctx.DefaultQuery("main_pos", "") - if len(mainPos) <= 0 { - return nil, errors.New("invalid main_pos") - } - - subPos = ctx.DefaultQuery("sub_pos", "") - if len(subPos) <= 0 { - return nil, errors.New("invalid sub_pos") - } + return nil, errors.New("invalid mtag") } beginStr := ctx.DefaultQuery("begin", "") @@ -120,8 +118,10 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er } return &influx.Request{ - DB: bucket, - Table: measure, + DB: bucket, + Table: measure, + Tags: tags, + Type: typeStr, Station: station, MainPos: mainPos,