From 5e311a7071c2492ef80939b5efe46849cc58fbc9 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 21 Nov 2025 17:02:07 +0800 Subject: [PATCH] optimize redis real time data injection func --- deploy/redis-test-data/data_injection.go | 300 ++++++++++------------- util/token.go | 3 +- 2 files changed, 126 insertions(+), 177 deletions(-) diff --git a/deploy/redis-test-data/data_injection.go b/deploy/redis-test-data/data_injection.go index 7a3cbe9..6146f3b 100644 --- a/deploy/redis-test-data/data_injection.go +++ b/deploy/redis-test-data/data_injection.go @@ -6,6 +6,10 @@ import ( "fmt" "log" "math/rand" + "os" + "os/signal" + "strconv" + "syscall" "time" "modelRT/orm" @@ -17,10 +21,11 @@ import ( // Redis配置 const ( - RedisAddr = "localhost:6379" // 您的Redis地址 - KeyName = "telemetry:sensor_101" // 存储实时数据的 ZSet 键名 + RedisAddr = "localhost:6379" ) +var globalRedisClient *redis.Client + func initRedisClient() *redis.Client { rdb := redis.NewClient(&redis.Options{ Addr: RedisAddr, @@ -38,67 +43,17 @@ func initRedisClient() *redis.Client { return rdb } -// simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet -func simulateDataWrite(ctx context.Context, rdb *redis.Client) { - // 设定写入频率:每秒写入一次 - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - // 用于生成模拟数据的初始值 - currentValue := 25.0 - // 模拟数据变化的方向 - direction := 0.1 - - for { - select { - case <-ctx.Done(): - fmt.Printf("\n[%s] 写入程序已停止。\n", KeyName) - return // 收到停止信号,退出 goroutine - - case <-ticker.C: - // 1. 生成新的模拟数据 - currentValue += direction - if currentValue > 30.0 || currentValue < 20.0 { - direction *= -1 // 达到边界,反转变化方向 - } - - // 2. 准备写入数据 - // ZSet的 Score 使用当前时间戳(秒级) - score := float64(time.Now().Unix()) - - // ZSet的 Member 使用值(转换为字符串) - // 在您的实时数据处理场景中,Member 通常存储值,Score 存储时间戳 - memberValue := fmt.Sprintf("%.2f", currentValue) - - z := redis.Z{ - Score: score, - Member: memberValue, - } - - // 3. 执行 ZADD 写入操作 - err := rdb.ZAdd(ctx, KeyName, z).Err() - if err != nil { - log.Printf("写入 Redis 失败 (%s):%v", KeyName, err) - } else { - fmt.Printf("[%s] 成功写入:时间戳=%.0f, 值=%s\n", KeyName, score, memberValue) - } - } - } -} - func processMeasurements(measurements []orm.Measurement) map[string]calculationResult { results := make(map[string]calculationResult, len(measurements)) for _, measurement := range measurements { // 检查 DataSource 是否存在且 type 为 1 if measurement.DataSource == nil { - fmt.Println(11111) continue } // 检查 type 是否为 1 dataType, typeExists := measurement.DataSource["type"] if !typeExists { - fmt.Println(22222) continue } @@ -116,20 +71,17 @@ func processMeasurements(measurements []orm.Measurement) map[string]calculationR } if typeValue != 1 { - fmt.Println(33333) continue } // 获取 io_address ioAddressRaw, ioExists := measurement.DataSource["io_address"] if !ioExists { - fmt.Println(44444) continue } ioAddress, ok := ioAddressRaw.(map[string]any) if !ok { - fmt.Println(44444) continue } @@ -139,23 +91,19 @@ func processMeasurements(measurements []orm.Measurement) map[string]calculationR result := fmt.Sprintf("%s:%s:phasor:%s", station, device, channel) if measurement.EventPlan == nil { - fmt.Println(55555) continue } causeValue, causeExist := measurement.EventPlan["cause"] if !causeExist { - fmt.Println(66666) continue } causeMap, ok := causeValue.(map[string]any) if !ok { - fmt.Println(77777) continue } calResult, err := calculateBaseValueEnhanced(causeMap) if err != nil { - fmt.Println(88888, err) continue } calResult.Size = measurement.Size @@ -381,8 +329,8 @@ func calculateBaseValueEnhanced(data map[string]any) (calculationResult, error) } } -// OutlierConfig 异常段配置 -type OutlierConfig struct { +// outlierConfig 异常段配置 +type outlierConfig struct { Enabled bool // 是否启用异常段 Count int // 异常段数量 (0=随机, 1-5=指定数量) MinLength int // 异常段最小长度 @@ -397,7 +345,7 @@ type OutlierConfig struct { // size: 生成的切片长度 // variationType: 变化类型 // outlierConfig: 异常段配置 -func generateFloatSliceWithOutliers(baseValue float64, changes []float64, size int, variationType string, outlierConfig OutlierConfig) ([]float64, error) { +func generateFloatSliceWithOutliers(baseValue float64, changes []float64, size int, variationType string, outlierConfig outlierConfig) ([]float64, error) { // 先生成正常数据 data, err := generateFloatSlice(baseValue, changes, size, variationType) if err != nil { @@ -413,7 +361,7 @@ func generateFloatSliceWithOutliers(baseValue float64, changes []float64, size i } // 插入异常段 -func insertOutliers(data []float64, baseValue float64, changes []float64, config OutlierConfig) []float64 { +func insertOutliers(data []float64, baseValue float64, changes []float64, config outlierConfig) []float64 { if len(data) == 0 || !config.Enabled { return data } @@ -421,7 +369,7 @@ func insertOutliers(data []float64, baseValue float64, changes []float64, config // 获取变化范围的边界 minBound, maxBound := getChangeBounds(baseValue, changes) // TODO delete - fmt.Printf("获取变化范围的边界,min:%.4f,max:%.4f\n", minBound, maxBound) + log.Printf("获取变化范围的边界,min:%.4f,max:%.4f\n", minBound, maxBound) // 确定异常段数量 outlierCount := config.Count @@ -439,7 +387,7 @@ func insertOutliers(data []float64, baseValue float64, changes []float64, config // 生成异常段位置 segments := generateOutlierSegments(len(data), config.MinLength, config.MaxLength, outlierCount, config.Distribution) // TODO 调试信息待删除 - fmt.Printf("生成异常段位置:%+v\n", segments) + log.Printf("生成异常段位置:%+v\n", segments) // 插入异常数据 for _, segment := range segments { data = insertOutlierSegment(data, segment, minBound, maxBound, config) @@ -534,7 +482,7 @@ func generateOutlierSegments(totalSize, minLength, maxLength, count int, distrib return segments } -func insertOutlierSegment(data []float64, segment OutlierSegment, minBound, maxBound float64, config OutlierConfig) []float64 { +func insertOutlierSegment(data []float64, segment OutlierSegment, minBound, maxBound float64, config outlierConfig) []float64 { rangeWidth := maxBound - minBound // 确定整个异常段的方向 @@ -660,29 +608,125 @@ func generateRandomData(baseValue float64, changes []float64, size int) []float6 return data } +// simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet +func simulateDataWrite(ctx context.Context, rdb *redis.Client, redisKey string, config outlierConfig, measInfo calculationResult) { + log.Printf("启动数据写入程序, Redis Key: %s, 基准值: %.4f, 变化范围: %+v\n", redisKey, measInfo.BaseValue, measInfo.Changes) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + pipe := rdb.Pipeline() + for { + select { + case <-ctx.Done(): + log.Printf("\n[%s] 写入程序已停止\n", redisKey) + return + case <-ticker.C: + minBound, maxBound := getChangeBounds(measInfo.BaseValue, measInfo.Changes) + log.Printf("计算边界: [%.4f, %.4f]\n", minBound, maxBound) + + // 根据基准值类型决定如何处理 + switch measInfo.BaseType { + case "TI": + // 边沿触发类型,生成特殊处理的数据 + log.Printf("边沿触发类型,跳过异常数据生成\n") + return + case "TE": + // 正常上下限类型,生成包含异常的数据 + if len(measInfo.Changes) == 0 { + log.Printf("无变化范围数据,跳过\n") + return + } + + // 根据变化范围数量调整异常配置 + if len(measInfo.Changes) == 2 { + // 只有上下限 + config.Distribution = "both" + } else if len(measInfo.Changes) == 4 { + // 有上下限和预警上下限 + config.Distribution = "both" + config.Intensity = 2.0 // 增强异常强度 + } + + // 生成包含异常的数据 + data, err := generateFloatSliceWithOutliers( + measInfo.BaseValue, + measInfo.Changes, + measInfo.Size, + "random", + config, + ) + if err != nil { + log.Printf("生成异常数据失败:%v\n", err) + continue + } + + segments := detectOutlierSegments(data, measInfo.BaseValue, measInfo.Changes, config.MinLength) + log.Printf("检测到异常段数量:%d\n", len(segments)) + for i, segment := range segments { + log.Printf("异常段%d: 位置[%d-%d], 长度=%d, 类型=%s\n", + i+1, segment.Start, segment.Start+segment.Length-1, segment.Length, segment.Type) + } + + redisZs := make([]redis.Z, 0, len(data)) + for i := range len(data) { + z := redis.Z{ + Score: data[i], + Member: strconv.FormatInt(time.Now().UnixNano(), 10), + } + redisZs = append(redisZs, z) + } + pipe.ZAdd(ctx, redisKey, redisZs...) + _, err = pipe.Exec(ctx) + if err != nil { + log.Printf("redis pipeline execution failed: %v", err) + } + } + } + } +} + +func gracefulShutdown() { + if globalRedisClient != nil { + if err := globalRedisClient.Close(); err != nil { + log.Printf("关闭 Redis 客户端失败:%v", err) + } else { + log.Println("关闭 Redis 客户端成功") + } + } + time.Sleep(500 * time.Millisecond) + os.Exit(0) +} + func main() { - ctx := context.Background() + rootCtx := context.Background() pgURI := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", "192.168.1.101", 5432, "postgres", "coslight", "demo") - db, err := gorm.Open(postgres.Open(pgURI)) + postgresDBClient, err := gorm.Open(postgres.Open(pgURI)) if err != nil { panic(err) } + defer func() { + sqlDB, err := postgresDBClient.DB() + if err != nil { + panic(err) + } + sqlDB.Close() + }() - cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + cancelCtx, cancel := context.WithTimeout(rootCtx, 5*time.Second) defer cancel() var measurements []orm.Measurement - result := db.WithContext(cancelCtx).Find(&measurements) + result := postgresDBClient.WithContext(cancelCtx).Find(&measurements) if result.Error != nil { panic(result.Error) } - fmt.Println("总共读取到测量点数量:", len(measurements)) + log.Println("总共读取到测量点数量:", len(measurements)) measInfos := processMeasurements(measurements) // 测量点数据生成(包含异常数据) // 配置异常段参数 - outlierConfig := OutlierConfig{ + outlierConfig := outlierConfig{ Enabled: true, // 是否产生异常段数据 Count: 2, // 异常段数量 MinLength: 10, // 异常段最小连续长度 @@ -691,112 +735,16 @@ func main() { Distribution: "both", // 分布类型 } - // 为每个测量点生成包含异常的数据 - generatedData := make(map[string][]float64) - detectedSegments := make(map[string][]OutlierSegment) + globalRedisClient = initRedisClient() + rCancelCtx, cancel := context.WithCancel(rootCtx) + defer cancel() for key, measInfo := range measInfos { - fmt.Printf("\n处理测量点: %s\n", key) - fmt.Printf(" 基准值: %.4f, 变化范围: %v, 数据长度: %d, 类型: %s\n", - measInfo.BaseValue, measInfo.Changes, measInfo.Size, measInfo.BaseType) - - minBound, maxBound := getChangeBounds(measInfo.BaseValue, measInfo.Changes) - fmt.Printf(" 计算边界: [%.4f, %.4f]\n", minBound, maxBound) - - // 根据基准值类型决定如何处理 - switch measInfo.BaseType { - case "TI": - // 边沿触发类型,生成特殊处理的数据 - fmt.Printf(" 边沿触发类型,跳过异常数据生成\n") - continue - case "TE": - // 正常上下限类型,生成包含异常的数据 - if len(measInfo.Changes) == 0 { - fmt.Printf(" 无变化范围数据,跳过\n") - continue - } - - // 根据变化范围数量调整异常配置 - config := outlierConfig - if len(measInfo.Changes) == 2 { - // 只有上下限 - config.Distribution = "both" - } else if len(measInfo.Changes) == 4 { - // 有上下限和预警上下限 - config.Distribution = "both" - config.Intensity = 2.0 // 增强异常强度 - } - - // 生成包含异常的数据 - data, err := generateFloatSliceWithOutliers( - measInfo.BaseValue, - measInfo.Changes, - measInfo.Size, - "random", - config, - ) - if err != nil { - fmt.Printf(" 生成异常数据失败: %v\n", err) - continue - } - - // 保存生成的数据 - generatedData[key] = data - - // 检测异常段 - segments := detectOutlierSegments(data, measInfo.BaseValue, measInfo.Changes, config.MinLength) - detectedSegments[key] = segments - - fmt.Printf(" 成功生成数据,长度: %d\n", len(data)) - fmt.Printf(" 检测到异常段数量: %d\n", len(segments)) - - // 显示异常段详情 - for i, segment := range segments { - fmt.Printf(" 异常段%d: 位置[%d-%d], 长度=%d, 类型=%s\n", - i+1, segment.Start, segment.Start+segment.Length-1, segment.Length, segment.Type) - } - } + go simulateDataWrite(rCancelCtx, globalRedisClient, key, outlierConfig, measInfo) } - // 统计信息 - fmt.Println("\n=== 生成结果统计 ===") - fmt.Printf("成功处理的测量点数量: %d/%d\n", len(generatedData), len(measInfos)) - - totalOutliers := 0 - for key, segments := range detectedSegments { - totalOutliers += len(segments) - fmt.Printf(" %s: %d个异常段\n", key, len(segments)) - } - fmt.Printf("总共检测到异常段: %d\n", totalOutliers) - - // 示例:显示特定测量点的详细数据 - if len(generatedData) > 0 { - fmt.Println("\n=== 示例数据详情 ===") - // 取第一个测量点显示详细数据 - for key, data := range generatedData { - fmt.Printf("测量点: %s\n", key) - fmt.Printf("前30个数据值:\n") - minBound, maxBound := getChangeBounds(measInfos[key].BaseValue, measInfos[key].Changes) - for i := 0; i < min(30, len(data)); i++ { - status := "正常" - if data[i] > maxBound { - status = "↑异常" - } else if data[i] < minBound { - status = "↓异常" - } - fmt.Printf(" [%d] %.4f (%s)\n", i, data[i], status) - } - break // 只显示第一个 - } - } - - // Redis写入代码 - // rdb := initRedisClient() - // gCtx, cancel := context.WithCancel(ctx) - // defer cancel() - // go simulateDataWrite(gCtx, rdb) - // TODO 添加singal监听,优雅退出 - // cancel() - // time.Sleep(2 * time.Second) - // fmt.Println("程序已退出。") + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + gracefulShutdown() } diff --git a/util/token.go b/util/token.go index 8811922..1fba5f3 100644 --- a/util/token.go +++ b/util/token.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "fmt" "os" + "strconv" "strings" "time" ) @@ -22,7 +23,7 @@ func GenerateClientToken(host string, serviceName string, secretKey string) (str return "", fmt.Errorf("TOKEN_SECRET_KEY environment variable not set and no key provided in parameters") } - uniqueID := fmt.Sprintf("%d", time.Now().UnixNano()) + uniqueID := strconv.FormatInt(time.Now().UnixNano(), 10) clientInfo := fmt.Sprintf("host=%s;service=%s;id=%s", host, serviceName, uniqueID) mac := hmac.New(sha256.New, []byte(finalSecretKey))