// dataRT/data/update_phasor.go

package data

import (
	"context"
	"datart/config"
	"datart/data/influx"
	"datart/data/postgres"
	"datart/data/redis"
	"datart/log"
	"strings"
	"time"
)
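
// duration is the polling interval at which phasor data is pulled from
// InfluxDB and pushed to Redis.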
const (
	duration time.Duration = time.Second * 5
)
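
// updatingRedisPhasor starts the phasor pipeline: one per-SSU channel is
// created for each configured SSU type, then one goroutine group queries
// InfluxDB and another drains the channels into Redis.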
func updatingRedisPhasor(ctx context.Context) {
	ssuType := config.Conf().ServerConf().GetSSUType()
	ssuChans := make(map[string]chan zUnit, len(ssuType))
	for ssu := range ssuType {
		ssuChans[ssu] = make(chan zUnit, 32)
	}
	go queringSSUInfluxPhasor(ctx, ssuChans)
	go updatingSSURedisZUnit(ctx, ssuChans)
}
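
// queringSSUInfluxPhasor spawns one goroutine per SSU type that, every
// `duration`, loads that SSU's channel sizes from Postgres and sends the
// matching InfluxDB phasor points onto the SSU's channel.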
func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) {
	ssuType := config.Conf().ServerConf().GetSSUType()
	for ssu := range ssuType {
		go func(ssu string) {
			ticker := time.NewTicker(duration)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					channelSizes := postgres.GetSSU2ChannelSizesFor(ssu)
					for _, channelSize := range channelSizes {
						sendSSUZUnit(ctx, channelSize, ssuChans[ssu])
					}
				case <-ctx.Done():
					return
				}
			}
		}(ssu)
	}
}
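
// updatingSSURedisZUnit spawns one goroutine per SSU type that drains its
// channel and writes each received zUnit to Redis until the context is
// cancelled.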
func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
	ssuType := config.Conf().ServerConf().GetSSUType()
	for ssu := range ssuType {
		go func(ssu string) {
			for {
				select {
				case unit := <-ssuChans[ssu]:
					if err := updateZUnitToRedis(ctx, unit); err != nil {
						log.Error(err)
					}
				case <-ctx.Done():
					return
				}
			}
		}(ssu)
	}
}
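
// sendSSUZUnit queries the latest phasor points for one channel and sends one
// zUnit per field (Redis key plus converted members) onto the SSU channel.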
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
	fields := genPhasorFields(channelSize.Channel)
	f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
		fields, channelSize.Size)
	if err != nil {
		log.Error(err)
		return
	}
	for field, tvs := range f2tvs {
		key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field)
		members := convertTVsToMenmbers(tvs)
		ssuChan <- zUnit{
			Key:     key,
			Members: members,
		}
	}
}
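
// queryInfluxPhasor queries the phasor bucket in InfluxDB for the given
// station, device and fields, returning the latest `size` time/value points
// per field.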
func queryInfluxPhasor(ctx context.Context, station string, device string,
	fields []string, size int) (map[string][]influx.TV, error) {
	bucket, err := influx.GetDB("phasor")
	if err != nil {
		return nil, err
	}
	measure, err := influx.GetTable("phasor", device)
	if err != nil {
		return nil, err
	}
	req := &influx.Request{
		DB:      bucket,
		Table:   measure,
		Type:    "phasor",
		Station: station,
		MainPos: device,
		SubPos:  strings.Join(fields, ","),
	}
	return influx.GetSSUPointsLastLimit(ctx, req, size)
}
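
// updateZUnitToRedis atomically replaces the members stored under unit.Key in
// Redis.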
func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
	return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
}
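
// genPhasorFields maps a Postgres channel name to the InfluxDB field names to
// query: the AMP, PA and RMS suffixed fields for YC- and U-prefixed channels,
// and the lowercased channel name for YX-prefixed channels and the
// P/Q/S/PF/F/DF channels.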
func genPhasorFields(channel string) []string {
	fields := make([]string, 0, 3)
	switch {
	case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
		fieldPrefix := strings.ToLower(channel)
		fields = append(fields,
			fieldPrefix+influx.FieldSuffixAMP,
			fieldPrefix+influx.FieldSuffixPA,
			fieldPrefix+influx.FieldSuffixRMS)
	case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
		fields = append(fields, strings.ToLower(channel))
	case strings.HasPrefix(channel, postgres.ChannelUPrefix):
		fieldUPrefix := strings.ToLower(channel)
		fields = append(fields,
			fieldUPrefix+influx.FieldSuffixAMP,
			fieldUPrefix+influx.FieldSuffixPA,
			fieldUPrefix+influx.FieldSuffixRMS)
	case channel == postgres.ChannelP,
		channel == postgres.ChannelQ,
		channel == postgres.ChannelS,
		channel == postgres.ChannelPF,
		channel == postgres.ChannelF,
		channel == postgres.ChannelDF:
		fields = append(fields, strings.ToLower(channel))
	}
	return fields
}
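
// genRedisPhasorKey builds the Redis key in the form
// "<station>:<device>:phasor:<field>".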
func genRedisPhasorKey(station, device, field string) string {
	return strings.Join([]string{station, device, "phasor", field}, ":")
}