package data import ( "context" "datart/config" "datart/data/influx" "datart/data/postgres" "datart/data/redis" "datart/log" "strings" "time" ) const ( duration time.Duration = time.Second * 5 ) func updatingRedisPhasor(ctx context.Context) { ssuType := config.Conf().ServerConf().GetSSUType() ssuChans := make(map[string]chan zUnit, len(ssuType)) for ssu := range ssuType { ssuChans[ssu] = make(chan zUnit, 32) } go queringSSUInfluxPhasor(ctx, ssuChans) go updatingSSURedisZUnit(ctx, ssuChans) } func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) { ssuType := config.Conf().ServerConf().GetSSUType() for ssu := range ssuType { go func(ssu string) { timer := time.Tick(duration) for { select { case <-timer: channelSizes := postgres.GetSSU2ChannelSizesFor(ssu) for _, channelSize := range channelSizes { sendSSUZUnit(ctx, channelSize, ssuChans[ssu]) } case <-ctx.Done(): return } } }(ssu) } } func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) { ssuType := config.Conf().ServerConf().GetSSUType() for ssu := range ssuType { go func(ssu string) { for { select { case unit := <-ssuChans[ssu]: if err := updateZUnitToRedis(ctx, unit); err != nil { log.Error(err) } case <-ctx.Done(): return } } }(ssu) } } func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { fields := genPhasorFields(channelSize.Channel) f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, fields, channelSize.Size) if err != nil { log.Error(err) } for field, tvs := range f2tvs { key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field) members := convertTVsToMenmbers(tvs) ssuChan <- zUnit{ Key: key, Members: members, } } } func queryInfluxPhasor(ctx context.Context, station string, device string, fileds []string, size int) (map[string][]influx.TV, error) { bucket, err := influx.GetDB("phasor") if err != nil { return nil, err } measure, err := influx.GetTable("phasor", device) if err != nil { return nil, err } req := &influx.Request{ DB: bucket, Table: measure, Type: "phasor", Station: station, MainPos: device, SubPos: strings.Join(fileds, ","), } return influx.GetSSUPointsLastLimit(ctx, req, size) } func updateZUnitToRedis(ctx context.Context, unit zUnit) error { return redis.ZAtomicReplace(ctx, unit.Key, unit.Members) } func genPhasorFields(channel string) []string { fields := make([]string, 0, 3) switch { case strings.HasPrefix(channel, postgres.ChannelYCPrefix): fieldPrefix := strings.ToLower(channel) fields = append(fields, fieldPrefix+influx.FieldSuffixAMP, fieldPrefix+influx.FieldSuffixPA, fieldPrefix+influx.FieldSuffixRMS) case strings.HasPrefix(channel, postgres.ChannelYXPrefix): fields = append(fields, strings.ToLower(channel)) case strings.HasPrefix(channel, postgres.ChannelUPrefix): fieldUPrefix := strings.ToLower(channel) fields = append(fields, fieldUPrefix+influx.FieldSuffixAMP, fieldUPrefix+influx.FieldSuffixPA, fieldUPrefix+influx.FieldSuffixRMS) case channel == postgres.ChannelP, channel == postgres.ChannelQ, channel == postgres.ChannelS, channel == postgres.ChannelPF, channel == postgres.ChannelF, channel == postgres.ChannelDF: fields = append(fields, strings.ToLower(channel)) } return fields } func genRedisPhasorKey(station, device, field string) string { return strings.Join([]string{station, device, "phasor", field}, ":") }