From a7d894d2de902b8533b94b59e43af05280b75a38 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 28 Nov 2025 17:17:58 +0800 Subject: [PATCH] write code for real time data compute shell --- .../compute_data_injection.go | 156 ++++++++++ .../sub_data_injection.go} | 291 +----------------- deploy/redis-test-data/util/rand.go | 266 ++++++++++++++++ main.go | 1 - real-time-data/compute_analyzer.go | 2 - real-time-data/real_time_data_computing.go | 2 +- util/convert.go | 32 +- 7 files changed, 439 insertions(+), 311 deletions(-) create mode 100644 deploy/redis-test-data/real-time-compute/compute_data_injection.go rename deploy/redis-test-data/{data_injection.go => real-time-subpull/sub_data_injection.go} (65%) create mode 100644 deploy/redis-test-data/util/rand.go diff --git a/deploy/redis-test-data/real-time-compute/compute_data_injection.go b/deploy/redis-test-data/real-time-compute/compute_data_injection.go new file mode 100644 index 0000000..40897ef --- /dev/null +++ b/deploy/redis-test-data/real-time-compute/compute_data_injection.go @@ -0,0 +1,156 @@ +// Package main implement redis test data injection +package main + +import ( + "context" + "fmt" + "log" + "math/rand" + "strconv" + "time" + + "modelRT/orm" + + util "modelRT/deploy/redis-test-data/util" + + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +var ( + highEnd, highStart, lowStart, lowEnd int + totalLength int + highSegmentLength int + lowSegmentLength int +) + +func selectRandomInt() int { + options := []int{0, 2} + randomIndex := rand.Intn(len(options)) + return options[randomIndex] +} + +// GenerateMixedData 生成满足特定条件的一组浮点数数据 +func GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase float64) []string { + totalLength = 500 + highSegmentLength = 20 + lowSegmentLength = 20 + + seed := time.Now().UnixNano() + source := rand.NewSource(seed) + r := rand.New(source) + + data := make([]string, totalLength) + highStart = rand.Intn(totalLength - highSegmentLength - lowSegmentLength - 1) + highEnd = highStart + highSegmentLength + lowStart = rand.Intn(totalLength-lowSegmentLength-highEnd) + highEnd + lowEnd = lowStart + lowSegmentLength + + for i := 0; i < totalLength; i++ { + if i >= highStart && i < highStart+highSegmentLength { + // 数据值均大于 55.0,在 [55.5, 60.0] 范围内随机 + // rand.Float64() 生成 [0.0, 1.0) 范围的浮点数 + data[i] = strconv.FormatFloat(highMin+r.Float64()*(highBase), 'f', 2, 64) + } else if i >= lowStart && i < lowStart+lowSegmentLength { + // 数据值均小于 45.0,在 [40.0, 44.5] 范围内随机 + data[i] = strconv.FormatFloat(lowMin+r.Float64()*(lowBase), 'f', 2, 64) + } else { + // 数据在 [45.0, 55.0] 范围内随机 (baseValue ± 5) + // 50 + rand.Float64() * 10 - 5 + change := normalBase - r.Float64()*normalBase*2 + data[i] = strconv.FormatFloat(baseValue+change, 'f', 2, 64) + } + } + return data +} + +func main() { + rootCtx := context.Background() + + pgURI := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", "192.168.1.101", 5432, "postgres", "coslight", "demo") + + postgresDBClient, err := gorm.Open(postgres.Open(pgURI)) + if err != nil { + panic(err) + } + defer func() { + sqlDB, err := postgresDBClient.DB() + if err != nil { + panic(err) + } + sqlDB.Close() + }() + + cancelCtx, cancel := context.WithTimeout(rootCtx, 5*time.Second) + defer cancel() + var measurements []orm.Measurement + result := postgresDBClient.WithContext(cancelCtx).Find(&measurements) + if result.Error != nil { + panic(result.Error) + } + log.Println("总共读取到测量点数量:", len(measurements)) + measInfos := util.ProcessMeasurements(measurements) + + for key, measInfo := range measInfos { + var highMin, highBase float64 + var lowMin, lowBase float64 + var normalBase float64 + + // TODO 生成一次测试数据 + changes := measInfo.Changes + baseValue := measInfo.BaseValue + if len(changes) == 2 { + highMin = baseValue + changes[0] + lowMin = baseValue + changes[1] + highBase = changes[0] + lowBase = changes[1] + normalBase = changes[0] + } else { + randomIndex := selectRandomInt() + fmt.Println(randomIndex) + highMin = baseValue + changes[randomIndex] + lowMin = baseValue + changes[randomIndex+1] + highBase = changes[randomIndex] + lowBase = changes[randomIndex+1] + normalBase = changes[0] + } + + datas := GenerateMixedData(highMin, lowMin, highBase, lowBase, baseValue, normalBase) + + fmt.Printf("key:%s\n datas:%v\n", key, datas) + // 检查高值段是否满足 > 55 + allHigh := true + + for i := highStart; i < highEnd; i++ { + value, _ := strconv.ParseFloat(datas[i], 10) + if value <= highMin { + allHigh = false + break + } + } + fmt.Printf("\n// 验证结果 (高值段在 %d-%d): 所有值是否 > %.2f? %t\n", highStart, highEnd-1, highMin, allHigh) + + // 检查低值段是否满足 < 45 + allLow := true + for i := lowStart; i < lowEnd; i++ { + value, _ := strconv.ParseFloat(datas[i], 10) + if value >= lowMin { + allLow = false + break + } + } + fmt.Printf("// 验证结果 (低值段在 %d-%d): 所有值是否 < %.2f? %t\n", lowStart, lowEnd-1, lowMin, allLow) + + allTrue := true + for i := 0; i < totalLength-1; i++ { + value, _ := strconv.ParseFloat(datas[i], 10) + if i < highStart || (i >= highEnd && i < lowStart) || i >= lowEnd { + fmt.Printf("index:%d, value:%.2f\n", i, value) + if value >= highMin && value <= lowMin { + allTrue = false + } + } + } + fmt.Printf("// 验证结果 (正常段在 %d-%d): 所有值是否 <= %.2f或>= %.2f? %t\n", 0, totalLength-1, highMin, lowMin, allTrue) + } +} diff --git a/deploy/redis-test-data/data_injection.go b/deploy/redis-test-data/real-time-subpull/sub_data_injection.go similarity index 65% rename from deploy/redis-test-data/data_injection.go rename to deploy/redis-test-data/real-time-subpull/sub_data_injection.go index c8f918a..fb0c8e5 100644 --- a/deploy/redis-test-data/data_injection.go +++ b/deploy/redis-test-data/real-time-subpull/sub_data_injection.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "modelRT/deploy/redis-test-data/util" "modelRT/orm" redis "github.com/redis/go-redis/v9" @@ -43,292 +44,6 @@ func initRedisClient() *redis.Client { return rdb } -func processMeasurements(measurements []orm.Measurement) map[string]calculationResult { - results := make(map[string]calculationResult, len(measurements)) - for _, measurement := range measurements { - // 检查 DataSource 是否存在且 type 为 1 - if measurement.DataSource == nil { - continue - } - - // 检查 type 是否为 1 - dataType, typeExists := measurement.DataSource["type"] - if !typeExists { - continue - } - - // 类型断言,处理不同的数字类型 - var typeValue int - switch v := dataType.(type) { - case int: - typeValue = v - case float64: - typeValue = int(v) - case int64: - typeValue = int(v) - default: - continue - } - - if typeValue != 1 { - continue - } - - // 获取 io_address - ioAddressRaw, ioExists := measurement.DataSource["io_address"] - if !ioExists { - continue - } - - ioAddress, ok := ioAddressRaw.(map[string]any) - if !ok { - continue - } - - station, _ := ioAddress["station"].(string) - device, _ := ioAddress["device"].(string) - channel, _ := ioAddress["channel"].(string) - - result := fmt.Sprintf("%s:%s:phasor:%s", station, device, channel) - if measurement.EventPlan == nil { - continue - } - - causeValue, causeExist := measurement.EventPlan["cause"] - if !causeExist { - continue - } - causeMap, ok := causeValue.(map[string]any) - if !ok { - continue - } - calResult, err := calculateBaseValueEnhanced(causeMap) - if err != nil { - continue - } - calResult.Size = measurement.Size - results[result] = calResult - } - return results -} - -func calculateBaseValue(data map[string]any) (float64, error) { - // 检查边沿触发类型 - if edge, exists := data["edge"]; exists { - return calculateEdgeValue(edge) - } - - // 检查上下限和预警上下限 - hasUpDown := hasKeys(data, "up", "down") - hasUpUpDownDown := hasKeys(data, "upup", "downdown") - - switch { - case hasUpDown && hasUpUpDownDown: - // 同时包含四个键,使用 up 和 down 计算基准值 - return calculateAverage(data, "up", "down") - - case hasUpDown: - // 只有上下限 - return calculateAverage(data, "up", "down") - - case hasUpUpDownDown: - // 只有预警上上限与下下限 - return calculateAverage(data, "upup", "downdown") - - default: - return 0, fmt.Errorf("不支持的数据结构: %v", data) - } -} - -func calculateEdgeValue(edge any) (float64, error) { - edgeStr, ok := edge.(string) - if !ok { - return 0, fmt.Errorf("edge 字段类型错误,期望 string,得到 %T", edge) - } - - switch edgeStr { - case "raising": - return 1.0, nil - case "falling": - return 0.0, nil - default: - return 0, fmt.Errorf("不支持的 edge 值: %s", edgeStr) - } -} - -func calculateAverage(data map[string]any, key1, key2 string) (float64, error) { - val1, err := getFloatValue(data, key1) - if err != nil { - return 0, err - } - - val2, err := getFloatValue(data, key2) - if err != nil { - return 0, err - } - - return (val1 + val2) / 2.0, nil -} - -func calculateChanges(data map[string]any, baseValue float64, maxLimt bool, limitNum int) ([]float64, error) { - results := make([]float64, 0, limitNum) - switch limitNum { - case 2: - var key1, key2 string - if maxLimt { - key1 = "upup" - key2 = "downdown" - } else { - key1 = "up" - key2 = "down" - } - val1, err := getFloatValue(data, key1) - if err != nil { - return nil, err - } - results = append(results, val1-baseValue) - - val2, err := getFloatValue(data, key2) - if err != nil { - return nil, err - } - results = append(results, val2-baseValue) - case 4: - key1 := "up" - key2 := "down" - key3 := "upup" - key4 := "downdown" - - val1, err := getFloatValue(data, key1) - if err != nil { - return nil, err - } - results = append(results, val1-baseValue) - - val2, err := getFloatValue(data, key2) - if err != nil { - return nil, err - } - results = append(results, val2-baseValue) - - val3, err := getFloatValue(data, key3) - if err != nil { - return nil, err - } - results = append(results, val3-baseValue) - - val4, err := getFloatValue(data, key4) - if err != nil { - return nil, err - } - results = append(results, val4-baseValue) - } - - return results, nil -} - -func getFloatValue(data map[string]any, key string) (float64, error) { - value, exists := data[key] - if !exists { - return 0, fmt.Errorf("缺少必需的键:%s", key) - } - - switch v := value.(type) { - case float64: - return v, nil - case int: - return float64(v), nil - case float32: - return float64(v), nil - default: - return 0, fmt.Errorf("键 %s 的值类型错误,期望数字类型,得到 %T", key, value) - } -} - -func hasKeys(data map[string]any, keys ...string) bool { - for _, key := range keys { - if _, exists := data[key]; !exists { - return false - } - } - return true -} - -type calculationResult struct { - BaseValue float64 - Changes []float64 - Size int - BaseType string // "normal", "warning", "edge" - Message string -} - -func calculateBaseValueEnhanced(data map[string]any) (calculationResult, error) { - result := calculationResult{} - if edge, exists := data["edge"]; exists { - value, err := calculateEdgeValue(edge) - if err != nil { - return result, err - } - if edge == "raising" { - result.Changes = []float64{1.0} - } else { - result.Changes = []float64{0.0} - } - - result.BaseValue = value - result.BaseType = "TI" - result.Message = "边沿触发基准值" - return result, nil - } - - hasUpDown := hasKeys(data, "up", "down") - hasUpUpDownDown := hasKeys(data, "upup", "downdown") - result.BaseType = "TE" - switch { - case hasUpDown && hasUpUpDownDown: - value, err := calculateAverage(data, "up", "down") - if err != nil { - return result, err - } - result.BaseValue = value - result.Changes, err = calculateChanges(data, value, false, 4) - if err != nil { - return result, err - } - result.Message = "上下限基准值(忽略预警上上下下限)" - return result, nil - - case hasUpDown: - value, err := calculateAverage(data, "up", "down") - if err != nil { - return result, err - } - result.BaseValue = value - result.Changes, err = calculateChanges(data, value, false, 2) - if err != nil { - return result, err - } - result.Message = "上下限基准值" - return result, nil - - case hasUpUpDownDown: - value, err := calculateAverage(data, "upup", "downdown") - if err != nil { - return result, err - } - result.BaseValue = value - result.Changes, err = calculateChanges(data, value, true, 2) - if err != nil { - return result, err - } - result.Message = "上上下下限基准值" - return result, nil - - default: - return result, fmt.Errorf("不支持的数据结构: %v", data) - } -} - // outlierConfig 异常段配置 type outlierConfig struct { Enabled bool // 是否启用异常段 @@ -609,7 +324,7 @@ func generateRandomData(baseValue float64, changes []float64, size int) []float6 } // simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet -func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, config outlierConfig, measInfo calculationResult) { +func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, config outlierConfig, measInfo util.CalculationResult) { log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", redisKey, measInfo.BaseValue, measInfo.Changes) ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() @@ -723,7 +438,7 @@ func main() { panic(result.Error) } log.Println("总共读取到测量点数量:", len(measurements)) - measInfos := processMeasurements(measurements) + measInfos := util.ProcessMeasurements(measurements) // 测量点数据生成(包含异常数据) // 配置异常段参数 diff --git a/deploy/redis-test-data/util/rand.go b/deploy/redis-test-data/util/rand.go new file mode 100644 index 0000000..7df9c92 --- /dev/null +++ b/deploy/redis-test-data/util/rand.go @@ -0,0 +1,266 @@ +// Package util provide some utility fun +package util + +import ( + "fmt" + + "modelRT/orm" +) + +type CalculationResult struct { + BaseValue float64 + Changes []float64 + Size int + BaseType string // "normal", "warning", "edge" + Message string +} + +func ProcessMeasurements(measurements []orm.Measurement) map[string]CalculationResult { + results := make(map[string]CalculationResult, len(measurements)) + for _, measurement := range measurements { + // 检查 DataSource 是否存在且 type 为 1 + if measurement.DataSource == nil { + continue + } + + // 检查 type 是否为 1 + dataType, typeExists := measurement.DataSource["type"] + if !typeExists { + continue + } + + // 类型断言,处理不同的数字类型 + var typeValue int + switch v := dataType.(type) { + case int: + typeValue = v + case float64: + typeValue = int(v) + case int64: + typeValue = int(v) + default: + continue + } + + if typeValue != 1 { + continue + } + + // 获取 io_address + ioAddressRaw, ioExists := measurement.DataSource["io_address"] + if !ioExists { + continue + } + + ioAddress, ok := ioAddressRaw.(map[string]any) + if !ok { + continue + } + + station, _ := ioAddress["station"].(string) + device, _ := ioAddress["device"].(string) + channel, _ := ioAddress["channel"].(string) + + result := fmt.Sprintf("%s:%s:phasor:%s", station, device, channel) + if measurement.EventPlan == nil { + continue + } + + causeValue, causeExist := measurement.EventPlan["cause"] + if !causeExist { + continue + } + causeMap, ok := causeValue.(map[string]any) + if !ok { + continue + } + calResult, err := calculateBaseValueEnhanced(causeMap) + if err != nil { + continue + } + calResult.Size = measurement.Size + results[result] = calResult + } + return results +} + +func calculateBaseValueEnhanced(data map[string]any) (CalculationResult, error) { + result := CalculationResult{} + if edge, exists := data["edge"]; exists { + value, err := calculateEdgeValue(edge) + if err != nil { + return result, err + } + if edge == "raising" { + result.Changes = []float64{1.0} + } else { + result.Changes = []float64{0.0} + } + + result.BaseValue = value + result.BaseType = "TI" + result.Message = "边沿触发基准值" + return result, nil + } + + hasUpDown := HasKeys(data, "up", "down") + hasUpUpDownDown := HasKeys(data, "upup", "downdown") + result.BaseType = "TE" + switch { + case hasUpDown && hasUpUpDownDown: + value, err := calculateAverage(data, "up", "down") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, false, 4) + if err != nil { + return result, err + } + result.Message = "上下限基准值(忽略预警上上下下限)" + return result, nil + + case hasUpDown: + value, err := calculateAverage(data, "up", "down") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, false, 2) + if err != nil { + return result, err + } + result.Message = "上下限基准值" + return result, nil + + case hasUpUpDownDown: + value, err := calculateAverage(data, "upup", "downdown") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, true, 2) + if err != nil { + return result, err + } + result.Message = "上上下下限基准值" + return result, nil + + default: + return result, fmt.Errorf("不支持的数据结构: %v", data) + } +} + +func calculateAverage(data map[string]any, key1, key2 string) (float64, error) { + val1, err := getFloatValue(data, key1) + if err != nil { + return 0, err + } + + val2, err := getFloatValue(data, key2) + if err != nil { + return 0, err + } + + return (val1 + val2) / 2.0, nil +} + +func calculateChanges(data map[string]any, baseValue float64, maxLimt bool, limitNum int) ([]float64, error) { + results := make([]float64, 0, limitNum) + switch limitNum { + case 2: + var key1, key2 string + if maxLimt { + key1 = "upup" + key2 = "downdown" + } else { + key1 = "up" + key2 = "down" + } + val1, err := getFloatValue(data, key1) + if err != nil { + return nil, err + } + results = append(results, val1-baseValue) + + val2, err := getFloatValue(data, key2) + if err != nil { + return nil, err + } + results = append(results, val2-baseValue) + case 4: + key1 := "up" + key2 := "down" + key3 := "upup" + key4 := "downdown" + + val1, err := getFloatValue(data, key1) + if err != nil { + return nil, err + } + results = append(results, val1-baseValue) + + val2, err := getFloatValue(data, key2) + if err != nil { + return nil, err + } + results = append(results, val2-baseValue) + + val3, err := getFloatValue(data, key3) + if err != nil { + return nil, err + } + results = append(results, val3-baseValue) + + val4, err := getFloatValue(data, key4) + if err != nil { + return nil, err + } + results = append(results, val4-baseValue) + } + + return results, nil +} + +func getFloatValue(data map[string]any, key string) (float64, error) { + value, exists := data[key] + if !exists { + return 0, fmt.Errorf("缺少必需的键:%s", key) + } + + switch v := value.(type) { + case float64: + return v, nil + case int: + return float64(v), nil + case float32: + return float64(v), nil + default: + return 0, fmt.Errorf("键 %s 的值类型错误,期望数字类型,得到 %T", key, value) + } +} + +func HasKeys(data map[string]any, keys ...string) bool { + for _, key := range keys { + if _, exists := data[key]; !exists { + return false + } + } + return true +} + +func calculateEdgeValue(edge any) (float64, error) { + edgeStr, ok := edge.(string) + if !ok { + return 0, fmt.Errorf("edge 字段类型错误,期望 string,得到 %T", edge) + } + + switch edgeStr { + case "raising": + return 1.0, nil + case "falling": + return 0.0, nil + default: + return 0, fmt.Errorf("不支持的 edge 值: %s", edgeStr) + } +} diff --git a/main.go b/main.go index f9f96f4..fa5b45b 100644 --- a/main.go +++ b/main.go @@ -158,7 +158,6 @@ func main() { logger.Error(ctx, "load topologic info from postgres failed", "error", err) panic(err) } - go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement) tree, err := database.QueryTopologicFromDB(ctx, tx) diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index 4afac84..9733d57 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -135,7 +135,6 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE event.TriggerEventAction(ctx, command, content) return } - logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.") } func genTEEventCommandAndContent(action map[string]any) (command string, content string) { @@ -298,7 +297,6 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE event.TriggerEventAction(ctx, command, content) return } - logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.") } func genTIEventCommandAndContent(action map[string]any) (command string, content string) { diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index a76fc7f..3616daa 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -216,7 +216,7 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { continue } - realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members) + realTimedatas := util.ConvertZSetMembersToFloat64(members) if conf.Analyzer != nil { conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas) } else { diff --git a/util/convert.go b/util/convert.go index c7b637b..6e4f04f 100644 --- a/util/convert.go +++ b/util/convert.go @@ -2,32 +2,26 @@ package util import ( - "context" - "strconv" - - "modelRT/logger" + "sort" "github.com/redis/go-redis/v9" ) // ConvertZSetMembersToFloat64 define func to conver zset member type to float64 -func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 { +func ConvertZSetMembersToFloat64(members []redis.Z) []float64 { dataFloats := make([]float64, 0, len(members)) - + // recovery time sorted in ascending order + sortRedisZByTimeMemberAscending(members) for _, member := range members { - valStr, ok := member.Member.(string) - if !ok { - logger.Warn(ctx, "redis zset member value is not a string,skipping") - continue - } - - valFloat, err := strconv.ParseFloat(valStr, 64) - if err != nil { - logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err) - continue - } - dataFloats = append(dataFloats, valFloat) + dataFloats = append(dataFloats, member.Score) } - return dataFloats } + +func sortRedisZByTimeMemberAscending(data []redis.Z) { + sort.Slice(data, func(i, j int) bool { + memberI := data[i].Member.(string) + memberJ := data[j].Member.(string) + return memberI < memberJ + }) +}