// Package realtimedata define real time data operation functions package realtimedata import ( "context" "errors" "fmt" "strings" "modelRT/constants" "modelRT/logger" "modelRT/real-time-data/event" ) // RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering type RealTimeAnalyzer interface { AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) } // teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause type teEventThresholds struct { up float64 upup float64 down float64 downdown float64 isFloatCause bool } // parseTEThresholds define func to parse telemetry thresholds by casue map func parseTEThresholds(cause map[string]any) (teEventThresholds, error) { t := teEventThresholds{} floatKeys := map[string]*float64{ "upup": &t.upup, "up": &t.up, "down": &t.down, "downdown": &t.downdown, } for key, ptr := range floatKeys { if value, exists := cause[key]; exists { if floatVal, ok := value.(float64); ok { *ptr = floatVal t.isFloatCause = true } else { return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value) } } } // quickly check mutual exclusion if _, exists := cause["edge"]; exists && t.isFloatCause { return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found") } return t, nil } // getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to func getTEBreachType(value float64, t teEventThresholds) string { if t.upup > 0 && value > t.upup { return constants.TelemetryUpUpLimit } if t.up > 0 && value > t.up { return constants.TelemetryUpLimit } if t.downdown > 0 && value < t.downdown { return constants.TelemetryDownDownLimit } if t.down > 0 && value < t.down { return constants.TelemetryDownLimit } return "" } // TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic. type TEAnalyzer struct { Thresholds teEventThresholds } // AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues) } // analyzeTEDataLogic define func to processing telemetry data and event triggering func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) { windowSize := conf.minBreachCount if windowSize <= 0 { logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) return } // mark whether any events have been triggered in this batch var eventTriggered bool breachTriggers := map[string]bool{ "up": false, "upup": false, "down": false, "downdown": false, } // implement slide window to determine breach counts for i := 0; i <= len(realTimeValues)-windowSize; i++ { window := realTimeValues[i : i+windowSize] firstValueBreachType := getTEBreachType(window[0], thresholds) if firstValueBreachType == "" { continue } allMatch := true for j := 1; j < windowSize; j++ { currentValueBreachType := getTEBreachType(window[j], thresholds) if currentValueBreachType != firstValueBreachType { allMatch = false break } } if allMatch { // in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data if !breachTriggers[firstValueBreachType] { // trigger event logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1]) breachTriggers[firstValueBreachType] = true eventTriggered = true } } } if eventTriggered { command, content := genTEEventCommandAndContent(ctx, conf.Action) // TODO 考虑 content 是否可以为空,先期不允许 if command == "" || content == "" { logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) return } event.TriggerEventAction(ctx, command, content) return } } func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) { cmdValue, exist := action["command"] if !exist { logger.Error(ctx, "can not find command variable into action map", "action", action) return "", "" } commandStr, ok := cmdValue.(string) if !ok { logger.Error(ctx, "convert command to string type failed", "command", cmdValue, "type", fmt.Sprintf("%T", cmdValue)) return "", "" } command = commandStr paramsValue, exist := action["parameters"] if !exist { logger.Error(ctx, "can not find parameters variable into action map", "action", action) return command, "" } parameterSlice, ok := paramsValue.([]any) if !ok { logger.Error(ctx, "convert parameters to []any type failed", "parameters", paramsValue, "type", fmt.Sprintf("%T", paramsValue)) return command, "" } var builder strings.Builder for i, parameter := range parameterSlice { if i > 0 { builder.WriteString(",") } parameterStr, ok := parameter.(string) if !ok { logger.Warn(ctx, "parameter type is incorrect, skip this parameter", "parameter", parameter, "type", fmt.Sprintf("%T", parameter)) continue } builder.WriteString(parameterStr) } return command, builder.String() } // tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause type tiEventThresholds struct { edge string isFloatCause bool } // parseTEThresholds define func to parse telesignal thresholds by casue map func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) { edgeKey := "edge" t := tiEventThresholds{ isFloatCause: false, } if value, exists := cause[edgeKey]; exists { if strVal, ok := value.(string); ok { switch strVal { case "raising", "falling": t.edge = strVal return t, nil default: return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal) } } else { return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value) } } return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey) } // getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string { if t.edge == constants.TelesignalRaising { if previousValue == 0.0 && currentValue == 1.0 { return constants.TIBreachTriggerType } } else if t.edge == constants.TelesignalFalling { if previousValue == 1.0 && currentValue == 0.0 { return constants.TIBreachTriggerType } } return "" } // TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic type TIAnalyzer struct { Thresholds tiEventThresholds } // AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues) } // analyzeTIDataLogic define func to processing telesignal data and event triggering func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) { windowSize := conf.minBreachCount if windowSize <= 0 { logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) return } numDataPoints := len(realTimeValues) if numDataPoints < 2 { logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints) return } // pre calculate the change event type for all adjacent point pairs numChanges := numDataPoints - 1 changeBreachTypes := make([]string, numChanges) for i := range numChanges { previousValue := realTimeValues[i] currentValue := realTimeValues[i+1] changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds) } if numChanges < windowSize { logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize) return } var eventTriggered bool breachTriggers := map[string]bool{ constants.TIBreachTriggerType: false, } for i := 0; i <= numChanges-windowSize; i++ { windowBreachTypes := changeBreachTypes[i : i+windowSize] firstBreachType := windowBreachTypes[0] if firstBreachType == "" { continue } allMatch := true for j := 1; j < windowSize; j++ { if windowBreachTypes[j] != firstBreachType { allMatch = false break } } if allMatch { if !breachTriggers[firstBreachType] { finalValueIndex := i + windowSize logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex]) breachTriggers[firstBreachType] = true eventTriggered = true } } } if eventTriggered { command, content := genTIEventCommandAndContent(conf.Action) // TODO 考虑 content 是否可以为空,先期不允许 if command == "" || content == "" { logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) return } event.TriggerEventAction(ctx, command, content) return } } func genTIEventCommandAndContent(action map[string]any) (command string, content string) { cmdValue, exist := action["command"] if !exist { return "", "" } commandStr, ok := cmdValue.(string) if !ok { return "", "" } command = commandStr paramsValue, exist := action["parametes"] if !exist { return command, "" } parameterSlice, ok := paramsValue.([]string) if !ok { return command, "" } var builder strings.Builder for i, parameter := range parameterSlice { if i > 0 { builder.WriteString(",") } builder.WriteString(parameter) } return command, builder.String() }