126 lines
2.9 KiB
Go
126 lines
2.9 KiB
Go
package data
|
|
|
|
import (
|
|
"context"
|
|
"datart/config"
|
|
"datart/data/influx"
|
|
"datart/data/postgres"
|
|
"datart/data/redis"
|
|
"datart/log"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// updatePhasorDuration is the interval between two consecutive phasor
// refresh rounds performed by the per-SSU polling goroutines.
const updatePhasorDuration = 5 * time.Second
|
|
|
|
func updatingRedisPhasor(ctx context.Context) {
|
|
ssuType := config.Conf().ServerConf().GetSSUType()
|
|
ssuChans := make(map[string]chan zUnit, len(ssuType))
|
|
for ssu := range ssuType {
|
|
ssuChans[ssu] = make(chan zUnit, 32)
|
|
}
|
|
|
|
go queringSSUInfluxPhasor(ctx, ssuChans)
|
|
go updatingSSURedisZUnit(ctx, ssuChans)
|
|
}
|
|
|
|
func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) {
|
|
ssuType := config.Conf().ServerConf().GetSSUType()
|
|
for ssu := range ssuType {
|
|
go func(ssu string) {
|
|
timer := time.Tick(updatePhasorDuration)
|
|
for {
|
|
select {
|
|
case <-timer:
|
|
channelSizes := postgres.GetSSU2ChannelSizesFor(ssu)
|
|
for _, channelSize := range channelSizes {
|
|
sendSSUZUnit(ctx, channelSize, ssuChans[ssu])
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(ssu)
|
|
}
|
|
}
|
|
|
|
func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
|
|
ssuType := config.Conf().ServerConf().GetSSUType()
|
|
for ssu := range ssuType {
|
|
go func(ssu string) {
|
|
for {
|
|
select {
|
|
case unit := <-ssuChans[ssu]:
|
|
if err := updateZUnitToRedis(ctx, unit); err != nil {
|
|
log.Error(err)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(ssu)
|
|
}
|
|
}
|
|
|
|
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
|
|
fields := GenPhasorFields(channelSize.Channel)
|
|
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
|
|
fields, channelSize.Size)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
|
|
// if len(f2tvs) <= 0 {
|
|
// log.Debug(channelSize.Station, " ", channelSize.Device, " ",
|
|
// fields, " query none of ", channelSize.Size)
|
|
// }
|
|
|
|
for f, tvs := range f2tvs {
|
|
sdf := strings.Split(f, ".")
|
|
if len(sdf) != 3 {
|
|
log.Error("invalid influx field")
|
|
return
|
|
}
|
|
|
|
key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
|
|
members := convertTVsToMenmbers(tvs)
|
|
ssuChan <- zUnit{
|
|
Key: key,
|
|
Members: members,
|
|
}
|
|
}
|
|
}
|
|
|
|
func queryInfluxPhasor(ctx context.Context, station string, device string,
|
|
fileds []string, size int) (map[string][]influx.TV, error) {
|
|
bucket, err := influx.GetDB("phasor")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
measure, err := influx.GetTable("phasor", device)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := &influx.Request{
|
|
DB: bucket,
|
|
Table: measure,
|
|
Type: "phasor",
|
|
Station: station,
|
|
MainPos: device,
|
|
SubPos: strings.Join(fileds, ","),
|
|
}
|
|
return influx.GetSSUPointsLastLimit(ctx, req, size)
|
|
}
|
|
|
|
func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
|
|
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
|
|
}
|
|
|
|
// genRedisPhasorKey builds the Redis key for a phasor series in the form
// "<station>:<device>:phasor:<field>".
func genRedisPhasorKey(station, device, field string) string {
	const (
		sep  = ":"
		kind = "phasor"
	)

	var b strings.Builder
	b.Grow(len(station) + len(device) + len(kind) + len(field) + 3*len(sep))
	b.WriteString(station)
	b.WriteString(sep)
	b.WriteString(device)
	b.WriteString(sep)
	b.WriteString(kind)
	b.WriteString(sep)
	b.WriteString(field)
	return b.String()
}
|