query and update 104

This commit is contained in:
zhuxu 2026-03-20 17:37:39 +08:00
parent a4711c553b
commit c98c797ad7
10 changed files with 278 additions and 24 deletions

View File

@ -28,6 +28,7 @@ func (p *Processes) StartDataProcessing() {
} }
updatingRedisPhasor(ctx) updatingRedisPhasor(ctx)
updatingRedisCL104(ctx)
cl104.ConnectCLs() cl104.ConnectCLs()
} }

61
data/influx/104_point.go Normal file
View File

@ -0,0 +1,61 @@
package influx
import (
"context"
"fmt"
"net/url"
"time"
)
const (
	// db104 is the InfluxDB database (bucket) holding IEC 60870-5-104 points.
	db104 = "influxBucket"
	// tb104 is the measurement (table) name for 104 points.
	tb104 = "cl104"
)

// Tag/field keys used in 104 queries; presumably these must stay consistent
// with whatever process writes the points — TODO confirm against the writer.
const (
	mainPos104Key = "ca"  // common address tag
	subPos104Key  = "ioa" // information object address tag
	val104Key     = "val" // measured value field
)
// Get104PointLast returns the most recent value of a single 104 point.
// It rewinds req.Begin to 15 minutes before now (the limit-based query only
// uses Begin for limit > 1, but the mutation is kept for callers that reuse
// req) and delegates to the shared limit query with limit 1.
func Get104PointLast(ctx context.Context, req *Request) ([]TV, error) {
	const lookback = 15 * time.Minute
	req.Begin = time.Now().Add(-lookback).UnixMilli()
	return client.Get104PointLastLimit(ctx, req, 1)
}
// Get104PointDuration returns the values of a single 104 point within the
// [req.Begin, req.End] range (milliseconds since epoch).
//
// Fix: this previously delegated to Get104PointLastLimit(ctx, req, 1), which
// ignores req.Begin/req.End and returns only the latest value, even though
// the API layer calls this function precisely when both bounds are set.
func Get104PointDuration(ctx context.Context, req *Request) ([]TV, error) {
	return client.Get104PointDurationData(ctx, req)
}
// Get104PointLastLimit queries InfluxDB for the value(s) of one 104 point,
// identified by station + ca (req.MainPos) + ioa (req.SubPos).
//
// limit == 1 (or less): only the latest value, via last().
// limit > 1: up to limit values newer than req.Begin, newest first.
//
// Fix: the limit > 1 query contained a duplicated keyword ("and and"),
// which made the InfluxQL statement syntactically invalid.
func (client *influxClient) Get104PointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
	sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and %s='%s' and %s='%s';",
		val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos)
	if limit > 1 {
		sql = fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms order by time desc limit %d;",
			val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, limit)
	}
	reqData := url.Values{
		"db": {req.DB},
		"q":  {sql},
	}
	// Responses are requested in CSV form and decoded by getTVsResp.
	return client.getTVsResp(ctx, reqData, "csv")
}
// Get104PointDurationData queries all values of one 104 point within
// [req.Begin, req.End] (milliseconds). When req.Operate, req.Step and
// req.Default are all set, it instead runs a down-sampling aggregation
// (e.g. mean) grouped by time(req.Step), filling gaps with req.Default.
//
// Fix: the aggregate branch previously selected req.Operate(req.SubPos) —
// aggregating the ioa tag instead of the value field. It now aggregates
// val104Key, matching the non-aggregate branch.
func (client *influxClient) Get104PointDurationData(ctx context.Context, req *Request) ([]TV, error) {
	sql := fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms;",
		val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End)
	if req.Operate != "" && req.Step != "" && req.Default != "" {
		sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);",
			req.Operate, val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End, req.Step, req.Default)
	}
	reqData := url.Values{
		"db": {req.DB},
		"q":  {sql},
	}
	return client.getTVsResp(ctx, reqData, "csv")
}

View File

