diff --git a/constants/event.go b/constants/event.go new file mode 100644 index 0000000..67bb63a --- /dev/null +++ b/constants/event.go @@ -0,0 +1,31 @@ +// Package constants define constant variable +package constants + +const ( + // TIBreachTriggerType define out of bounds type constant + TIBreachTriggerType = "trigger" +) + +const ( + // TelemetryUpLimit define telemetry upper limit + TelemetryUpLimit = "up" + // TelemetryUpUpLimit define telemetry upper upper limit + TelemetryUpUpLimit = "upup" + + // TelemetryDownLimit define telemetry limit + TelemetryDownLimit = "down" + // TelemetryDownDownLimit define telemetry lower lower limit + TelemetryDownDownLimit = "downdown" +) + +const ( + // TelesignalRaising define telesignal raising edge + TelesignalRaising = "raising" + // TelesignalFalling define telesignal falling edge + TelesignalFalling = "falling" +) + +const ( + // MinBreachCount define min breach count of real time data + MinBreachCount = 10 +) diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go new file mode 100644 index 0000000..4fc92d5 --- /dev/null +++ b/real-time-data/compute_analyzer.go @@ -0,0 +1,257 @@ +// Package realtimedata define real time data operation functions +package realtimedata + +import ( + "context" + "errors" + "fmt" + + "modelRT/constants" + "modelRT/logger" +) + +// RealTimeAnalyzer 接口定义了实时数据分析和事件触发的通用方法 +type RealTimeAnalyzer interface { + AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) +} + +// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause +type teEventThresholds struct { + up float64 + upup float64 + down float64 + downdown float64 + isFloatCause bool +} + +// parseTEThresholds define func to parse telemetry thresholds by casue map +func parseTEThresholds(cause map[string]any) (teEventThresholds, error) { + t := teEventThresholds{} + floatKeys := map[string]*float64{ + "upup": &t.upup, + "up": &t.up, + "down": &t.down, + "downdown": &t.downdown, + } + + for key, ptr := range floatKeys { + if value, exists := cause[key]; exists { + if floatVal, ok := value.(float64); ok { + *ptr = floatVal + t.isFloatCause = true + } else { + return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value) + } + } + } + + // quickly check mutual exclusion + if _, exists := cause["edge"]; exists && t.isFloatCause { + return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found") + } + return t, nil +} + +// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to +func getTEBreachType(value float64, t teEventThresholds) string { + if t.upup > 0 && value > t.upup { + return constants.TelemetryUpUpLimit + } + if t.up > 0 && value > t.up { + return constants.TelemetryUpLimit + } + if t.downdown > 0 && value < t.downdown { + return constants.TelemetryDownDownLimit + } + if t.down > 0 && value < t.down { + return constants.TelemetryDownLimit + } + return "" +} + +// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic. +type TEAnalyzer struct { + Thresholds teEventThresholds +} + +// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口 +func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { + analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues) +} + +// 封装原 analyzeTEDataAndTriggerEvent 的核心逻辑 +func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) { + windowSize := conf.minBreachCount + if windowSize <= 0 { + logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) + return + } + + // mark whether any events have been triggered in this batch + var eventTriggered bool + breachTriggers := map[string]bool{ + "up": false, "upup": false, "down": false, "downdown": false, + } + + // implement slide window to determine breach counts + for i := 0; i <= len(realTimeValues)-windowSize; i++ { + window := realTimeValues[i : i+windowSize] + firstValueBreachType := getTEBreachType(window[0], thresholds) + + if firstValueBreachType == "" { + continue + } + + allMatch := true + for j := 1; j < windowSize; j++ { + currentValueBreachType := getTEBreachType(window[j], thresholds) + if currentValueBreachType != firstValueBreachType { + allMatch = false + break + } + } + + if allMatch { + // in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data + if !breachTriggers[firstValueBreachType] { + // trigger event + logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1]) + + breachTriggers[firstValueBreachType] = true + eventTriggered = true + } + } + } + + // TODO 调用 EventRT 接口进行时间推送 + if eventTriggered { + fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---") + } else { + fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---") + } +} + +// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause +type tiEventThresholds struct { + edge string + isFloatCause bool +} + +// parseTEThresholds define func to parse telesignal thresholds by casue map +func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) { + edgeKey := "edge" + t := tiEventThresholds{ + isFloatCause: false, + } + + if value, exists := cause[edgeKey]; exists { + if strVal, ok := value.(string); ok { + switch strVal { + case "raising", "falling": + t.edge = strVal + return t, nil + default: + return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal) + } + } else { + return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value) + } + } + + return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey) +} + +// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to +func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string { + if t.edge == constants.TelesignalRaising { + if previousValue == 0.0 && currentValue == 1.0 { + return constants.TIBreachTriggerType + } + } else if t.edge == constants.TelesignalFalling { + if previousValue == 1.0 && currentValue == 0.0 { + return constants.TIBreachTriggerType + } + } + + return "" +} + +// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic +type TIAnalyzer struct { + Thresholds tiEventThresholds +} + +// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口 +func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { + analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues) +} + +// 封装原 analyzeTIDataAndTriggerEvent 的核心逻辑 (使用预计算优化版本) +func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) { + windowSize := conf.minBreachCount + if windowSize <= 0 { + logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) + return + } + + numDataPoints := len(realTimeValues) + if numDataPoints < 2 { + logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints) + return + } + + // pre calculate the change event type for all adjacent point pairs + numChanges := numDataPoints - 1 + changeBreachTypes := make([]string, numChanges) + + for i := range numChanges { + previousValue := realTimeValues[i] + currentValue := realTimeValues[i+1] + + changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds) + } + + if numChanges < windowSize { + logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize) + return + } + + var eventTriggered bool + breachTriggers := map[string]bool{ + constants.TIBreachTriggerType: false, + } + + for i := 0; i <= numChanges-windowSize; i++ { + windowBreachTypes := changeBreachTypes[i : i+windowSize] + firstBreachType := windowBreachTypes[0] + + if firstBreachType == "" { + continue + } + + allMatch := true + for j := 1; j < windowSize; j++ { + if windowBreachTypes[j] != firstBreachType { + allMatch = false + break + } + } + + if allMatch { + if !breachTriggers[firstBreachType] { + finalValueIndex := i + windowSize + logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex]) + + breachTriggers[firstBreachType] = true + eventTriggered = true + } + } + } + + // TODO 调用 EventRT 接口进行时间推送 + if eventTriggered { + fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---") + } else { + fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---") + } +} diff --git a/real-time-data/compute_state_manager.go b/real-time-data/compute_state_manager.go index 557429a..33754c4 100644 --- a/real-time-data/compute_state_manager.go +++ b/real-time-data/compute_state_manager.go @@ -10,10 +10,13 @@ type ComputeConfig struct { Cause map[string]any Action map[string]any // TODO 预留自由调整的入口 - Duration int - DataSize int64 - QueryKey string - StopGchan chan struct{} + // min consecutive breach count + minBreachCount int + Duration int + DataSize int64 + QueryKey string + StopGchan chan struct{} + Analyzer RealTimeAnalyzer } // MeasComputeState define struct of manages the state of measurement computations using sync.Map diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 7d92f41..2b6e9b3 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -68,6 +68,8 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem } func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { + var err error + enableValue, exist := measurement.EventPlan["enable"] enable, ok := enableValue.(bool) if !exist { @@ -93,7 +95,10 @@ func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { if !ok { return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue) } - conf.Cause = cause + conf.Cause, err = processCauseMap(cause) + if err != nil { + return nil, fmt.Errorf("parse content of field cause failed:%w", err) + } actionValue, exist := measurement.EventPlan["action"] if !exist { @@ -110,17 +115,87 @@ func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err) } conf.QueryKey = queryKey - conf.DataSize = int64(measurement.Size) + // TODO use constant values ​​for temporary settings + conf.minBreachCount = constants.MinBreachCount + + isFloatCause := false + if _, exists := conf.Cause["up"]; exists { + isFloatCause = true + } else if _, exists := conf.Cause["down"]; exists { + isFloatCause = true + } else if _, exists := conf.Cause["upup"]; exists { + isFloatCause = true + } else if _, exists := conf.Cause["downdown"]; exists { + isFloatCause = true + } + + if isFloatCause { + // te config + teThresholds, err := parseTEThresholds(conf.Cause) + if err != nil { + return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err) + } + conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds} + } else { + // ti config + tiThresholds, err := parseTIThresholds(conf.Cause) + if err != nil { + return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err) + } + conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds} + } return conf, nil } +func processCauseMap(data map[string]any) (map[string]any, error) { + causeResult := make(map[string]any) + keysToExtract := []string{"up", "down", "upup", "downdown"} + + var foundFloatKey bool + for _, key := range keysToExtract { + if value, exists := data[key]; exists { + + foundFloatKey = true + + // check value type + if floatVal, ok := value.(float64); ok { + causeResult[key] = floatVal + } else { + return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value) + } + } + } + + if foundFloatKey == true { + return causeResult, nil + } + + edgeKey := "edge" + if value, exists := data[edgeKey]; exists { + if stringVal, ok := value.(string); ok { + switch stringVal { + case "raising": + fallthrough + case "falling": + causeResult[edgeKey] = stringVal + default: + return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value) + } + } else { + return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value) + } + } else { + return nil, fmt.Errorf("key:%s do not exists", edgeKey) + } + return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey) +} + func continuousComputation(ctx context.Context, conf *ComputeConfig) { client := diagram.NewRedisClient() uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string) - // TODO duration 优化为配置项 - duration := util.SecondsToDuration(1) + duration := util.SecondsToDuration(conf.Duration) ticker := time.NewTicker(duration) defer ticker.Stop() startTimestamp := util.GenNanoTsStr() @@ -140,44 +215,15 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { continue } startTimestamp = stopTimestamp - // TODO 对 redis 数据进行分析 - fmt.Println(members) - } - } -} -func processCauseMap(data map[string]any) { - keysToExtract := []string{"up", "down", "upup", "downdown"} - - for _, key := range keysToExtract { - if value, exists := data[key]; exists { - // 检查类型是否为 float64 - if floatVal, ok := value.(float64); ok { - fmt.Printf("键 '%s' 存在且为 float64: %.2f\n", key, floatVal) + realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members) + if conf.Analyzer != nil { + conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas) } else { - // 键存在,但类型不对,进行错误处理或转换尝试 - fmt.Printf("键 '%s' 存在但类型错误: 期望 float64, 实际 %T\n", key, value) + logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid) } } } - - edgeKey := "edge" - if value, exists := data[edgeKey]; exists { - if stringVal, ok := value.(string); ok { - switch stringVal { - case "raising": - fmt.Println(" -> 识别到 edge: raising") - case "falling": - fmt.Println(" -> 识别到 edge: falling") - default: - fmt.Println(" -> 识别到其他 edge 值") - } - } else { - fmt.Printf("键 '%s' 存在但类型错误: 期望 string, 实际 %T\n", edgeKey, value) - } - } else { - fmt.Printf("键 '%s' 不存在\n", edgeKey) - } } // ReceiveChan define func to real time data receive and process @@ -275,17 +321,17 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics [] } } -type RealTimeDataPayload struct { +type realTimeDataPayload struct { ComponentUUID string Values []float64 } -type RealTimeData struct { - Payload RealTimeDataPayload +type realTimeData struct { + Payload realTimeDataPayload } -func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) { - var realTimeData RealTimeData +func parseKafkaMessage(msgValue []byte) (*realTimeData, error) { + var realTimeData realTimeData err := json.Unmarshal(msgValue, &realTimeData) if err != nil { return nil, fmt.Errorf("unmarshal real time data failed: %w", err) @@ -293,7 +339,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) { return &realTimeData, nil } -func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { +func processRealTimeData(ctx context.Context, realTimeData *realTimeData) { componentUUID := realTimeData.Payload.ComponentUUID component, err := diagram.GetComponentMap(componentUUID) if err != nil { @@ -314,7 +360,6 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { var anchorRealTimeData []float64 var calculateFunc func(archorValue float64, args ...float64) float64 - // 收集实时数据 for _, param := range realTimeData.Payload.Values { anchorRealTimeData = append(anchorRealTimeData, param) } @@ -338,10 +383,8 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { return } - // TODO 使用select避免channel阻塞 select { case anchorChan <- anchorConfig: - // 成功发送 case <-ctx.Done(): logger.Info(ctx, "context done while sending to anchor chan") case <-time.After(5 * time.Second): diff --git a/util/convert.go b/util/convert.go new file mode 100644 index 0000000..c7b637b --- /dev/null +++ b/util/convert.go @@ -0,0 +1,33 @@ +// Package util provide some utility fun +package util + +import ( + "context" + "strconv" + + "modelRT/logger" + + "github.com/redis/go-redis/v9" +) + +// ConvertZSetMembersToFloat64 define func to conver zset member type to float64 +func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 { + dataFloats := make([]float64, 0, len(members)) + + for _, member := range members { + valStr, ok := member.Member.(string) + if !ok { + logger.Warn(ctx, "redis zset member value is not a string,skipping") + continue + } + + valFloat, err := strconv.ParseFloat(valStr, 64) + if err != nil { + logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err) + continue + } + dataFloats = append(dataFloats, valFloat) + } + + return dataFloats +}