// dataRT/data/update_phasor.go
package data
import (
"context"
"datart/config"
"datart/data/influx"
"datart/data/postgres"
"datart/data/redis"
"datart/log"
"strings"
"time"
)
const (
// duration is the polling interval between successive Influx phasor queries.
duration time.Duration = time.Second * 5
)
// updatingRedisPhasor wires up the per-SSU phasor pipeline: one buffered
// channel per configured SSU, shared by an Influx-querying producer and a
// Redis-updating consumer. Both run until ctx is cancelled.
func updatingRedisPhasor(ctx context.Context) {
	ssus := config.Conf().ServerConf().GetSSUType()
	chans := make(map[string]chan zUnit, len(ssus))
	for name := range ssus {
		// Buffer of 32 decouples the querier from slow Redis updates.
		chans[name] = make(chan zUnit, 32)
	}
	go queringSSUInfluxPhasor(ctx, chans)
	go updatingSSURedisZUnit(ctx, chans)
}
// queringSSUInfluxPhasor starts one goroutine per configured SSU. Every
// `duration`, each goroutine looks up the SSU's channel sizes in Postgres and
// forwards the matching Influx phasor points onto that SSU's channel.
// Goroutines exit when ctx is cancelled.
func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) {
	ssuType := config.Conf().ServerConf().GetSSUType()
	for ssu := range ssuType {
		go func(ssu string) {
			// time.NewTicker instead of time.Tick: the original Tick ticker
			// could never be stopped and kept running after the goroutine
			// returned on ctx.Done.
			ticker := time.NewTicker(duration)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					channelSizes := postgres.GetSSU2ChannelSizesFor(ssu)
					for _, channelSize := range channelSizes {
						sendSSUZUnit(ctx, channelSize, ssuChans[ssu])
					}
				case <-ctx.Done():
					return
				}
			}
		}(ssu)
	}
}
// updatingSSURedisZUnit starts one consumer goroutine per configured SSU.
// Each consumer drains its SSU's channel and writes every received zUnit to
// Redis, logging (but not aborting on) individual failures. Consumers exit
// when ctx is cancelled.
func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
	for ssu := range config.Conf().ServerConf().GetSSUType() {
		ch := ssuChans[ssu]
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case unit := <-ch:
					if err := updateZUnitToRedis(ctx, unit); err != nil {
						log.Error(err)
					}
				}
			}
		}()
	}
}
// sendSSUZUnit queries the most recent phasor points for one channel of one
// station/device and pushes one zUnit per Influx field onto ssuChan.
//
// Fixes relative to the original: (1) a query error now returns instead of
// falling through to the (nil) result map; (2) the channel send selects on
// ctx.Done, so a cancelled consumer can no longer block this producer forever
// once the channel buffer fills.
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
	fields := genPhasorFields(channelSize.Channel)
	f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
		fields, channelSize.Size)
	if err != nil {
		log.Error(err)
		return
	}
	for field, tvs := range f2tvs {
		key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field)
		members := convertTVsToMenmbers(tvs)
		select {
		case ssuChan <- zUnit{Key: key, Members: members}:
		case <-ctx.Done():
			return
		}
	}
}
// queryInfluxPhasor fetches the last `size` points for the given fields of
// one station/device from the phasor measurement. The result maps field name
// to its time/value series.
func queryInfluxPhasor(ctx context.Context, station string, device string,
	fields []string, size int) (map[string][]influx.TV, error) {
	measure, err := influx.GetMeasurement("phasor", device)
	if err != nil {
		return nil, err
	}
	bucket, err := influx.GetBucket("phasor")
	if err != nil {
		return nil, err
	}
	// Fields are comma-joined into a single SubPos selector.
	req := influx.Request{
		RespType: "csv",
		Bucket:   bucket,
		Measure:  measure,
		Station:  station,
		MainPos:  device,
		SubPos:   strings.Join(fields, ","),
	}
	return influx.GetSSUPointsLastLimit(ctx, &req, size)
}
// updateZUnitToRedis atomically replaces the Redis sorted set stored at
// unit.Key with unit.Members.
func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
}
// genPhasorFields maps a Postgres channel name onto the Influx field names
// that carry its phasor data. Unrecognized channels yield an empty slice.
func genPhasorFields(channel string) []string {
	fields := make([]string, 0, 3)
	switch {
	case strings.HasPrefix(channel, postgres.ChannelCPrefix):
		if after, ok := strings.CutPrefix(channel, postgres.ChannelCPrefix); ok {
			fields = append(fields,
				influx.FieldCPrifix+after+influx.FieldSuffixAMP,
				influx.FieldCPrifix+after+influx.FieldSuffixPA,
				influx.FieldCPrifix+after+influx.FieldSuffixRMS)
		}
	case strings.HasPrefix(channel, postgres.ChannelIPrefix):
		// BUG FIX: the original cut ChannelCPrefix here, so an I-channel
		// whose name did not also happen to start with the C prefix
		// produced no field at all.
		if after, ok := strings.CutPrefix(channel, postgres.ChannelIPrefix); ok {
			fields = append(fields, influx.FieldIPrefix+after)
		}
	case strings.HasPrefix(channel, postgres.ChannelUPrefix):
		fieldUPrefix := strings.ToLower(channel)
		fields = append(fields, fieldUPrefix+influx.FieldSuffixAMP,
			fieldUPrefix+influx.FieldSuffixPA, fieldUPrefix+influx.FieldSuffixRMS)
	case channel == postgres.ChannelDF:
		fields = append(fields, influx.FieldDF)
	case channel == postgres.ChannelP, channel == postgres.ChannelQ,
		channel == postgres.ChannelS, channel == postgres.ChannelPF,
		channel == postgres.ChannelF:
		fields = append(fields, strings.ToLower(channel))
	}
	return fields
}
func genRedisPhasorKey(station, device, field string) string {
return strings.Join([]string{station, device, "phasor", field}, ":")
}