From 329b4827f851b74a550116677549f5c50065fa17 Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 1 Dec 2025 11:27:38 +0800 Subject: [PATCH] fix bug of real time data injection shell --- .../compute_data_injection.go | 67 +++++++++++++------ .../real-time-subpull/sub_data_injection.go | 21 +----- deploy/redis-test-data/util/redis.go | 25 +++++++ 3 files changed, 73 insertions(+), 40 deletions(-) create mode 100644 deploy/redis-test-data/util/redis.go diff --git a/deploy/redis-test-data/real-time-compute/compute_data_injection.go b/deploy/redis-test-data/real-time-compute/compute_data_injection.go index 40897ef..a03318f 100644 --- a/deploy/redis-test-data/real-time-compute/compute_data_injection.go +++ b/deploy/redis-test-data/real-time-compute/compute_data_injection.go @@ -13,10 +13,17 @@ import ( util "modelRT/deploy/redis-test-data/util" + "github.com/redis/go-redis/v9" "gorm.io/driver/postgres" "gorm.io/gorm" ) +const ( + redisAddr = "localhost:6379" +) + +var globalRedisClient *redis.Client + var ( highEnd, highStart, lowStart, lowEnd int totalLength int @@ -30,8 +37,8 @@ func selectRandomInt() int { return options[randomIndex] } -// GenerateMixedData 生成满足特定条件的一组浮点数数据 -func GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase float64) []string { +// generateMixedData define func to generate a set of floating-point data that meets specific conditions +func generateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase float64) []float64 { totalLength = 500 highSegmentLength = 20 lowSegmentLength = 20 @@ -40,7 +47,7 @@ func GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase source := rand.NewSource(seed) r := rand.New(source) - data := make([]string, totalLength) + data := make([]float64, totalLength) highStart = rand.Intn(totalLength - highSegmentLength - lowSegmentLength - 1) highEnd = highStart + highSegmentLength lowStart = rand.Intn(totalLength-lowSegmentLength-highEnd) + highEnd @@ -50,15 +57,15 @@ func GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase if i >= highStart && i < highStart+highSegmentLength { // 数据值均大于 55.0,在 [55.5, 60.0] 范围内随机 // rand.Float64() 生成 [0.0, 1.0) 范围的浮点数 - data[i] = strconv.FormatFloat(highMin+r.Float64()*(highBase), 'f', 2, 64) + data[i] = highMin + r.Float64()*(highBase) } else if i >= lowStart && i < lowStart+lowSegmentLength { // 数据值均小于 45.0,在 [40.0, 44.5] 范围内随机 - data[i] = strconv.FormatFloat(lowMin+r.Float64()*(lowBase), 'f', 2, 64) + data[i] = lowMin + r.Float64()*(lowBase) } else { // 数据在 [45.0, 55.0] 范围内随机 (baseValue ± 5) // 50 + rand.Float64() * 10 - 5 change := normalBase - r.Float64()*normalBase*2 - data[i] = strconv.FormatFloat(baseValue+change, 'f', 2, 64) + data[i] = baseValue + change } } return data @@ -91,6 +98,10 @@ func main() { log.Println("总共读取到测量点数量:", len(measurements)) measInfos := util.ProcessMeasurements(measurements) + globalRedisClient = util.InitRedisClient(redisAddr) + rCancelCtx, cancel := context.WithCancel(rootCtx) + defer cancel() + for key, measInfo := range measInfos { var highMin, highBase float64 var lowMin, lowBase float64 @@ -107,7 +118,6 @@ func main() { normalBase = changes[0] } else { randomIndex := selectRandomInt() - fmt.Println(randomIndex) highMin = baseValue + changes[randomIndex] lowMin = baseValue + changes[randomIndex+1] highBase = changes[randomIndex] @@ -115,42 +125,57 @@ func main() { normalBase = changes[0] } - datas := GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase) + datas := generateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase) + // log.Printf("key:%s\n datas:%v\n", key, datas) - fmt.Printf("key:%s\n datas:%v\n", key, datas) - // 检查高值段是否满足 > 55 allHigh := true - for i := highStart; i < highEnd; i++ { - value, _ := strconv.ParseFloat(datas[i], 10) - if value <= highMin { + if datas[i] <= highMin { allHigh = false break } } - fmt.Printf("\n// 验证结果 (高值段在 %d-%d): 所有值是否 > %.2f? %t\n", highStart, highEnd-1, highMin, allHigh) + log.Printf("\n// 验证结果 (高值段在 %d-%d): 所有值是否 > %.2f? %t\n", highStart, highEnd-1, highMin, allHigh) - // 检查低值段是否满足 < 45 allLow := true for i := lowStart; i < lowEnd; i++ { - value, _ := strconv.ParseFloat(datas[i], 10) - if value >= lowMin { + if datas[i] >= lowMin { allLow = false break } } - fmt.Printf("// 验证结果 (低值段在 %d-%d): 所有值是否 < %.2f? %t\n", lowStart, lowEnd-1, lowMin, allLow) + log.Printf("// 验证结果 (低值段在 %d-%d): 所有值是否 < %.2f? %t\n", lowStart, lowEnd-1, lowMin, allLow) allTrue := true for i := 0; i < totalLength-1; i++ { - value, _ := strconv.ParseFloat(datas[i], 10) + value := datas[i] if i < highStart || (i >= highEnd && i < lowStart) || i >= lowEnd { - fmt.Printf("index:%d, value:%.2f\n", i, value) + log.Printf("index:%d, value:%.2f\n", i, value) if value >= highMin && value <= lowMin { allTrue = false } } } - fmt.Printf("// 验证结果 (正常段在 %d-%d): 所有值是否 <= %.2f或>= %.2f? %t\n", 0, totalLength-1, highMin, lowMin, allTrue) + log.Printf("// 验证结果 (正常段在 %d-%d): 所有值是否 <= %.2f或>= %.2f %t\n", 0, totalLength-1, highMin, lowMin, allTrue) + + log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", key, measInfo.BaseValue, measInfo.Changes) + + pipe := globalRedisClient.Pipeline() + redisZs := make([]redis.Z, 0, totalLength) + currentTime := time.Now().UnixNano() + for i := range totalLength { + sequentialTime := currentTime + int64(i) + z := redis.Z{ + Score: datas[i], + Member: strconv.FormatInt(sequentialTime, 10), + } + redisZs = append(redisZs, z) + } + log.Printf("启动数据写入程序, Redis Key: %s, 写入数据量: %d\n", key, len(redisZs)) + pipe.ZAdd(rCancelCtx, key, redisZs...) + _, err = pipe.Exec(rCancelCtx) + if err != nil { + log.Printf("redis pipeline execution failed: %v\n", err) + } } } diff --git a/deploy/redis-test-data/real-time-subpull/sub_data_injection.go b/deploy/redis-test-data/real-time-subpull/sub_data_injection.go index fb0c8e5..a15cef6 100644 --- a/deploy/redis-test-data/real-time-subpull/sub_data_injection.go +++ b/deploy/redis-test-data/real-time-subpull/sub_data_injection.go @@ -22,28 +22,11 @@ import ( // Redis配置 const ( - RedisAddr = "localhost:6379" + redisAddr = "localhost:6379" ) var globalRedisClient *redis.Client -func initRedisClient() *redis.Client { - rdb := redis.NewClient(&redis.Options{ - Addr: RedisAddr, - Password: "", // 如果有密码,请填写 - DB: 0, // 使用默认数据库 - }) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err := rdb.Ping(ctx).Result() - if err != nil { - return nil - } - return rdb -} - // outlierConfig 异常段配置 type outlierConfig struct { Enabled bool // 是否启用异常段 @@ -451,7 +434,7 @@ func main() { Distribution: "both", // 分布类型 } - globalRedisClient = initRedisClient() + globalRedisClient = util.InitRedisClient(redisAddr) rCancelCtx, cancel := context.WithCancel(rootCtx) defer cancel() diff --git a/deploy/redis-test-data/util/redis.go b/deploy/redis-test-data/util/redis.go new file mode 100644 index 0000000..d09e5c5 --- /dev/null +++ b/deploy/redis-test-data/util/redis.go @@ -0,0 +1,25 @@ +package util + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9" +) + +func InitRedisClient(redisAddr string) *redis.Client { + rdb := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Password: "", + DB: 0, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := rdb.Ping(ctx).Result() + if err != nil { + return nil + } + return rdb +}