package data import ( "context" "datart/config" "datart/data/influx" "datart/data/postgres" "datart/data/redis" "datart/log" "strings" "time" ) const ( duration time.Duration = 5 * time.Second ) func updatingRedisPhasor(ctx context.Context) { ssuType := config.Conf().ServerConf().GetSSUType() ssuChans := make(map[string]chan zUnit, len(ssuType)) for ssu := range ssuType { ssuChans[ssu] = make(chan zUnit, 32) } go queringSSUInfluxPhasor(ctx, ssuChans) go updatingSSURedisZUnit(ctx, ssuChans) } func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) { ssuType := config.Conf().ServerConf().GetSSUType() for ssu := range ssuType { go func(ssu string) { timer := time.Tick(duration) for { select { case <-timer: channelSizes := postgres.GetSSU2ChannelSizesFor(ssu) for _, channelSize := range channelSizes { sendSSUZUnit(ctx, channelSize, ssuChans[ssu]) } case <-ctx.Done(): return } } }(ssu) } } func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) { ssuType := config.Conf().ServerConf().GetSSUType() for ssu := range ssuType { go func(ssu string) { for { select { case unit := <-ssuChans[ssu]: if err := updateZUnitToRedis(ctx, unit); err != nil { log.Error(err) } case <-ctx.Done(): return } } }(ssu) } } func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { fields := GenPhasorFields(channelSize.Channel) f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, fields, channelSize.Size) if err != nil { log.Error(err) } // if len(f2tvs) <= 0 { // log.Info(channelSize.Station, " ", channelSize.Device, " ", // fields, " query none of ", channelSize.Size) // } for f, tvs := range f2tvs { sdf := strings.Split(f, ".") if len(sdf) != 3 { log.Error("invalid influx field") return } key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2]) members := convertTVsToMenmbers(tvs) ssuChan <- zUnit{ Key: key, Members: members, } } } func queryInfluxPhasor(ctx context.Context, station string, device string, fileds []string, size int) (map[string][]influx.TV, error) { bucket, err := influx.GetDB("phasor") if err != nil { return nil, err } measure, err := influx.GetTable("phasor", device) if err != nil { return nil, err } req := &influx.Request{ DB: bucket, Table: measure, Type: "phasor", Station: station, MainPos: device, SubPos: strings.Join(fileds, ","), } return influx.GetSSUPointsLastLimit(ctx, req, size) } func updateZUnitToRedis(ctx context.Context, unit zUnit) error { return redis.ZAtomicReplace(ctx, unit.Key, unit.Members) } func genRedisPhasorKey(station, device, field string) string { return strings.Join([]string{station, device, "phasor", field}, ":") }