dataRT/data/update_phasor.go

126 lines
2.9 KiB
Go

package data
import (
"context"
"datart/config"
"datart/data/influx"
"datart/data/postgres"
"datart/data/redis"
"datart/log"
"strings"
"time"
)
const (
updatePhasorDuration time.Duration = 5 * time.Second
)
func updatingRedisPhasor(ctx context.Context) {
ssuType := config.Conf().ServerConf().GetSSUType()
ssuChans := make(map[string]chan zUnit, len(ssuType))
for ssu := range ssuType {
ssuChans[ssu] = make(chan zUnit, 32)
}
go queringSSUInfluxPhasor(ctx, ssuChans)
go updatingSSURedisZUnit(ctx, ssuChans)
}
func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) {
ssuType := config.Conf().ServerConf().GetSSUType()
for ssu := range ssuType {
go func(ssu string) {
timer := time.Tick(updatePhasorDuration)
for {
select {
case <-timer:
channelSizes := postgres.GetSSU2ChannelSizesFor(ssu)
for _, channelSize := range channelSizes {
sendSSUZUnit(ctx, channelSize, ssuChans[ssu])
}
case <-ctx.Done():
return
}
}
}(ssu)
}
}
func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
ssuType := config.Conf().ServerConf().GetSSUType()
for ssu := range ssuType {
go func(ssu string) {
for {
select {
case unit := <-ssuChans[ssu]:
if err := updateZUnitToRedis(ctx, unit); err != nil {
log.Error(err)
}
case <-ctx.Done():
return
}
}
}(ssu)
}
}
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
fields := GenPhasorFields(channelSize.Channel)
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
fields, channelSize.Size)
if err != nil {
log.Error(err)
}
// if len(f2tvs) <= 0 {
// log.Debug(channelSize.Station, " ", channelSize.Device, " ",
// fields, " query none of ", channelSize.Size)
// }
for f, tvs := range f2tvs {
sdf := strings.Split(f, ".")
if len(sdf) != 3 {
log.Error("invalid influx field")
return
}
key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
members := convertTVsToMenmbers(tvs)
ssuChan <- zUnit{
Key: key,
Members: members,
}
}
}
func queryInfluxPhasor(ctx context.Context, station string, device string,
fileds []string, size int) (map[string][]influx.TV, error) {
bucket, err := influx.GetDB("phasor")
if err != nil {
return nil, err
}
measure, err := influx.GetTable("phasor", device)
if err != nil {
return nil, err
}
req := &influx.Request{
DB: bucket,
Table: measure,
Type: "phasor",
Station: station,
MainPos: device,
SubPos: strings.Join(fileds, ","),
}
return influx.GetSSUPointsLastLimit(ctx, req, size)
}
func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
}
func genRedisPhasorKey(station, device, field string) string {
return strings.Join([]string{station, device, "phasor", field}, ":")
}