add data_injection func to mock real time data in redis

This commit is contained in:
douxu 2025-11-20 17:37:12 +08:00
parent 357d06868e
commit 36f267aec7
1 changed files with 802 additions and 0 deletions

View File

@ -0,0 +1,802 @@
// Package main implement redis test data injection
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"modelRT/orm"
redis "github.com/redis/go-redis/v9"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// Redis配置
const (
RedisAddr = "localhost:6379" // 您的Redis地址
KeyName = "telemetry:sensor_101" // 存储实时数据的 ZSet 键名
)
func initRedisClient() *redis.Client {
rdb := redis.NewClient(&redis.Options{
Addr: RedisAddr,
Password: "", // 如果有密码,请填写
DB: 0, // 使用默认数据库
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := rdb.Ping(ctx).Result()
if err != nil {
return nil
}
return rdb
}
// simulateDataWrite 定时生成并写入模拟数据到 Redis ZSet
func simulateDataWrite(ctx context.Context, rdb *redis.Client) {
// 设定写入频率:每秒写入一次
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
// 用于生成模拟数据的初始值
currentValue := 25.0
// 模拟数据变化的方向
direction := 0.1
for {
select {
case <-ctx.Done():
fmt.Printf("\n[%s] 写入程序已停止。\n", KeyName)
return // 收到停止信号,退出 goroutine
case <-ticker.C:
// 1. 生成新的模拟数据
currentValue += direction
if currentValue > 30.0 || currentValue < 20.0 {
direction *= -1 // 达到边界,反转变化方向
}
// 2. 准备写入数据
// ZSet的 Score 使用当前时间戳(秒级)
score := float64(time.Now().Unix())
// ZSet的 Member 使用值(转换为字符串)
// 在您的实时数据处理场景中Member 通常存储值Score 存储时间戳
memberValue := fmt.Sprintf("%.2f", currentValue)
z := redis.Z{
Score: score,
Member: memberValue,
}
// 3. 执行 ZADD 写入操作
err := rdb.ZAdd(ctx, KeyName, z).Err()
if err != nil {
log.Printf("写入 Redis 失败 (%s)%v", KeyName, err)
} else {
fmt.Printf("[%s] 成功写入:时间戳=%.0f, 值=%s\n", KeyName, score, memberValue)
}
}
}
}
func processMeasurements(measurements []orm.Measurement) map[string]calculationResult {
results := make(map[string]calculationResult, len(measurements))
for _, measurement := range measurements {
// 检查 DataSource 是否存在且 type 为 1
if measurement.DataSource == nil {
fmt.Println(11111)
continue
}
// 检查 type 是否为 1
dataType, typeExists := measurement.DataSource["type"]
if !typeExists {
fmt.Println(22222)
continue
}
// 类型断言,处理不同的数字类型
var typeValue int
switch v := dataType.(type) {
case int:
typeValue = v
case float64:
typeValue = int(v)
case int64:
typeValue = int(v)
default:
continue
}
if typeValue != 1 {
fmt.Println(33333)
continue
}
// 获取 io_address
ioAddressRaw, ioExists := measurement.DataSource["io_address"]
if !ioExists {
fmt.Println(44444)
continue
}
ioAddress, ok := ioAddressRaw.(map[string]any)
if !ok {
fmt.Println(44444)
continue
}
station, _ := ioAddress["station"].(string)
device, _ := ioAddress["device"].(string)
channel, _ := ioAddress["channel"].(string)
result := fmt.Sprintf("%s:%s:phasor:%s", station, device, channel)
if measurement.EventPlan == nil {
fmt.Println(55555)
continue
}
causeValue, causeExist := measurement.EventPlan["cause"]
if !causeExist {
fmt.Println(66666)
continue
}
causeMap, ok := causeValue.(map[string]any)
if !ok {
fmt.Println(77777)
continue
}
calResult, err := calculateBaseValueEnhanced(causeMap)
if err != nil {
fmt.Println(88888, err)
continue
}
calResult.Size = measurement.Size
results[result] = calResult
}
return results
}
func calculateBaseValue(data map[string]any) (float64, error) {
// 检查边沿触发类型
if edge, exists := data["edge"]; exists {
return calculateEdgeValue(edge)
}
// 检查上下限和预警上下限
hasUpDown := hasKeys(data, "up", "down")
hasUpUpDownDown := hasKeys(data, "upup", "downdown")
switch {
case hasUpDown && hasUpUpDownDown:
// 同时包含四个键,使用 up 和 down 计算基准值
return calculateAverage(data, "up", "down")
case hasUpDown:
// 只有上下限
return calculateAverage(data, "up", "down")
case hasUpUpDownDown:
// 只有预警上上限与下下限
return calculateAverage(data, "upup", "downdown")
default:
return 0, fmt.Errorf("不支持的数据结构: %v", data)
}
}
func calculateEdgeValue(edge any) (float64, error) {
edgeStr, ok := edge.(string)
if !ok {
return 0, fmt.Errorf("edge 字段类型错误,期望 string,得到 %T", edge)
}
switch edgeStr {
case "raising":
return 1.0, nil
case "falling":
return 0.0, nil
default:
return 0, fmt.Errorf("不支持的 edge 值: %s", edgeStr)
}
}
func calculateAverage(data map[string]any, key1, key2 string) (float64, error) {
val1, err := getFloatValue(data, key1)
if err != nil {
return 0, err
}
val2, err := getFloatValue(data, key2)
if err != nil {
return 0, err
}
return (val1 + val2) / 2.0, nil
}
func calculateChanges(data map[string]any, baseValue float64, maxLimt bool, limitNum int) ([]float64, error) {
results := make([]float64, 0, limitNum)
switch limitNum {
case 2:
var key1, key2 string
if maxLimt {
key1 = "upup"
key2 = "downdown"
} else {
key1 = "up"
key2 = "down"
}
val1, err := getFloatValue(data, key1)
if err != nil {
return nil, err
}
results = append(results, val1-baseValue)
val2, err := getFloatValue(data, key2)
if err != nil {
return nil, err
}
results = append(results, val2-baseValue)
case 4:
key1 := "up"
key2 := "down"
key3 := "upup"
key4 := "downdown"
val1, err := getFloatValue(data, key1)
if err != nil {
return nil, err
}
results = append(results, val1-baseValue)
val2, err := getFloatValue(data, key2)
if err != nil {
return nil, err
}
results = append(results, val2-baseValue)
val3, err := getFloatValue(data, key3)
if err != nil {
return nil, err
}
results = append(results, val3-baseValue)
val4, err := getFloatValue(data, key4)
if err != nil {
return nil, err
}
results = append(results, val4-baseValue)
}
return results, nil
}
func getFloatValue(data map[string]any, key string) (float64, error) {
value, exists := data[key]
if !exists {
return 0, fmt.Errorf("缺少必需的键:%s", key)
}
switch v := value.(type) {
case float64:
return v, nil
case int:
return float64(v), nil
case float32:
return float64(v), nil
default:
return 0, fmt.Errorf("键 %s 的值类型错误,期望数字类型,得到 %T", key, value)
}
}
func hasKeys(data map[string]any, keys ...string) bool {
for _, key := range keys {
if _, exists := data[key]; !exists {
return false
}
}
return true
}
type calculationResult struct {
BaseValue float64
Changes []float64
Size int
BaseType string // "normal", "warning", "edge"
Message string
}
func calculateBaseValueEnhanced(data map[string]any) (calculationResult, error) {
result := calculationResult{}
if edge, exists := data["edge"]; exists {
value, err := calculateEdgeValue(edge)
if err != nil {
return result, err
}
if edge == "raising" {
result.Changes = []float64{1.0}
} else {
result.Changes = []float64{0.0}
}
result.BaseValue = value
result.BaseType = "TI"
result.Message = "边沿触发基准值"
return result, nil
}
hasUpDown := hasKeys(data, "up", "down")
hasUpUpDownDown := hasKeys(data, "upup", "downdown")
result.BaseType = "TE"
switch {
case hasUpDown && hasUpUpDownDown:
value, err := calculateAverage(data, "up", "down")
if err != nil {
return result, err
}
result.BaseValue = value
result.Changes, err = calculateChanges(data, value, false, 4)
if err != nil {
return result, err
}
result.Message = "上下限基准值(忽略预警上上下下限)"
return result, nil
case hasUpDown:
value, err := calculateAverage(data, "up", "down")
if err != nil {
return result, err
}
result.BaseValue = value
result.Changes, err = calculateChanges(data, value, false, 2)
if err != nil {
return result, err
}
result.Message = "上下限基准值"
return result, nil
case hasUpUpDownDown:
value, err := calculateAverage(data, "upup", "downdown")
if err != nil {
return result, err
}
result.BaseValue = value
result.Changes, err = calculateChanges(data, value, true, 2)
if err != nil {
return result, err
}
result.Message = "上上下下限基准值"
return result, nil
default:
return result, fmt.Errorf("不支持的数据结构: %v", data)
}
}
// OutlierConfig 异常段配置
type OutlierConfig struct {
Enabled bool // 是否启用异常段
Count int // 异常段数量 (0=随机, 1-5=指定数量)
MinLength int // 异常段最小长度
MaxLength int // 异常段最大长度
Intensity float64 // 异常强度系数 (1.0=轻微超出, 2.0=显著超出)
Distribution string // 分布类型 "both"-上下都有, "upper"-只向上, "lower"-只向下
}
// GenerateFloatSliceWithOutliers 生成包含连续异常段的数据
// baseValue: 基准值
// changes: 变化范围每2个元素为一组 [minChange1, maxChange1, minChange2, maxChange2, ...]
// size: 生成的切片长度
// variationType: 变化类型
// outlierConfig: 异常段配置
func generateFloatSliceWithOutliers(baseValue float64, changes []float64, size int, variationType string, outlierConfig OutlierConfig) ([]float64, error) {
// 先生成正常数据
data, err := generateFloatSlice(baseValue, changes, size, variationType)
if err != nil {
return nil, err
}
// 插入异常段
if outlierConfig.Enabled {
data = insertOutliers(data, baseValue, changes, outlierConfig)
}
return data, nil
}
// 插入异常段
func insertOutliers(data []float64, baseValue float64, changes []float64, config OutlierConfig) []float64 {
if len(data) == 0 || !config.Enabled {
return data
}
// 获取变化范围的边界
minBound, maxBound := getChangeBounds(baseValue, changes)
// TODO delete
fmt.Printf("获取变化范围的边界,min:%.4f,max:%.4f\n", minBound, maxBound)
// 确定异常段数量
outlierCount := config.Count
if outlierCount == 0 {
// 随机生成1-3个异常段
outlierCount = rand.Intn(3) + 1
}
// 计算最大可能的异常段数量
maxPossibleOutliers := len(data) / (config.MinLength + 10)
if outlierCount > maxPossibleOutliers {
outlierCount = maxPossibleOutliers
}
// 生成异常段位置
segments := generateOutlierSegments(len(data), config.MinLength, config.MaxLength, outlierCount, config.Distribution)
// TODO 调试信息待删除
fmt.Printf("生成异常段位置:%+v\n", segments)
// 插入异常数据
for _, segment := range segments {
data = insertOutlierSegment(data, segment, minBound, maxBound, config)
}
return data
}
// 获取变化范围的边界
func getChangeBounds(baseValue float64, changes []float64) (minBound, maxBound float64) {
if len(changes) == 0 {
return baseValue - 10, baseValue + 10
}
ranges := normalizeRanges(changes)
minBound, maxBound = baseValue+ranges[0][0], baseValue+ranges[0][1]
for _, r := range ranges {
if baseValue+r[0] < minBound {
minBound = baseValue + r[0]
}
if baseValue+r[1] > maxBound {
maxBound = baseValue + r[1]
}
}
return minBound, maxBound
}
// OutlierSegment 异常段定义
type OutlierSegment struct {
Start int
Length int
Type string // "upper"-向上异常, "lower"-向下异常
}
func generateOutlierSegments(totalSize, minLength, maxLength, count int, distribution string) []OutlierSegment {
if count == 0 {
return nil
}
segments := make([]OutlierSegment, 0, count)
usedPositions := make(map[int]bool)
for i := 0; i < count; i++ {
// 尝试多次寻找合适的位置
for attempt := 0; attempt < 10; attempt++ {
length := rand.Intn(maxLength-minLength+1) + minLength
start := rand.Intn(totalSize - length)
// 检查是否与已有段重叠
overlap := false
for pos := start; pos < start+length; pos++ {
if usedPositions[pos] {
overlap = true
break
}
}
if !overlap {
// 标记已使用的位置
for pos := start; pos < start+length; pos++ {
usedPositions[pos] = true
}
// 根据 distribution 配置决定异常类型
var outlierType string
switch distribution {
case "upper":
outlierType = "upper"
case "lower":
outlierType = "lower"
case "both":
fallthrough
default:
if rand.Float64() < 0.5 {
outlierType = "upper"
} else {
outlierType = "lower"
}
}
segments = append(segments, OutlierSegment{
Start: start,
Length: length,
Type: outlierType,
})
break
}
}
}
return segments
}
func insertOutlierSegment(data []float64, segment OutlierSegment, minBound, maxBound float64, config OutlierConfig) []float64 {
rangeWidth := maxBound - minBound
// 确定整个异常段的方向
outlierType := segment.Type
if outlierType == "" {
switch config.Distribution {
case "upper":
outlierType = "upper"
case "lower":
outlierType = "lower"
default:
if rand.Float64() < 0.5 {
outlierType = "upper"
} else {
outlierType = "lower"
}
}
}
// 为整个段生成同方向异常值
for i := segment.Start; i < segment.Start+segment.Length && i < len(data); i++ {
excess := rangeWidth * (0.3 + rand.Float64()*config.Intensity)
if outlierType == "upper" {
data[i] = maxBound + excess
} else {
data[i] = minBound - excess
}
}
return data
}
func detectOutlierSegments(data []float64, baseValue float64, changes []float64, minSegmentLength int) []OutlierSegment {
if len(data) == 0 {
return nil
}
minBound, maxBound := getChangeBounds(baseValue, changes)
var segments []OutlierSegment
currentStart := -1
currentType := ""
for i, value := range data {
isOutlier := value > maxBound || value < minBound
if isOutlier {
outlierType := "upper"
if value < minBound {
outlierType = "lower"
}
if currentStart == -1 {
// 开始新的异常段
currentStart = i
currentType = outlierType
} else if currentType != outlierType {
// 类型变化,结束当前段
if i-currentStart >= minSegmentLength {
segments = append(segments, OutlierSegment{
Start: currentStart,
Length: i - currentStart,
Type: currentType,
})
}
currentStart = i
currentType = outlierType
}
} else {
if currentStart != -1 {
// 结束当前异常段
if i-currentStart >= minSegmentLength {
segments = append(segments, OutlierSegment{
Start: currentStart,
Length: i - currentStart,
Type: currentType,
})
}
currentStart = -1
currentType = ""
}
}
}
// 处理最后的异常段
if currentStart != -1 && len(data)-currentStart >= minSegmentLength {
segments = append(segments, OutlierSegment{
Start: currentStart,
Length: len(data) - currentStart,
Type: currentType,
})
}
return segments
}
func generateFloatSlice(baseValue float64, changes []float64, size int, variationType string) ([]float64, error) {
return generateRandomData(baseValue, changes, size), nil
}
func normalizeRanges(changes []float64) [][2]float64 {
ranges := make([][2]float64, len(changes)/2)
for i := 0; i < len(changes); i += 2 {
min, max := changes[i], changes[i+1]
if min > max {
min, max = max, min
}
ranges[i/2] = [2]float64{min, max}
}
return ranges
}
func generateRandomData(baseValue float64, changes []float64, size int) []float64 {
data := make([]float64, size)
ranges := normalizeRanges(changes)
for i := range data {
rangeIdx := rand.Intn(len(ranges))
minChange := ranges[rangeIdx][0]
maxChange := ranges[rangeIdx][1]
change := minChange + rand.Float64()*(maxChange-minChange)
data[i] = baseValue + change
}
return data
}
func main() {
ctx := context.Background()
pgURI := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", "192.168.1.101", 5432, "postgres", "coslight", "demo")
db, err := gorm.Open(postgres.Open(pgURI))
if err != nil {
panic(err)
}
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var measurements []orm.Measurement
result := db.WithContext(cancelCtx).Find(&measurements)
if result.Error != nil {
panic(result.Error)
}
fmt.Println("总共读取到测量点数量:", len(measurements))
measInfos := processMeasurements(measurements)
// 测量点数据生成(包含异常数据)
// 配置异常段参数
outlierConfig := OutlierConfig{
Enabled: true, // 是否产生异常段数据
Count: 2, // 异常段数量
MinLength: 10, // 异常段最小连续长度
MaxLength: 15, // 异常段最大连续长度
Intensity: 1.5, // 异常强度
Distribution: "both", // 分布类型
}
// 为每个测量点生成包含异常的数据
generatedData := make(map[string][]float64)
detectedSegments := make(map[string][]OutlierSegment)
for key, measInfo := range measInfos {
fmt.Printf("\n处理测量点: %s\n", key)
fmt.Printf(" 基准值: %.4f, 变化范围: %v, 数据长度: %d, 类型: %s\n",
measInfo.BaseValue, measInfo.Changes, measInfo.Size, measInfo.BaseType)
minBound, maxBound := getChangeBounds(measInfo.BaseValue, measInfo.Changes)
fmt.Printf(" 计算边界: [%.4f, %.4f]\n", minBound, maxBound)
// 根据基准值类型决定如何处理
switch measInfo.BaseType {
case "TI":
// 边沿触发类型,生成特殊处理的数据
fmt.Printf(" 边沿触发类型,跳过异常数据生成\n")
continue
case "TE":
// 正常上下限类型,生成包含异常的数据
if len(measInfo.Changes) == 0 {
fmt.Printf(" 无变化范围数据,跳过\n")
continue
}
// 根据变化范围数量调整异常配置
config := outlierConfig
if len(measInfo.Changes) == 2 {
// 只有上下限
config.Distribution = "both"
} else if len(measInfo.Changes) == 4 {
// 有上下限和预警上下限
config.Distribution = "both"
config.Intensity = 2.0 // 增强异常强度
}
// 生成包含异常的数据
data, err := generateFloatSliceWithOutliers(
measInfo.BaseValue,
measInfo.Changes,
measInfo.Size,
"random",
config,
)
if err != nil {
fmt.Printf(" 生成异常数据失败: %v\n", err)
continue
}
// 保存生成的数据
generatedData[key] = data
// 检测异常段
segments := detectOutlierSegments(data, measInfo.BaseValue, measInfo.Changes, config.MinLength)
detectedSegments[key] = segments
fmt.Printf(" 成功生成数据,长度: %d\n", len(data))
fmt.Printf(" 检测到异常段数量: %d\n", len(segments))
// 显示异常段详情
for i, segment := range segments {
fmt.Printf(" 异常段%d: 位置[%d-%d], 长度=%d, 类型=%s\n",
i+1, segment.Start, segment.Start+segment.Length-1, segment.Length, segment.Type)
}
}
}
// 统计信息
fmt.Println("\n=== 生成结果统计 ===")
fmt.Printf("成功处理的测量点数量: %d/%d\n", len(generatedData), len(measInfos))
totalOutliers := 0
for key, segments := range detectedSegments {
totalOutliers += len(segments)
fmt.Printf(" %s: %d个异常段\n", key, len(segments))
}
fmt.Printf("总共检测到异常段: %d\n", totalOutliers)
// 示例:显示特定测量点的详细数据
if len(generatedData) > 0 {
fmt.Println("\n=== 示例数据详情 ===")
// 取第一个测量点显示详细数据
for key, data := range generatedData {
fmt.Printf("测量点: %s\n", key)
fmt.Printf("前30个数据值:\n")
minBound, maxBound := getChangeBounds(measInfos[key].BaseValue, measInfos[key].Changes)
for i := 0; i < min(30, len(data)); i++ {
status := "正常"
if data[i] > maxBound {
status = "↑异常"
} else if data[i] < minBound {
status = "↓异常"
}
fmt.Printf(" [%d] %.4f (%s)\n", i, data[i], status)
}
break // 只显示第一个
}
}
// Redis写入代码
// rdb := initRedisClient()
// gCtx, cancel := context.WithCancel(ctx)
// defer cancel()
// go simulateDataWrite(gCtx, rdb)
// TODO 添加singal监听优雅退出
// cancel()
// time.Sleep(2 * time.Second)
// fmt.Println("程序已退出。")
}