@ -258,7 +258,7 @@ func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) {
func convertCsvToTVs(data [][]string) ([]TV, error) { func convertCsvToTVs(data [][]string) ([]TV, error) {
ret := make([]TV, 0, len(data)) ret := make([]TV, 0, len(data))
for _, row := range data[1:] { for _, row := range data {
if len(row) > 3 { if len(row) > 3 {
ns, err := strconv.ParseInt(row[2], 10, 64) ns, err := strconv.ParseInt(row[2], 10, 64)
if err != nil { if err != nil {

View File

@ -58,6 +58,8 @@ func GetDB(tp string) (string, error) {
return dbphasor, nil return dbphasor, nil
case "sample": case "sample":
return dbsample, nil return dbsample, nil
case "104":
return db104, nil
} }
return "", errors.New("invalid type") return "", errors.New("invalid type")
@ -78,6 +80,8 @@ func GetTable(tp string, mainPos string) (string, error) {
} }
case "sample": case "sample":
return "sample", nil return "sample", nil
case "104":
return tb104, nil
} }
return "", errors.New("invalid type") return "", errors.New("invalid type")

View File

@ -9,8 +9,8 @@ import (
) )
const ( const (
dbphasor = "ssuBucket" dbphasor = "influxBucket"
dbsample = "ssuBucket" dbsample = "influxBucket"
) )
// keep consistent with telegraf // keep consistent with telegraf

View File

@ -5,6 +5,7 @@ import (
"database/sql/driver" "database/sql/driver"
"encoding/json" "encoding/json"
"errors" "errors"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -83,8 +84,8 @@ type measurement struct {
type ChannelSize struct { type ChannelSize struct {
Type int Type int
Station string Station string
Device string MainPos string
Channel string SubPos string
Size int Size int
IRatio float64 IRatio float64
IPolar int IPolar int
@ -234,8 +235,8 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
channelSize := ChannelSize{ channelSize := ChannelSize{
Type: 1, Type: 1,
Station: station, Station: station,
Device: device, MainPos: device,
Channel: channel, SubPos: channel,
Size: record.Size, Size: record.Size,
IRatio: 1, IRatio: 1,
IPolar: 1, IPolar: 1,
@ -262,7 +263,52 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
} }
case 2: case 2:
if rawAddr, ok := record.DataSource.Addr.(map[string]any); ok {
station, ok := rawAddr["station"].(string)
if !ok {
return errors.New("invalid station")
}
pack, ok := rawAddr["packet"].(float64)
if !ok {
return errors.New("invalid packet")
}
offset, ok := rawAddr["offset"].(float64)
if !ok {
return errors.New("invalid offset")
}
mainPos := strconv.Itoa(int(pack))
subPos := strconv.Itoa(int(offset))
if _, ok := ChannelSizes.Load(record.Tag); !ok {
channelSize := ChannelSize{
Type: 2,
Station: station,
MainPos: mainPos,
SubPos: subPos,
Size: record.Size,
IRatio: 1,
IPolar: 1,
URatio: 1,
UPolar: 1,
}
if record.Binding != nil {
if record.Binding.CT.Index > 0 { // depends on value of index
channelSize.IRatio = record.Binding.CT.Ratio
channelSize.IPolar = record.Binding.CT.Polarity
}
if record.Binding.PT.Index > 0 { // depends on value of index
channelSize.URatio = record.Binding.PT.Ratio
channelSize.UPolar = record.Binding.PT.Polarity
}
}
ChannelSizes.Store(record.Tag, channelSize)
} else {
return errors.New("invalid io_address")
}
}
default: default:
return errors.New("invalid data_source.type") return errors.New("invalid data_source.type")
} }

124
data/update_104.go Normal file
View File

@ -0,0 +1,124 @@
package data
import (
"context"
"datart/data/influx"
"datart/data/postgres"
"datart/data/redis"
"datart/log"
"sync"
"time"
)
const (
	// workerNum is the number of goroutines draining the 104 query queue.
	workerNum = 5
)

const (
	// update104Duration is the interval at which 104 points are re-queried
	// from InfluxDB and pushed into Redis.
	update104Duration time.Duration = time.Second * 1
)

// cl104Q identifies one IEC 60870-5-104 point to refresh: station name,
// common address (CA) and information object address (IOA).
type cl104Q struct {
	Station string
	CA      string
	IOA     string
	// Begin int64
	// End int64
}

// workerPool fans 104 refresh queries out to a fixed set of worker goroutines
// over a buffered channel.
type workerPool struct {
	workerNum int
	wg        sync.WaitGroup
	qChan     chan cl104Q
}

// cl104Pool is the process-wide pool used by updatingRedisCL104.
var cl104Pool *workerPool

func init() {
	cl104Pool = newWorkerPool(workerNum)
}
// newWorkerPool builds a pool with the given worker count and a buffered
// queue (capacity 128) of pending 104 point queries.
func newWorkerPool(workerNum int) *workerPool {
	wp := &workerPool{workerNum: workerNum}
	wp.qChan = make(chan cl104Q, 128)
	return wp
}
// updatingRedisCL104 periodically refreshes the latest value of every
// configured 104 point (postgres.ChannelSize with Type == 2) into Redis.
//
// One producer goroutine scans postgres.ChannelSizes every update104Duration
// and enqueues a query per point; workerNum consumer goroutines drain the
// queue, query InfluxDB and write the result to Redis. All goroutines exit
// when ctx is cancelled.
//
// Fixes:
//   - Workers previously blocked on `q := <-cl104Pool.qChan` inside a
//     `default:` case, so they never observed ctx cancellation while the
//     queue was empty (goroutine leak on shutdown). The receive is now a
//     select case alongside <-ctx.Done().
//   - The producer's channel send is now cancellable so a full queue cannot
//     wedge the scan after shutdown.
//   - A reusable time.Ticker replaces the per-iteration time.After timer.
func updatingRedisCL104(ctx context.Context) {
	go func() {
		ticker := time.NewTicker(update104Duration)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				postgres.ChannelSizes.Range(func(key, value any) bool {
					channelSize, ok := value.(postgres.ChannelSize)
					if !ok || channelSize.Type != 2 {
						return true
					}
					select {
					case cl104Pool.qChan <- cl104Q{
						Station: channelSize.Station,
						CA:      channelSize.MainPos,
						IOA:     channelSize.SubPos,
					}:
						return true
					case <-ctx.Done():
						return false // stop scanning on shutdown instead of blocking
					}
				})
			}
		}
	}()
	for range workerNum {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case q := <-cl104Pool.qChan:
					tvs, err := cl104Pool.queryInflux104(ctx, q)
					if err != nil {
						log.Error(err)
						continue
					}
					cl104Pool.refreshRedis(ctx, q, tvs)
				}
			}
		}()
	}
}
// queryInflux104 fetches the most recent stored value of the 104 point
// described by q from InfluxDB.
func (wp *workerPool) queryInflux104(ctx context.Context, q cl104Q) ([]influx.TV, error) {
	db, err := influx.GetDB("104")
	if err != nil {
		return nil, err
	}
	tb, err := influx.GetTable("104", q.CA)
	if err != nil {
		return nil, err
	}
	req := influx.Request{
		DB:      db,
		Table:   tb,
		Type:    "104",
		Station: q.Station,
		MainPos: q.CA,
		SubPos:  q.IOA,
	}
	return influx.Get104PointLast(ctx, &req)
}
// refreshRedis atomically replaces the Redis sorted set for point q with the
// freshly queried time/value pairs.
func (wp *workerPool) refreshRedis(ctx context.Context, q cl104Q, tvs []influx.TV) {
	redis.ZAtomicReplace(ctx,
		genRedis104Key(q.Station, q.CA, q.IOA),
		convertTVsToMenmbers(tvs))
}
// genRedis104Key builds the Redis key "<station>:104:<ca>:<ioa>" under which
// a 104 point's values are stored.
func genRedis104Key(station string, ca string, ioa string) string {
	key := station + ":104:" + ca
	return key + ":" + ioa
}

View File

@ -19,7 +19,7 @@ func updatingRedisPhasor(ctx context.Context) {
ssuType := config.Conf().ServerConf().GetSSUType() ssuType := config.Conf().ServerConf().GetSSUType()
ssuChans := make(map[string]chan zUnit, len(ssuType)) ssuChans := make(map[string]chan zUnit, len(ssuType))
for ssu := range ssuType { for ssu := range ssuType {
ssuChans[ssu] = make(chan zUnit, 32) ssuChans[ssu] = make(chan zUnit, 64)
} }
go queringSSUInfluxPhasor(ctx, ssuChans) go queringSSUInfluxPhasor(ctx, ssuChans)
@ -65,8 +65,8 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit)
} }
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
fields := GenPhasorFields(channelSize.Channel) fields := GenPhasorFields(channelSize.SubPos)
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.MainPos,
fields, channelSize.Size) fields, channelSize.Size)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -78,7 +78,7 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan
// } // }
for f, tvs := range f2tvs { for f, tvs := range f2tvs {
key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f) key := genRedisPhasorKey(channelSize.Station, channelSize.MainPos, f)
for i := range tvs { for i := range tvs {
tvs[i].Value = ToPrimary(f, tvs[i].Value, tvs[i].Value = ToPrimary(f, tvs[i].Value,

View File

@ -27,18 +27,30 @@ func (a *Api) GetPoints(ctx *gin.Context) {
} }
var ftvs map[string][]influx.TV var ftvs map[string][]influx.TV
switch { switch request.Type {
case request.Begin > 0 && request.End > 0: case "104":
ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request) var tvs []influx.TV
switch {
case request.Begin > 0 && request.End > 0:
tvs, err = influx.Get104PointDuration(ctx.Request.Context(), request)
default:
tvs, err = influx.Get104PointLast(ctx.Request.Context(), request)
}
ftvs = map[string][]influx.TV{strings.Join(request.Tags, "."): tvs}
case "phasor":
switch {
case request.Begin > 0 && request.End > 0:
ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
case request.Begin > 0 && request.End < 0: case request.Begin > 0 && request.End < 0:
ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1) ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
case request.Begin < 0 && request.End > 0: case request.Begin < 0 && request.End > 0:
ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1) ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
default: default:
ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1) ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
}
} }
if err != nil { if err != nil {
@ -83,10 +95,16 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er
v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1]) v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1])
if ok { if ok {
if channelSize, ok := v.(postgres.ChannelSize); ok { if channelSize, ok := v.(postgres.ChannelSize); ok {
fields := data.GenPhasorFields(channelSize.Channel) var fields []string
if typeStr == "phasor" {
fields = data.GenPhasorFields(channelSize.SubPos)
}
if typeStr == "104" {
fields = []string{channelSize.SubPos}
}
station = channelSize.Station station = channelSize.Station
mainPos = channelSize.Device mainPos = channelSize.MainPos
subPos = strings.Join(fields, ",") subPos = strings.Join(fields, ",")
} }
} else { } else {

View File

@ -10,10 +10,10 @@ type Ws struct{}
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 1024,
WriteBufferSize: 4096, WriteBufferSize: 1024,
WriteBufferPool: &sync.Pool{ WriteBufferPool: &sync.Pool{
New: func() any { New: func() any {
return make([]byte, 4096) return make([]byte, 1024)
}, },
}, },
} }