diff --git a/main.go b/main.go index 2c1c4b8..486a7ef 100644 --- a/main.go +++ b/main.go @@ -204,7 +204,7 @@ func main() { logger.Error(ctx, "load topologic info from postgres failed", "error", err) panic(err) } - go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement) + go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement) tree, err := database.QueryTopologicFromDB(ctx, tx) if err != nil { diff --git a/mq/publish_event.go b/mq/publish_event.go index 9270e93..7b0ee6f 100644 --- a/mq/publish_event.go +++ b/mq/publish_event.go @@ -11,10 +11,10 @@ import ( ) // MsgChan define variable of channel to store messages that need to be sent to rabbitMQ -var MsgChan chan string +var MsgChan chan []byte func init() { - MsgChan = make(chan string, 10000) + MsgChan = make(chan []byte, 10000) } const ( @@ -78,7 +78,7 @@ func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) { } // PushEventToRabbitMQ define func to push event alarm message to rabbitMQ -func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) { +func PushEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { channel, err := initEventAlarmChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) @@ -125,7 +125,7 @@ func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) { false, // immediate amqp.Publishing{ ContentType: "text/plain", - Body: []byte(msg), + Body: msg, }) cancel() diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index 62a7ecc..c1ae86a 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -26,6 +26,13 @@ type teEventThresholds struct { isFloatCause bool } +type teBreachTrigger struct { + breachType string + triggered bool + triggeredValues []float64 + eventOpts []event.EventOption +} + // parseTEThresholds define func to parse telemetry thresholds by casue map func parseTEThresholds(cause map[string]any) (teEventThresholds, error) { t := teEventThresholds{} @@ -84,66 +91,69 @@ func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeCo // analyzeTEDataLogic define func to processing telemetry data and event triggering func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) { windowSize := conf.minBreachCount - if windowSize <= 0 { - logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) + dataLen := len(realTimeValues) + if dataLen < windowSize || windowSize <= 0 { return } - // mark whether any events have been triggered in this batch - var eventTriggered bool - breachTriggers := map[string]bool{ - "up": false, "upup": false, "down": false, "downdown": false, + statusArray := make([]string, dataLen) + for i, val := range realTimeValues { + statusArray[i] = getTEBreachType(val, thresholds) } - // implement slide window to determine breach counts - for i := 0; i <= len(realTimeValues)-windowSize; i++ { - window := realTimeValues[i : i+windowSize] - firstValueBreachType := getTEBreachType(window[0], thresholds) + breachTriggers := make(map[string]teBreachTrigger) + for i := 0; i <= dataLen-windowSize; i++ { + firstBreachType := statusArray[i] - if firstValueBreachType == "" { + // if the first value in the window does not breach, skip this window directly + if firstBreachType == "" { continue } allMatch := true for j := 1; j < windowSize; j++ { - currentValueBreachType := getTEBreachType(window[j], thresholds) - if currentValueBreachType != firstValueBreachType { + if statusArray[i+j] != firstBreachType { allMatch = false break } } if allMatch { + triggerValues := realTimeValues[i : i+windowSize] // in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data - if !breachTriggers[firstValueBreachType] { - // trigger event - logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1]) + _, exists := breachTriggers[firstBreachType] + if !exists { + logger.Warn(ctx, "event triggered by sliding window", + "breach_type", firstBreachType, + "trigger_values", triggerValues) - breachTriggers[firstValueBreachType] = true - eventTriggered = true + // build Options + opts := []event.EventOption{ + event.WithConditionValue(triggerValues, conf.Cause), + event.WithTEAnalysisResult(firstBreachType), + // TODO 生成 operations并考虑如何放入 event 中 + // event.WithOperations(nil) + } + breachTriggers[firstBreachType] = teBreachTrigger{ + breachType: firstBreachType, + triggered: false, + triggeredValues: triggerValues, + eventOpts: opts, + } } } } - // TODO 记录触发trigger的type即firstValueBreachType,并考虑如何融合更多错误信息 - if eventTriggered { - command, content := genTEEventCommandAndContent(ctx, conf.Action) - // TODO 生成 condition 并考虑如何放入 event 中 - event.WithCondition(conf.Cause) - // TODO 生成 result 并考虑如何放入 event 中 - event.WithResult(map[string]any{"real_time_values": realTimeValues}) - // TODO 生成 operations并考虑如何放入 event 中 - event.WithOperations(nil) - // TODO 考虑 content 是否可以为空,先期不允许 - if command == "" || content == "" { - logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) - return - } - event.TriggerEventAction(ctx, command, content) - return + + for breachType, trigger := range breachTriggers { + // trigger Action + command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action) + eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType) + event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...) + } } -func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) { +func genTEEventCommandAndMainBody(ctx context.Context, action map[string]any) (command string, mainBody string) { cmdValue, exist := action["command"] if !exist { logger.Error(ctx, "can not find command variable into action map", "action", action) @@ -191,7 +201,7 @@ type tiEventThresholds struct { isFloatCause bool } -// parseTEThresholds define func to parse telesignal thresholds by casue map +// parseTIThresholds define func to parse telesignal thresholds by casue map func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) { edgeKey := "edge" t := tiEventThresholds{ @@ -304,18 +314,17 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE } if eventTriggered { - command, content := genTIEventCommandAndContent(conf.Action) - // TODO 考虑 content 是否可以为空,先期不允许 - if command == "" || content == "" { - logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) + command, mainBody := genTIEventCommandAndMainBody(conf.Action) + if command == "" || mainBody == "" { + logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody) return } - event.TriggerEventAction(ctx, command, content) + event.TriggerEventAction(ctx, command, mainBody) return } } -func genTIEventCommandAndContent(action map[string]any) (command string, content string) { +func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) { cmdValue, exist := action["command"] if !exist { return "", "" diff --git a/real-time-data/event/event_handlers.go b/real-time-data/event/event_handlers.go index 8370c0c..841b06e 100644 --- a/real-time-data/event/event_handlers.go +++ b/real-time-data/event/event_handlers.go @@ -3,7 +3,7 @@ package event import ( "context" - "fmt" + "encoding/json" "modelRT/logger" "modelRT/mq" @@ -21,47 +21,64 @@ var actionDispatchMap = map[string]actionHandler{ } // TriggerEventAction define func to trigger event by action in compute config -func TriggerEventAction(ctx context.Context, command string, content string, ops ...EventOption) { +func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) { handler, exists := actionDispatchMap[command] if !exists { logger.Error(ctx, "unknown action command", "command", command) return } - err := handler(ctx, content, ops...) + err := handler(ctx, eventName, ops...) if err != nil { - logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err) + logger.Error(ctx, "action handler failed", "command", command, "event_name", eventName, "error", err) return } - logger.Info(ctx, "action handler success", "command", command, "content", content) + logger.Info(ctx, "action handler success", "command", command, "event_name", eventName) } -func handleInfoAction(ctx context.Context, content string, ops ...EventOption) error { - // 实际执行发送警告、记录日志等操作 - actionParams := content - // ... logic to send info level event using actionParams ... - logger.Warn(ctx, "trigger info event", "message", actionParams) +func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) error { + eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...) + if err != nil { + logger.Error(ctx, "generate info event record failed", "error", err) + return err + } + recordBytes, err := json.Marshal(eventRecord) + if err != nil { + logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) + return err + } + mq.MsgChan <- recordBytes + logger.Info(ctx, "trigger info event", "event_name", eventName) return nil } func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error { eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...) if err != nil { - logger.Error(ctx, "failed to create event record", "error", err) + logger.Error(ctx, "generate warning event record failed", "error", err) return err } - mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) + recordBytes, err := json.Marshal(eventRecord) + if err != nil { + logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) + return err + } + mq.MsgChan <- recordBytes logger.Info(ctx, "trigger warning event", "event_name", eventName) return nil } -func handleErrorAction(ctx context.Context, content string, ops ...EventOption) error { - actionParams := content - eventRecord, err := NewCriticalPlatformSoftRecord("ErrorEvent", WithCondition(map[string]any{"message": actionParams})) +func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) error { + eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...) if err != nil { - logger.Error(ctx, "failed to create event record", "error", err) + logger.Error(ctx, "generate error event record failed", "error", err) return err } - mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) + recordBytes, err := json.Marshal(eventRecord) + if err != nil { + logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) + return err + } + mq.MsgChan <- recordBytes return nil } diff --git a/real-time-data/event/event_options.go b/real-time-data/event/event_options.go index fda0c59..38e7b45 100644 --- a/real-time-data/event/event_options.go +++ b/real-time-data/event/event_options.go @@ -1,6 +1,11 @@ // Package event define real time data evnet operation functions package event +import ( + "maps" + "strings" +) + // EventOption define option function type for event record creation type EventOption func(*EventRecord) @@ -44,3 +49,37 @@ func WithResult(result map[string]any) EventOption { e.Result = result } } + +func WithTEAnalysisResult(breachType string) EventOption { + return func(e *EventRecord) { + if e.Result == nil { + e.Result = make(map[string]any) + } + + description := "数据异常" + switch strings.ToLower(breachType) { + case "upup": + description = "超越上上限" + case "up": + description = "超越上限" + case "down": + description = "超越下限" + case "downdown": + description = "超越下下限" + } + + e.Result["analysis_desc"] = description + e.Result["breach_type"] = breachType + } +} + +// WithConditionValue define option function to set event condition with real time value and extra data +func WithConditionValue(realTimeValue []float64, extraData map[string]any) EventOption { + return func(e *EventRecord) { + if e.Condition == nil { + e.Condition = make(map[string]any) + } + e.Condition["real_time_value"] = realTimeValue + maps.Copy(e.Condition, extraData) + } +} diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_up_down_limit_computing.go similarity index 94% rename from real-time-data/real_time_data_computing.go rename to real-time-data/real_time_data_up_down_limit_computing.go index b41446d..53acc8d 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_up_down_limit_computing.go @@ -27,8 +27,8 @@ func init() { globalComputeState = NewMeasComputeState() } -// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info -func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) { +// StartComputingRealTimeDataLimit define func to start compute real time data up or down limit process goroutines by measurement info +func StartComputingRealTimeDataLimit(ctx context.Context, measurements []orm.Measurement) { for _, measurement := range measurements { enableValue, exist := measurement.EventPlan["enable"] enable, ok := enableValue.(bool) @@ -57,7 +57,7 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr) conf.StopGchan = make(chan struct{}) globalComputeState.Store(uuidStr, conf) - logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID) + logger.Info(ctx, "starting computing real time data limit for measurement", "measurement_uuid", measurement.ComponentUUID) go continuousComputation(enrichedCtx, conf) } }