diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index 4fc92d5..4afac84 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -5,12 +5,14 @@ import ( "context" "errors" "fmt" + "strings" "modelRT/constants" "modelRT/logger" + "modelRT/real-time-data/event" ) -// RealTimeAnalyzer 接口定义了实时数据分析和事件触发的通用方法 +// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering type RealTimeAnalyzer interface { AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) } @@ -74,12 +76,12 @@ type TEAnalyzer struct { Thresholds teEventThresholds } -// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口 +// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues) } -// 封装原 analyzeTEDataAndTriggerEvent 的核心逻辑 +// analyzeTEDataLogic define func to processing telemetry data and event triggering func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) { windowSize := conf.minBreachCount if windowSize <= 0 { @@ -123,12 +125,50 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE } } - // TODO 调用 EventRT 接口进行时间推送 if eventTriggered { - fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---") - } else { - fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---") + command, content := genTEEventCommandAndContent(conf.Action) + // TODO 考虑 content 是否可以为空,先期不允许 + if command == "" || content == "" { + logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) + return + } + event.TriggerEventAction(ctx, command, content) + return } + logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.") +} + +func genTEEventCommandAndContent(action map[string]any) (command string, content string) { + cmdValue, exist := action["command"] + if !exist { + return "", "" + } + + commandStr, ok := cmdValue.(string) + if !ok { + return "", "" + } + command = commandStr + + paramsValue, exist := action["parametes"] + if !exist { + return command, "" + } + + parameterSlice, ok := paramsValue.([]string) + if !ok { + return command, "" + } + + var builder strings.Builder + for i, parameter := range parameterSlice { + if i > 0 { + builder.WriteString(",") + } + builder.WriteString(parameter) + } + + return command, builder.String() } // tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause @@ -181,12 +221,12 @@ type TIAnalyzer struct { Thresholds tiEventThresholds } -// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口 +// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) { analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues) } -// 封装原 analyzeTIDataAndTriggerEvent 的核心逻辑 (使用预计算优化版本) +// analyzeTIDataLogic define func to processing telesignal data and event triggering func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) { windowSize := conf.minBreachCount if windowSize <= 0 { @@ -248,10 +288,48 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE } } - // TODO 调用 EventRT 接口进行时间推送 if eventTriggered { - fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---") - } else { - fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---") + command, content := genTIEventCommandAndContent(conf.Action) + // TODO 考虑 content 是否可以为空,先期不允许 + if command == "" || content == "" { + logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) + return + } + event.TriggerEventAction(ctx, command, content) + return } + logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.") +} + +func genTIEventCommandAndContent(action map[string]any) (command string, content string) { + cmdValue, exist := action["command"] + if !exist { + return "", "" + } + + commandStr, ok := cmdValue.(string) + if !ok { + return "", "" + } + command = commandStr + + paramsValue, exist := action["parametes"] + if !exist { + return command, "" + } + + parameterSlice, ok := paramsValue.([]string) + if !ok { + return command, "" + } + + var builder strings.Builder + for i, parameter := range parameterSlice { + if i > 0 { + builder.WriteString(",") + } + builder.WriteString(parameter) + } + + return command, builder.String() } diff --git a/real-time-data/event/event_handlers.go b/real-time-data/event/event_handlers.go new file mode 100644 index 0000000..63eae64 --- /dev/null +++ b/real-time-data/event/event_handlers.go @@ -0,0 +1,74 @@ +// Package event define real time data evnet operation functions +package event + +import ( + "context" + + "modelRT/logger" +) + +type actionHandler func(ctx context.Context, content string) error + +// actionDispatchMap define variable to store all action handler into map +var actionDispatchMap = map[string]actionHandler{ + "info": handleInfoAction, + "warning": handleWarningAction, + "error": handleErrorAction, + "critical": handleCriticalAction, + "exception": handleExceptionAction, +} + +// TriggerEventAction define func to trigger event by action in compute config +func TriggerEventAction(ctx context.Context, command string, content string) { + handler, exists := actionDispatchMap[command] + if !exists { + logger.Error(ctx, "unknown action command", "command", command) + return + } + err := handler(ctx, content) + if err != nil { + logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err) + return + } + logger.Info(ctx, "action handler success", "command", command, "content", content) +} + +func handleInfoAction(ctx context.Context, content string) error { + // 实际执行发送警告、记录日志等操作 + actionParams := content + // ... logic to send info level event using actionParams ... + logger.Warn(ctx, "trigger info event", "message", actionParams) + return nil +} + +func handleWarningAction(ctx context.Context, content string) error { + // 实际执行发送警告、记录日志等操作 + actionParams := content + // ... logic to send warning level event using actionParams ... + logger.Warn(ctx, "trigger Warning event", "message", actionParams) + return nil +} + +func handleErrorAction(ctx context.Context, content string) error { + // 实际执行发送警告、记录日志等操作 + actionParams := content + // ... logic to send error level event using actionParams ... + logger.Warn(ctx, "trigger error event", "message", actionParams) + return nil +} + +func handleCriticalAction(ctx context.Context, content string) error { + // 实际执行发送警告、记录日志等操作 + actionParams := content + // ... logic to send critical level event using actionParams ... + logger.Warn(ctx, "trigger critical event", "message", actionParams) + return nil +} + +func handleExceptionAction(ctx context.Context, content string) error { + // 实际执行发送警告、记录日志等操作 + actionParams := content + // ... logic to send except level event using actionParams ... + logger.Warn(ctx, "trigger except event", "message", actionParams) + return nil +}