From b6e47177fbd28ef0466f105de063f0af7e4b9cf9 Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 25 Nov 2025 16:13:55 +0800 Subject: [PATCH] debugging API using single measurement point subscription case --- deploy/redis-test-data/data_injection.go | 3 +- diagram/redis_client.go | 19 ++--- handler/real_time_data_pull.go | 98 +++++++++++++++------- model/measurement_model.go | 26 +++++- real-time-data/real_time_data_computing.go | 8 +- 5 files changed, 106 insertions(+), 48 deletions(-) diff --git a/deploy/redis-test-data/data_injection.go b/deploy/redis-test-data/data_injection.go index 6146f3b..01008cd 100644 --- a/deploy/redis-test-data/data_injection.go +++ b/deploy/redis-test-data/data_injection.go @@ -611,7 +611,7 @@ func generateRandomData(baseValue float64, changes []float64, size int) []float6 // simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, config outlierConfig, measInfo calculationResult) { log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", redisKey, measInfo.BaseValue, measInfo.Changes) - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() pipe := rdb.Pipeline() @@ -680,6 +680,7 @@ func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, if err != nil { log.Printf("redis pipeline execution failed: %v", err) } + log.Printf("生成 redis 实时数据成功\n") } } } diff --git a/diagram/redis_client.go b/diagram/redis_client.go index c336fe7..283669d 100644 --- a/diagram/redis_client.go +++ b/diagram/redis_client.go @@ -20,18 +20,17 @@ func NewRedisClient() *RedisClient { } // QueryByZRangeByLex define func to query real time data from redis zset -func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64, startTimestamp, stopTimeStamp string) ([]redis.Z, error) { +func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64) ([]redis.Z, error) { client := rc.Client - - startStr := "[" + startTimestamp - stopStr := stopTimeStamp + "]" args := redis.ZRangeArgs{ - Key: key, - Start: startStr, - Stop: stopStr, - ByLex: true, - Rev: false, - Count: size, + Key: key, + Start: 0, + Stop: size, + ByScore: false, + ByLex: false, + Rev: false, + Offset: 0, + Count: 0, } return client.ZRangeArgsWithScores(ctx, args).Result() } diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 3b8a955..05da5dc 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -5,6 +5,8 @@ import ( "context" "fmt" "net/http" + "sort" + "strconv" "time" "modelRT/constants" @@ -129,7 +131,7 @@ func readClientMessages(ctx context.Context, conn *websocket.Conn, clientID stri // process normal message from client if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { - logger.Info(ctx, "read normal message from client", "client_id", clientID, "msg", string(msgBytes)) + logger.Info(ctx, "read normal message from client", "client_id", clientID, "content", string(msgBytes)) } } } @@ -153,7 +155,7 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) { // ensure the fanInChan will not leak defer close(fanInChan) - + logger.Info(ctx, fmt.Sprintf("start processing real time data polling for clientID:%s", clientID)) stopChanMap := make(map[string]chan struct{}) s.globalMutex.RLock() config, confExist := s.subMap[clientID] @@ -164,8 +166,13 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin } s.globalMutex.RUnlock() + // TODO 测试log + fmt.Printf("found subscription config for clientID:%s, start initial polling goroutines, config: %+v\n", clientID, config.components) + logger.Info(ctx, fmt.Sprintf("found subscription config for clientID:%s, start initial polling goroutines", clientID), "components len", config.components) + config.mutex.RLock() for interval, componentItems := range config.components { + logger.Info(ctx, fmt.Sprintf("interval %s len of componentItems:%d\n", interval, len(componentItems.targets))) for _, target := range componentItems.targets { // add a secondary check to prevent the target from already existing in the stopChanMap if _, exists := stopChanMap[target]; exists { @@ -310,49 +317,80 @@ type redisPollingConfig struct { } func realTimeDataQueryFromRedis(ctx context.Context, config redisPollingConfig, fanInChan chan network.RealTimePullTarget, stopChan chan struct{}) { + // TODO 测试log,后续可删除 + logger.Info(ctx, "start a redis query goroutine for real time data pulling", "targetID", config.targetID, "queryKey", config.queryKey, "interval", config.interval, "dataSize", config.dataSize) duration, err := time.ParseDuration(config.interval) if err != nil { logger.Error(ctx, "failed to parse the time string", "interval", config.interval, "error", err) return } - ticker := time.NewTicker(duration * time.Second) + ticker := time.NewTicker(duration) defer ticker.Stop() client := diagram.NewRedisClient() startTimestamp := util.GenNanoTsStr() + + fmt.Printf("realTimeDataQueryFromRedis duration:%+v\n:", duration) + fmt.Printf("realTimeDataQueryFromRedis ticker:%+v\n:", ticker) + fmt.Printf("realTimeDataQueryFromRedis startTimestamp:%s\n", startTimestamp) + needPerformQuery := true for { + if needPerformQuery { + performQuery(ctx, client, config, fanInChan) + needPerformQuery = false + } + select { case <-ticker.C: - stopTimestamp := util.GenNanoTsStr() - members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize, startTimestamp, stopTimestamp) - if err != nil { - logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) - continue - } - // use end timestamp reset start timestamp - startTimestamp = stopTimestamp - - pullDatas := make([]network.RealTimePullData, 0, len(members)) - for _, member := range members { - pullDatas = append(pullDatas, network.RealTimePullData{ - Time: member.Member.(string), - Value: member.Score, - }) - } - targetData := network.RealTimePullTarget{ - ID: config.targetID, - Datas: pullDatas, - } - - select { - case fanInChan <- targetData: - default: - // TODO[BACKPRESSURE-ISSUE] 考虑 fanInChan 阻塞,当出现大量数据阻塞查询循环并丢弃时,采取背压方式解决问题 #1 - logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members)) - } + needPerformQuery = true case <-stopChan: logger.Info(ctx, "stop the redis query goroutine via a singal") return } } } + +func performQuery(ctx context.Context, client *diagram.RedisClient, config redisPollingConfig, fanInChan chan network.RealTimePullTarget) { + members, err := client.QueryByZRangeByLex(ctx, config.queryKey, config.dataSize) + if err != nil { + logger.Error(ctx, "query real time data from redis failed", "key", config.queryKey, "error", err) + return + } + + pullDatas := make([]network.RealTimePullData, 0, len(members)) + for _, member := range members { + pullDatas = append(pullDatas, network.RealTimePullData{ + Time: member.Member.(string), + Value: member.Score, + }) + } + sortPullDataByTimeAscending(ctx, pullDatas) + targetData := network.RealTimePullTarget{ + ID: config.targetID, + Datas: pullDatas, + } + + select { + case fanInChan <- targetData: + default: + // TODO[BACKPRESSURE-ISSUE] 考虑 fanInChan 阻塞,当出现大量数据阻塞查询循环并丢弃时,采取背压方式解决问题 #1 + logger.Warn(ctx, "fanInChan is full, dropping real-time data frame", "key", config.queryKey, "data_size", len(members)) + } +} + +func sortPullDataByTimeAscending(ctx context.Context, data []network.RealTimePullData) { + sort.Slice(data, func(i, j int) bool { + t1, err1 := strconv.ParseInt(data[i].Time, 10, 64) + if err1 != nil { + logger.Error(ctx, "parsing real time data timestamp failed", "index", i, "time", data[i].Time, "error", err1) + return false + } + + t2, err2 := strconv.ParseInt(data[j].Time, 10, 64) + if err2 != nil { + logger.Error(ctx, "parsing real time data timestamp failed", "index", j, "time", data[j].Time, "error", err2) + return true + } + return t1 < t2 + }) +} diff --git a/model/measurement_model.go b/model/measurement_model.go index c5d0925..9a5446b 100644 --- a/model/measurement_model.go +++ b/model/measurement_model.go @@ -186,8 +186,18 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) { switch v := regTypeVal.(type) { case int: regType = v + case float32: + if v != float32(int(v)) { + return "", fmt.Errorf("invalid type format in datasource field, expected integer value, got float: %f", v) + } + regType = int(v) + case float64: + if v != float64(int(v)) { + return "", fmt.Errorf("invalid type format in datasource field, expected integer value, got float: %f", v) + } + regType = int(v) default: - return "", fmt.Errorf("invalid type format in datasource field") + return "", fmt.Errorf("invalid type format in datasource field,%T", v) } ioAddrVal, ok := source["io_address"] @@ -215,7 +225,7 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) { if !ok { return "", fmt.Errorf("CL3611:invalid or missing channel field") } - return fmt.Sprintf("%s.%s.%s", station, device, channel), nil + return concatCL361WithPlus(station, device, channel), nil case constants.DataSourceTypePower104: station, ok := ioAddress["station"].(string) if !ok { @@ -244,8 +254,18 @@ func GenerateMeasureIdentifier(source map[string]any) (string, error) { default: return "", fmt.Errorf("Power104:invalid offset format") } - return fmt.Sprintf("%s.%d.%d", station, packet, offset), nil + return concatP104WithPlus(station, packet, offset), nil default: return "", fmt.Errorf("unsupport regulation type %d into datasource field", regType) } } + +func concatP104WithPlus(station string, packet int, offset int) string { + packetStr := strconv.Itoa(packet) + offsetStr := strconv.Itoa(offset) + return station + ":" + packetStr + ":" + offsetStr +} + +func concatCL361WithPlus(station, device, channel string) string { + return station + ":" + device + ":" + "phasor" + ":" + channel +} diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 2b6e9b3..a76fc7f 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -118,6 +118,8 @@ func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { conf.DataSize = int64(measurement.Size) // TODO use constant values ​​for temporary settings conf.minBreachCount = constants.MinBreachCount + // TODO 后续优化 duration 创建方式 + conf.Duration = 10 isFloatCause := false if _, exists := conf.Cause["up"]; exists { @@ -198,7 +200,7 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { duration := util.SecondsToDuration(conf.Duration) ticker := time.NewTicker(duration) defer ticker.Stop() - startTimestamp := util.GenNanoTsStr() + for { select { case <-conf.StopGchan: @@ -208,13 +210,11 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") return case <-ticker.C: - stopTimestamp := util.GenNanoTsStr() - members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp) + members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize) if err != nil { logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) continue } - startTimestamp = stopTimestamp realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members) if conf.Analyzer != nil {