diff --git a/deploy/redis-test-data/real-time-compute/compute_data_injection.go b/deploy/redis-test-data/real-time-compute/compute_data_injection.go index a03318f..5d43f50 100644 --- a/deploy/redis-test-data/real-time-compute/compute_data_injection.go +++ b/deploy/redis-test-data/real-time-compute/compute_data_injection.go @@ -71,6 +71,20 @@ func generateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase return data } +func generateNormalData(baseValue, normalBase float64) []float64 { + totalLength = 500 + seed := time.Now().UnixNano() + source := rand.NewSource(seed) + r := rand.New(source) + + data := make([]float64, totalLength) + for i := 0; i < totalLength; i++ { + change := normalBase - r.Float64()*normalBase*2 + data[i] = baseValue + change + } + return data +} + func main() { rootCtx := context.Background() @@ -103,63 +117,86 @@ func main() { defer cancel() for key, measInfo := range measInfos { - var highMin, highBase float64 - var lowMin, lowBase float64 - var normalBase float64 + randomType := selectRandomType() + var datas []float64 + if randomType { + // 生成正常数据 + log.Printf("key:%s generate normal data type is %v\n", key, randomType) + baseValue := measInfo.BaseValue + changes := measInfo.Changes + normalBase := changes[0] + noramlMin := baseValue - normalBase + normalMax := baseValue + normalBase + datas = generateNormalData(baseValue, normalBase) + allTrue := true - // TODO 生成一次测试数据 - changes := measInfo.Changes - baseValue := measInfo.BaseValue - if len(changes) == 2 { - highMin = baseValue + changes[0] - lowMin = baseValue + changes[1] - highBase = changes[0] - lowBase = changes[1] - normalBase = changes[0] - } else { - randomIndex := selectRandomInt() - highMin = baseValue + changes[randomIndex] - lowMin = baseValue + changes[randomIndex+1] - highBase = changes[randomIndex] - lowBase = changes[randomIndex+1] - normalBase = changes[0] - } - - datas := generateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase) - // log.Printf("key:%s\n datas:%v\n", key, datas) - - allHigh := true - for i := highStart; i < highEnd; i++ { - if datas[i] <= highMin { - allHigh = false - break - } - } - log.Printf("\n// 验证结果 (高值段在 %d-%d): 所有值是否 > %.2f? %t\n", highStart, highEnd-1, highMin, allHigh) - - allLow := true - for i := lowStart; i < lowEnd; i++ { - if datas[i] >= lowMin { - allLow = false - break - } - } - log.Printf("// 验证结果 (低值段在 %d-%d): 所有值是否 < %.2f? %t\n", lowStart, lowEnd-1, lowMin, allLow) - - allTrue := true - for i := 0; i < totalLength-1; i++ { - value := datas[i] - if i < highStart || (i >= highEnd && i < lowStart) || i >= lowEnd { - log.Printf("index:%d, value:%.2f\n", i, value) - if value >= highMin && value <= lowMin { + for i := 0; i < totalLength-1; i++ { + value := datas[i] + // log.Printf("index:%d, value:%.2f\n", i, value) + if value < noramlMin && value > normalMax { allTrue = false } } + log.Printf("// 验证结果: 所有值是否 >= %.2f或 <= %.2f %t\n", noramlMin, normalMax, allTrue) + } else { + // 生成异常数据 + log.Printf("key:%s generate abnormal data type is %v\n", key, randomType) + var highMin, highBase float64 + var lowMin, lowBase float64 + var normalBase float64 + + // TODO 生成一次测试数据 + changes := measInfo.Changes + baseValue := measInfo.BaseValue + if len(changes) == 2 { + highMin = baseValue + changes[0] + lowMin = baseValue + changes[1] + highBase = changes[0] + lowBase = changes[1] + normalBase = changes[0] + } else { + randomIndex := selectRandomInt() + highMin = baseValue + changes[randomIndex] + lowMin = baseValue + changes[randomIndex+1] + highBase = changes[randomIndex] + lowBase = changes[randomIndex+1] + normalBase = changes[0] + } + + datas = generateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase) + // log.Printf("key:%s\n datas:%v\n", key, datas) + + allHigh := true + for i := highStart; i < highEnd; i++ { + if datas[i] <= highMin { + allHigh = false + break + } + } + log.Printf("// 验证结果 (高值段在 %d-%d): 所有值是否 > %.2f? %t\n", highStart, highEnd-1, highMin, allHigh) + + allLow := true + for i := lowStart; i < lowEnd; i++ { + if datas[i] >= lowMin { + allLow = false + break + } + } + log.Printf("// 验证结果 (低值段在 %d-%d): 所有值是否 < %.2f? %t\n", lowStart, lowEnd-1, lowMin, allLow) + + allTrue := true + for i := 0; i < totalLength-1; i++ { + value := datas[i] + if i < highStart || (i >= highEnd && i < lowStart) || i >= lowEnd { + // log.Printf("index:%d, value:%.2f\n", i, value) + if value >= highMin && value <= lowMin { + allTrue = false + } + } + } + log.Printf("// 验证结果 (正常段在 %d-%d): 所有值是否 <= %.2f或>= %.2f %t\n", 0, totalLength-1, highMin, lowMin, allTrue) } - log.Printf("// 验证结果 (正常段在 %d-%d): 所有值是否 <= %.2f或>= %.2f %t\n", 0, totalLength-1, highMin, lowMin, allTrue) - log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", key, measInfo.BaseValue, measInfo.Changes) - pipe := globalRedisClient.Pipeline() redisZs := make([]redis.Z, 0, totalLength) currentTime := time.Now().UnixNano() @@ -179,3 +216,9 @@ func main() { } } } + +func selectRandomType() bool { + options := []int{0, 2} + randomValue := rand.Intn(len(options)) + return randomValue != 0 +} diff --git a/deploy/redis-test-data/util/redis.go b/deploy/redis-test-data/util/redis.go index d09e5c5..9b75558 100644 --- a/deploy/redis-test-data/util/redis.go +++ b/deploy/redis-test-data/util/redis.go @@ -1,3 +1,4 @@ +// Package util provide some utility fun package util import ( @@ -7,6 +8,7 @@ import ( "github.com/redis/go-redis/v9" ) +// InitRedisClient define func to initialize and return a redis client func InitRedisClient(redisAddr string) *redis.Client { rdb := redis.NewClient(&redis.Options{ Addr: redisAddr, diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index 9733d57..1525734 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -126,7 +126,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE } if eventTriggered { - command, content := genTEEventCommandAndContent(conf.Action) + command, content := genTEEventCommandAndContent(ctx, conf.Action) // TODO 考虑 content 是否可以为空,先期不允许 if command == "" || content == "" { logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) @@ -137,25 +137,29 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE } } -func genTEEventCommandAndContent(action map[string]any) (command string, content string) { +func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) { cmdValue, exist := action["command"] if !exist { + logger.Error(ctx, "can not find command variable into action map", "action", action) return "", "" } commandStr, ok := cmdValue.(string) if !ok { + logger.Error(ctx, "convert command to string type failed", "command", cmdValue, "type", fmt.Sprintf("%T", cmdValue)) return "", "" } command = commandStr - paramsValue, exist := action["parametes"] + paramsValue, exist := action["parameters"] if !exist { + logger.Error(ctx, "can not find parameters variable into action map", "action", action) return command, "" } - parameterSlice, ok := paramsValue.([]string) + parameterSlice, ok := paramsValue.([]any) if !ok { + logger.Error(ctx, "convert parameters to []any type failed", "parameters", paramsValue, "type", fmt.Sprintf("%T", paramsValue)) return command, "" } @@ -164,7 +168,12 @@ func genTEEventCommandAndContent(action map[string]any) (command string, content if i > 0 { builder.WriteString(",") } - builder.WriteString(parameter) + parameterStr, ok := parameter.(string) + if !ok { + logger.Warn(ctx, "parameter type is incorrect, skip this parameter", "parameter", parameter, "type", fmt.Sprintf("%T", parameter)) + continue + } + builder.WriteString(parameterStr) } return command, builder.String() diff --git a/real-time-data/event/event_handlers.go b/real-time-data/event/event_handlers.go index 63eae64..a161a7d 100644 --- a/real-time-data/event/event_handlers.go +++ b/real-time-data/event/event_handlers.go @@ -45,7 +45,7 @@ func handleWarningAction(ctx context.Context, content string) error { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send warning level event using actionParams ... - logger.Warn(ctx, "trigger Warning event", "message", actionParams) + logger.Warn(ctx, "trigger warning event", "message", actionParams) return nil }