diff --git a/deploy/redis-test-data/data_injection.go b/deploy/redis-test-data/data_injection.go new file mode 100644 index 0000000..7a3cbe9 --- /dev/null +++ b/deploy/redis-test-data/data_injection.go @@ -0,0 +1,802 @@ +// Package main implement redis test data injection +package main + +import ( + "context" + "fmt" + "log" + "math/rand" + "time" + + "modelRT/orm" + + redis "github.com/redis/go-redis/v9" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +// Redis配置 +const ( + RedisAddr = "localhost:6379" // 您的Redis地址 + KeyName = "telemetry:sensor_101" // 存储实时数据的 ZSet 键名 +) + +func initRedisClient() *redis.Client { + rdb := redis.NewClient(&redis.Options{ + Addr: RedisAddr, + Password: "", // 如果有密码,请填写 + DB: 0, // 使用默认数据库 + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := rdb.Ping(ctx).Result() + if err != nil { + return nil + } + return rdb +} + +// simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet +func simulateDataWrite(ctx context.Context, rdb *redis.Client) { + // 设定写入频率:每秒写入一次 + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + // 用于生成模拟数据的初始值 + currentValue := 25.0 + // 模拟数据变化的方向 + direction := 0.1 + + for { + select { + case <-ctx.Done(): + fmt.Printf("\n[%s] 写入程序已停止。\n", KeyName) + return // 收到停止信号,退出 goroutine + + case <-ticker.C: + // 1. 生成新的模拟数据 + currentValue += direction + if currentValue > 30.0 || currentValue < 20.0 { + direction *= -1 // 达到边界,反转变化方向 + } + + // 2. 准备写入数据 + // ZSet的 Score 使用当前时间戳(秒级) + score := float64(time.Now().Unix()) + + // ZSet的 Member 使用值(转换为字符串) + // 在您的实时数据处理场景中,Member 通常存储值,Score 存储时间戳 + memberValue := fmt.Sprintf("%.2f", currentValue) + + z := redis.Z{ + Score: score, + Member: memberValue, + } + + // 3. 执行 ZADD 写入操作 + err := rdb.ZAdd(ctx, KeyName, z).Err() + if err != nil { + log.Printf("写入 Redis 失败 (%s):%v", KeyName, err) + } else { + fmt.Printf("[%s] 成功写入:时间戳=%.0f, 值=%s\n", KeyName, score, memberValue) + } + } + } +} + +func processMeasurements(measurements []orm.Measurement) map[string]calculationResult { + results := make(map[string]calculationResult, len(measurements)) + for _, measurement := range measurements { + // 检查 DataSource 是否存在且 type 为 1 + if measurement.DataSource == nil { + fmt.Println(11111) + continue + } + + // 检查 type 是否为 1 + dataType, typeExists := measurement.DataSource["type"] + if !typeExists { + fmt.Println(22222) + continue + } + + // 类型断言,处理不同的数字类型 + var typeValue int + switch v := dataType.(type) { + case int: + typeValue = v + case float64: + typeValue = int(v) + case int64: + typeValue = int(v) + default: + continue + } + + if typeValue != 1 { + fmt.Println(33333) + continue + } + + // 获取 io_address + ioAddressRaw, ioExists := measurement.DataSource["io_address"] + if !ioExists { + fmt.Println(44444) + continue + } + + ioAddress, ok := ioAddressRaw.(map[string]any) + if !ok { + fmt.Println(44444) + continue + } + + station, _ := ioAddress["station"].(string) + device, _ := ioAddress["device"].(string) + channel, _ := ioAddress["channel"].(string) + + result := fmt.Sprintf("%s:%s:phasor:%s", station, device, channel) + if measurement.EventPlan == nil { + fmt.Println(55555) + continue + } + + causeValue, causeExist := measurement.EventPlan["cause"] + if !causeExist { + fmt.Println(66666) + continue + } + causeMap, ok := causeValue.(map[string]any) + if !ok { + fmt.Println(77777) + continue + } + calResult, err := calculateBaseValueEnhanced(causeMap) + if err != nil { + fmt.Println(88888, err) + continue + } + calResult.Size = measurement.Size + results[result] = calResult + } + return results +} + +func calculateBaseValue(data map[string]any) (float64, error) { + // 检查边沿触发类型 + if edge, exists := data["edge"]; exists { + return calculateEdgeValue(edge) + } + + // 检查上下限和预警上下限 + hasUpDown := hasKeys(data, "up", "down") + hasUpUpDownDown := hasKeys(data, "upup", "downdown") + + switch { + case hasUpDown && hasUpUpDownDown: + // 同时包含四个键,使用 up 和 down 计算基准值 + return calculateAverage(data, "up", "down") + + case hasUpDown: + // 只有上下限 + return calculateAverage(data, "up", "down") + + case hasUpUpDownDown: + // 只有预警上上限与下下限 + return calculateAverage(data, "upup", "downdown") + + default: + return 0, fmt.Errorf("不支持的数据结构: %v", data) + } +} + +func calculateEdgeValue(edge any) (float64, error) { + edgeStr, ok := edge.(string) + if !ok { + return 0, fmt.Errorf("edge 字段类型错误,期望 string,得到 %T", edge) + } + + switch edgeStr { + case "raising": + return 1.0, nil + case "falling": + return 0.0, nil + default: + return 0, fmt.Errorf("不支持的 edge 值: %s", edgeStr) + } +} + +func calculateAverage(data map[string]any, key1, key2 string) (float64, error) { + val1, err := getFloatValue(data, key1) + if err != nil { + return 0, err + } + + val2, err := getFloatValue(data, key2) + if err != nil { + return 0, err + } + + return (val1 + val2) / 2.0, nil +} + +func calculateChanges(data map[string]any, baseValue float64, maxLimt bool, limitNum int) ([]float64, error) { + results := make([]float64, 0, limitNum) + switch limitNum { + case 2: + var key1, key2 string + if maxLimt { + key1 = "upup" + key2 = "downdown" + } else { + key1 = "up" + key2 = "down" + } + val1, err := getFloatValue(data, key1) + if err != nil { + return nil, err + } + results = append(results, val1-baseValue) + + val2, err := getFloatValue(data, key2) + if err != nil { + return nil, err + } + results = append(results, val2-baseValue) + case 4: + key1 := "up" + key2 := "down" + key3 := "upup" + key4 := "downdown" + + val1, err := getFloatValue(data, key1) + if err != nil { + return nil, err + } + results = append(results, val1-baseValue) + + val2, err := getFloatValue(data, key2) + if err != nil { + return nil, err + } + results = append(results, val2-baseValue) + + val3, err := getFloatValue(data, key3) + if err != nil { + return nil, err + } + results = append(results, val3-baseValue) + + val4, err := getFloatValue(data, key4) + if err != nil { + return nil, err + } + results = append(results, val4-baseValue) + } + + return results, nil +} + +func getFloatValue(data map[string]any, key string) (float64, error) { + value, exists := data[key] + if !exists { + return 0, fmt.Errorf("缺少必需的键:%s", key) + } + + switch v := value.(type) { + case float64: + return v, nil + case int: + return float64(v), nil + case float32: + return float64(v), nil + default: + return 0, fmt.Errorf("键 %s 的值类型错误,期望数字类型,得到 %T", key, value) + } +} + +func hasKeys(data map[string]any, keys ...string) bool { + for _, key := range keys { + if _, exists := data[key]; !exists { + return false + } + } + return true +} + +type calculationResult struct { + BaseValue float64 + Changes []float64 + Size int + BaseType string // "normal", "warning", "edge" + Message string +} + +func calculateBaseValueEnhanced(data map[string]any) (calculationResult, error) { + result := calculationResult{} + if edge, exists := data["edge"]; exists { + value, err := calculateEdgeValue(edge) + if err != nil { + return result, err + } + if edge == "raising" { + result.Changes = []float64{1.0} + } else { + result.Changes = []float64{0.0} + } + + result.BaseValue = value + result.BaseType = "TI" + result.Message = "边沿触发基准值" + return result, nil + } + + hasUpDown := hasKeys(data, "up", "down") + hasUpUpDownDown := hasKeys(data, "upup", "downdown") + result.BaseType = "TE" + switch { + case hasUpDown && hasUpUpDownDown: + value, err := calculateAverage(data, "up", "down") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, false, 4) + if err != nil { + return result, err + } + result.Message = "上下限基准值(忽略预警上上下下限)" + return result, nil + + case hasUpDown: + value, err := calculateAverage(data, "up", "down") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, false, 2) + if err != nil { + return result, err + } + result.Message = "上下限基准值" + return result, nil + + case hasUpUpDownDown: + value, err := calculateAverage(data, "upup", "downdown") + if err != nil { + return result, err + } + result.BaseValue = value + result.Changes, err = calculateChanges(data, value, true, 2) + if err != nil { + return result, err + } + result.Message = "上上下下限基准值" + return result, nil + + default: + return result, fmt.Errorf("不支持的数据结构: %v", data) + } +} + +// OutlierConfig 异常段配置 +type OutlierConfig struct { + Enabled bool // 是否启用异常段 + Count int // 异常段数量 (0=随机, 1-5=指定数量) + MinLength int // 异常段最小长度 + MaxLength int // 异常段最大长度 + Intensity float64 // 异常强度系数 (1.0=轻微超出, 2.0=显著超出) + Distribution string // 分布类型 "both"-上下都有, "upper"-只向上, "lower"-只向下 +} + +// GenerateFloatSliceWithOutliers 生成包含连续异常段的数据 +// baseValue: 基准值 +// changes: 变化范围,每2个元素为一组 [minChange1, maxChange1, minChange2, maxChange2, ...] +// size: 生成的切片长度 +// variationType: 变化类型 +// outlierConfig: 异常段配置 +func generateFloatSliceWithOutliers(baseValue float64, changes []float64, size int, variationType string, outlierConfig OutlierConfig) ([]float64, error) { + // 先生成正常数据 + data, err := generateFloatSlice(baseValue, changes, size, variationType) + if err != nil { + return nil, err + } + + // 插入异常段 + if outlierConfig.Enabled { + data = insertOutliers(data, baseValue, changes, outlierConfig) + } + + return data, nil +} + +// 插入异常段 +func insertOutliers(data []float64, baseValue float64, changes []float64, config OutlierConfig) []float64 { + if len(data) == 0 || !config.Enabled { + return data + } + + // 获取变化范围的边界 + minBound, maxBound := getChangeBounds(baseValue, changes) + // TODO delete + fmt.Printf("获取变化范围的边界,min:%.4f,max:%.4f\n", minBound, maxBound) + + // 确定异常段数量 + outlierCount := config.Count + if outlierCount == 0 { + // 随机生成1-3个异常段 + outlierCount = rand.Intn(3) + 1 + } + + // 计算最大可能的异常段数量 + maxPossibleOutliers := len(data) / (config.MinLength + 10) + if outlierCount > maxPossibleOutliers { + outlierCount = maxPossibleOutliers + } + + // 生成异常段位置 + segments := generateOutlierSegments(len(data), config.MinLength, config.MaxLength, outlierCount, config.Distribution) + // TODO 调试信息待删除 + fmt.Printf("生成异常段位置:%+v\n", segments) + // 插入异常数据 + for _, segment := range segments { + data = insertOutlierSegment(data, segment, minBound, maxBound, config) + } + return data +} + +// 获取变化范围的边界 +func getChangeBounds(baseValue float64, changes []float64) (minBound, maxBound float64) { + if len(changes) == 0 { + return baseValue - 10, baseValue + 10 + } + + ranges := normalizeRanges(changes) + minBound, maxBound = baseValue+ranges[0][0], baseValue+ranges[0][1] + + for _, r := range ranges { + if baseValue+r[0] < minBound { + minBound = baseValue + r[0] + } + if baseValue+r[1] > maxBound { + maxBound = baseValue + r[1] + } + } + + return minBound, maxBound +} + +// OutlierSegment 异常段定义 +type OutlierSegment struct { + Start int + Length int + Type string // "upper"-向上异常, "lower"-向下异常 +} + +func generateOutlierSegments(totalSize, minLength, maxLength, count int, distribution string) []OutlierSegment { + if count == 0 { + return nil + } + + segments := make([]OutlierSegment, 0, count) + usedPositions := make(map[int]bool) + + for i := 0; i < count; i++ { + // 尝试多次寻找合适的位置 + for attempt := 0; attempt < 10; attempt++ { + length := rand.Intn(maxLength-minLength+1) + minLength + start := rand.Intn(totalSize - length) + + // 检查是否与已有段重叠 + overlap := false + for pos := start; pos < start+length; pos++ { + if usedPositions[pos] { + overlap = true + break + } + } + + if !overlap { + // 标记已使用的位置 + for pos := start; pos < start+length; pos++ { + usedPositions[pos] = true + } + + // 根据 distribution 配置决定异常类型 + var outlierType string + switch distribution { + case "upper": + outlierType = "upper" + case "lower": + outlierType = "lower" + case "both": + fallthrough + default: + if rand.Float64() < 0.5 { + outlierType = "upper" + } else { + outlierType = "lower" + } + } + + segments = append(segments, OutlierSegment{ + Start: start, + Length: length, + Type: outlierType, + }) + break + } + } + } + + return segments +} + +func insertOutlierSegment(data []float64, segment OutlierSegment, minBound, maxBound float64, config OutlierConfig) []float64 { + rangeWidth := maxBound - minBound + + // 确定整个异常段的方向 + outlierType := segment.Type + if outlierType == "" { + switch config.Distribution { + case "upper": + outlierType = "upper" + case "lower": + outlierType = "lower" + default: + if rand.Float64() < 0.5 { + outlierType = "upper" + } else { + outlierType = "lower" + } + } + } + + // 为整个段生成同方向异常值 + for i := segment.Start; i < segment.Start+segment.Length && i < len(data); i++ { + excess := rangeWidth * (0.3 + rand.Float64()*config.Intensity) + + if outlierType == "upper" { + data[i] = maxBound + excess + } else { + data[i] = minBound - excess + } + } + + return data +} + +func detectOutlierSegments(data []float64, baseValue float64, changes []float64, minSegmentLength int) []OutlierSegment { + if len(data) == 0 { + return nil + } + + minBound, maxBound := getChangeBounds(baseValue, changes) + var segments []OutlierSegment + currentStart := -1 + currentType := "" + + for i, value := range data { + isOutlier := value > maxBound || value < minBound + + if isOutlier { + outlierType := "upper" + if value < minBound { + outlierType = "lower" + } + + if currentStart == -1 { + // 开始新的异常段 + currentStart = i + currentType = outlierType + } else if currentType != outlierType { + // 类型变化,结束当前段 + if i-currentStart >= minSegmentLength { + segments = append(segments, OutlierSegment{ + Start: currentStart, + Length: i - currentStart, + Type: currentType, + }) + } + currentStart = i + currentType = outlierType + } + } else { + if currentStart != -1 { + // 结束当前异常段 + if i-currentStart >= minSegmentLength { + segments = append(segments, OutlierSegment{ + Start: currentStart, + Length: i - currentStart, + Type: currentType, + }) + } + currentStart = -1 + currentType = "" + } + } + } + + // 处理最后的异常段 + if currentStart != -1 && len(data)-currentStart >= minSegmentLength { + segments = append(segments, OutlierSegment{ + Start: currentStart, + Length: len(data) - currentStart, + Type: currentType, + }) + } + + return segments +} + +func generateFloatSlice(baseValue float64, changes []float64, size int, variationType string) ([]float64, error) { + return generateRandomData(baseValue, changes, size), nil +} + +func normalizeRanges(changes []float64) [][2]float64 { + ranges := make([][2]float64, len(changes)/2) + for i := 0; i < len(changes); i += 2 { + min, max := changes[i], changes[i+1] + if min > max { + min, max = max, min + } + ranges[i/2] = [2]float64{min, max} + } + return ranges +} + +func generateRandomData(baseValue float64, changes []float64, size int) []float64 { + data := make([]float64, size) + ranges := normalizeRanges(changes) + for i := range data { + rangeIdx := rand.Intn(len(ranges)) + minChange := ranges[rangeIdx][0] + maxChange := ranges[rangeIdx][1] + change := minChange + rand.Float64()*(maxChange-minChange) + data[i] = baseValue + change + } + return data +} + +func main() { + ctx := context.Background() + + pgURI := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", "192.168.1.101", 5432, "postgres", "coslight", "demo") + + db, err := gorm.Open(postgres.Open(pgURI)) + if err != nil { + panic(err) + } + + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var measurements []orm.Measurement + result := db.WithContext(cancelCtx).Find(&measurements) + if result.Error != nil { + panic(result.Error) + } + fmt.Println("总共读取到测量点数量:", len(measurements)) + measInfos := processMeasurements(measurements) + + // 测量点数据生成(包含异常数据) + // 配置异常段参数 + outlierConfig := OutlierConfig{ + Enabled: true, // 是否产生异常段数据 + Count: 2, // 异常段数量 + MinLength: 10, // 异常段最小连续长度 + MaxLength: 15, // 异常段最大连续长度 + Intensity: 1.5, // 异常强度 + Distribution: "both", // 分布类型 + } + + // 为每个测量点生成包含异常的数据 + generatedData := make(map[string][]float64) + detectedSegments := make(map[string][]OutlierSegment) + + for key, measInfo := range measInfos { + fmt.Printf("\n处理测量点: %s\n", key) + fmt.Printf(" 基准值: %.4f, 变化范围: %v, 数据长度: %d, 类型: %s\n", + measInfo.BaseValue, measInfo.Changes, measInfo.Size, measInfo.BaseType) + + minBound, maxBound := getChangeBounds(measInfo.BaseValue, measInfo.Changes) + fmt.Printf(" 计算边界: [%.4f, %.4f]\n", minBound, maxBound) + + // 根据基准值类型决定如何处理 + switch measInfo.BaseType { + case "TI": + // 边沿触发类型,生成特殊处理的数据 + fmt.Printf(" 边沿触发类型,跳过异常数据生成\n") + continue + case "TE": + // 正常上下限类型,生成包含异常的数据 + if len(measInfo.Changes) == 0 { + fmt.Printf(" 无变化范围数据,跳过\n") + continue + } + + // 根据变化范围数量调整异常配置 + config := outlierConfig + if len(measInfo.Changes) == 2 { + // 只有上下限 + config.Distribution = "both" + } else if len(measInfo.Changes) == 4 { + // 有上下限和预警上下限 + config.Distribution = "both" + config.Intensity = 2.0 // 增强异常强度 + } + + // 生成包含异常的数据 + data, err := generateFloatSliceWithOutliers( + measInfo.BaseValue, + measInfo.Changes, + measInfo.Size, + "random", + config, + ) + if err != nil { + fmt.Printf(" 生成异常数据失败: %v\n", err) + continue + } + + // 保存生成的数据 + generatedData[key] = data + + // 检测异常段 + segments := detectOutlierSegments(data, measInfo.BaseValue, measInfo.Changes, config.MinLength) + detectedSegments[key] = segments + + fmt.Printf(" 成功生成数据,长度: %d\n", len(data)) + fmt.Printf(" 检测到异常段数量: %d\n", len(segments)) + + // 显示异常段详情 + for i, segment := range segments { + fmt.Printf(" 异常段%d: 位置[%d-%d], 长度=%d, 类型=%s\n", + i+1, segment.Start, segment.Start+segment.Length-1, segment.Length, segment.Type) + } + } + } + + // 统计信息 + fmt.Println("\n=== 生成结果统计 ===") + fmt.Printf("成功处理的测量点数量: %d/%d\n", len(generatedData), len(measInfos)) + + totalOutliers := 0 + for key, segments := range detectedSegments { + totalOutliers += len(segments) + fmt.Printf(" %s: %d个异常段\n", key, len(segments)) + } + fmt.Printf("总共检测到异常段: %d\n", totalOutliers) + + // 示例:显示特定测量点的详细数据 + if len(generatedData) > 0 { + fmt.Println("\n=== 示例数据详情 ===") + // 取第一个测量点显示详细数据 + for key, data := range generatedData { + fmt.Printf("测量点: %s\n", key) + fmt.Printf("前30个数据值:\n") + minBound, maxBound := getChangeBounds(measInfos[key].BaseValue, measInfos[key].Changes) + for i := 0; i < min(30, len(data)); i++ { + status := "正常" + if data[i] > maxBound { + status = "↑异常" + } else if data[i] < minBound { + status = "↓异常" + } + fmt.Printf(" [%d] %.4f (%s)\n", i, data[i], status) + } + break // 只显示第一个 + } + } + + // Redis写入代码 + // rdb := initRedisClient() + // gCtx, cancel := context.WithCancel(ctx) + // defer cancel() + // go simulateDataWrite(gCtx, rdb) + // TODO 添加singal监听,优雅退出 + // cancel() + // time.Sleep(2 * time.Second) + // fmt.Println("程序已退出。") +}