optimize code of push event to rabbitmq

This commit is contained in:
douxu 2026-02-26 16:48:12 +08:00
parent 2ececc38d9
commit f6bb3fb985
6 changed files with 132 additions and 67 deletions

View File

@ -204,7 +204,7 @@ func main() {
logger.Error(ctx, "load topologic info from postgres failed", "error", err) logger.Error(ctx, "load topologic info from postgres failed", "error", err)
panic(err) panic(err)
} }
go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement) go realtimedata.StartComputingRealTimeDataLimit(ctx, allMeasurement)
tree, err := database.QueryTopologicFromDB(ctx, tx) tree, err := database.QueryTopologicFromDB(ctx, tx)
if err != nil { if err != nil {

View File

@ -11,10 +11,10 @@ import (
) )
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ // MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
var MsgChan chan string var MsgChan chan []byte
func init() { func init() {
MsgChan = make(chan string, 10000) MsgChan = make(chan []byte, 10000)
} }
const ( const (
@ -78,7 +78,7 @@ func initEventAlarmChannel(ctx context.Context) (*amqp.Channel, error) {
} }
// PushEventToRabbitMQ define func to push event alarm message to rabbitMQ // PushEventToRabbitMQ define func to push event alarm message to rabbitMQ
func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) { func PushEventToRabbitMQ(ctx context.Context, msgChan chan []byte) {
channel, err := initEventAlarmChannel(ctx) channel, err := initEventAlarmChannel(ctx)
if err != nil { if err != nil {
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
@ -125,7 +125,7 @@ func PushEventToRabbitMQ(ctx context.Context, msgChan chan string) {
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: []byte(msg), Body: msg,
}) })
cancel() cancel()

View File

@ -26,6 +26,13 @@ type teEventThresholds struct {
isFloatCause bool isFloatCause bool
} }
type teBreachTrigger struct {
breachType string
triggered bool
triggeredValues []float64
eventOpts []event.EventOption
}
// parseTEThresholds define func to parse telemetry thresholds by casue map // parseTEThresholds define func to parse telemetry thresholds by casue map
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) { func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
t := teEventThresholds{} t := teEventThresholds{}
@ -84,66 +91,69 @@ func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeCo
// analyzeTEDataLogic define func to processing telemetry data and event triggering // analyzeTEDataLogic define func to processing telemetry data and event triggering
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) { func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount windowSize := conf.minBreachCount
if windowSize <= 0 { dataLen := len(realTimeValues)
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize) if dataLen < windowSize || windowSize <= 0 {
return return
} }
// mark whether any events have been triggered in this batch statusArray := make([]string, dataLen)
var eventTriggered bool for i, val := range realTimeValues {
breachTriggers := map[string]bool{ statusArray[i] = getTEBreachType(val, thresholds)
"up": false, "upup": false, "down": false, "downdown": false,
} }
// implement slide window to determine breach counts breachTriggers := make(map[string]teBreachTrigger)
for i := 0; i <= len(realTimeValues)-windowSize; i++ { for i := 0; i <= dataLen-windowSize; i++ {
window := realTimeValues[i : i+windowSize] firstBreachType := statusArray[i]
firstValueBreachType := getTEBreachType(window[0], thresholds)
if firstValueBreachType == "" { // if the first value in the window does not breach, skip this window directly
if firstBreachType == "" {
continue continue
} }
allMatch := true allMatch := true
for j := 1; j < windowSize; j++ { for j := 1; j < windowSize; j++ {
currentValueBreachType := getTEBreachType(window[j], thresholds) if statusArray[i+j] != firstBreachType {
if currentValueBreachType != firstValueBreachType {
allMatch = false allMatch = false
break break
} }
} }
if allMatch { if allMatch {
triggerValues := realTimeValues[i : i+windowSize]
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data // in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
if !breachTriggers[firstValueBreachType] { _, exists := breachTriggers[firstBreachType]
// trigger event if !exists {
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1]) logger.Warn(ctx, "event triggered by sliding window",
"breach_type", firstBreachType,
"trigger_values", triggerValues)
breachTriggers[firstValueBreachType] = true // build Options
eventTriggered = true opts := []event.EventOption{
event.WithConditionValue(triggerValues, conf.Cause),
event.WithTEAnalysisResult(firstBreachType),
// TODO 生成 operations并考虑如何放入 event 中
// event.WithOperations(nil)
}
breachTriggers[firstBreachType] = teBreachTrigger{
breachType: firstBreachType,
triggered: false,
triggeredValues: triggerValues,
eventOpts: opts,
}
} }
} }
} }
// TODO 记录触发trigger的type即firstValueBreachType并考虑如何融合更多错误信息
if eventTriggered { for breachType, trigger := range breachTriggers {
command, content := genTEEventCommandAndContent(ctx, conf.Action) // trigger Action
// TODO 生成 condition 并考虑如何放入 event 中 command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
event.WithCondition(conf.Cause) eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
// TODO 生成 result 并考虑如何放入 event 中 event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
event.WithResult(map[string]any{"real_time_values": realTimeValues})
// TODO 生成 operations并考虑如何放入 event 中
event.WithOperations(nil)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
event.TriggerEventAction(ctx, command, content)
return
} }
} }
func genTEEventCommandAndContent(ctx context.Context, action map[string]any) (command string, content string) { func genTEEventCommandAndMainBody(ctx context.Context, action map[string]any) (command string, mainBody string) {
cmdValue, exist := action["command"] cmdValue, exist := action["command"]
if !exist { if !exist {
logger.Error(ctx, "can not find command variable into action map", "action", action) logger.Error(ctx, "can not find command variable into action map", "action", action)
@ -191,7 +201,7 @@ type tiEventThresholds struct {
isFloatCause bool isFloatCause bool
} }
// parseTEThresholds define func to parse telesignal thresholds by casue map // parseTIThresholds define func to parse telesignal thresholds by casue map
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) { func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
edgeKey := "edge" edgeKey := "edge"
t := tiEventThresholds{ t := tiEventThresholds{
@ -304,18 +314,17 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
} }
if eventTriggered { if eventTriggered {
command, content := genTIEventCommandAndContent(conf.Action) command, mainBody := genTIEventCommandAndMainBody(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许 if command == "" || mainBody == "" {
if command == "" || content == "" { logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return return
} }
event.TriggerEventAction(ctx, command, content) event.TriggerEventAction(ctx, command, mainBody)
return return
} }
} }
func genTIEventCommandAndContent(action map[string]any) (command string, content string) { func genTIEventCommandAndMainBody(action map[string]any) (command string, mainBody string) {
cmdValue, exist := action["command"] cmdValue, exist := action["command"]
if !exist { if !exist {
return "", "" return "", ""

View File

@ -3,7 +3,7 @@ package event
import ( import (
"context" "context"
"fmt" "encoding/json"
"modelRT/logger" "modelRT/logger"
"modelRT/mq" "modelRT/mq"
@ -21,47 +21,64 @@ var actionDispatchMap = map[string]actionHandler{
} }
// TriggerEventAction define func to trigger event by action in compute config // TriggerEventAction define func to trigger event by action in compute config
func TriggerEventAction(ctx context.Context, command string, content string, ops ...EventOption) { func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) {
handler, exists := actionDispatchMap[command] handler, exists := actionDispatchMap[command]
if !exists { if !exists {
logger.Error(ctx, "unknown action command", "command", command) logger.Error(ctx, "unknown action command", "command", command)
return return
} }
err := handler(ctx, content, ops...) err := handler(ctx, eventName, ops...)
if err != nil { if err != nil {
logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err) logger.Error(ctx, "action handler failed", "command", command, "event_name", eventName, "error", err)
return return
} }
logger.Info(ctx, "action handler success", "command", command, "content", content) logger.Info(ctx, "action handler success", "command", command, "event_name", eventName)
} }
func handleInfoAction(ctx context.Context, content string, ops ...EventOption) error { func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) error {
// 实际执行发送警告、记录日志等操作 eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
actionParams := content if err != nil {
// ... logic to send info level event using actionParams ... logger.Error(ctx, "generate info event record failed", "error", err)
logger.Warn(ctx, "trigger info event", "message", actionParams) return err
}
recordBytes, err := json.Marshal(eventRecord)
if err != nil {
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
return err
}
mq.MsgChan <- recordBytes
logger.Info(ctx, "trigger info event", "event_name", eventName)
return nil return nil
} }
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error { func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error {
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...) eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
if err != nil { if err != nil {
logger.Error(ctx, "failed to create event record", "error", err) logger.Error(ctx, "generate warning event record failed", "error", err)
return err return err
} }
mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) recordBytes, err := json.Marshal(eventRecord)
if err != nil {
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
return err
}
mq.MsgChan <- recordBytes
logger.Info(ctx, "trigger warning event", "event_name", eventName) logger.Info(ctx, "trigger warning event", "event_name", eventName)
return nil return nil
} }
func handleErrorAction(ctx context.Context, content string, ops ...EventOption) error { func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) error {
actionParams := content eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
eventRecord, err := NewCriticalPlatformSoftRecord("ErrorEvent", WithCondition(map[string]any{"message": actionParams}))
if err != nil { if err != nil {
logger.Error(ctx, "failed to create event record", "error", err) logger.Error(ctx, "generate error event record failed", "error", err)
return err return err
} }
mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) recordBytes, err := json.Marshal(eventRecord)
if err != nil {
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
return err
}
mq.MsgChan <- recordBytes
return nil return nil
} }

View File

@ -1,6 +1,11 @@
// Package event define real time data evnet operation functions // Package event define real time data evnet operation functions
package event package event
import (
"maps"
"strings"
)
// EventOption define option function type for event record creation // EventOption define option function type for event record creation
type EventOption func(*EventRecord) type EventOption func(*EventRecord)
@ -44,3 +49,37 @@ func WithResult(result map[string]any) EventOption {
e.Result = result e.Result = result
} }
} }
func WithTEAnalysisResult(breachType string) EventOption {
return func(e *EventRecord) {
if e.Result == nil {
e.Result = make(map[string]any)
}
description := "数据异常"
switch strings.ToLower(breachType) {
case "upup":
description = "超越上上限"
case "up":
description = "超越上限"
case "down":
description = "超越下限"
case "downdown":
description = "超越下下限"
}
e.Result["analysis_desc"] = description
e.Result["breach_type"] = breachType
}
}
// WithConditionValue define option function to set event condition with real time value and extra data
func WithConditionValue(realTimeValue []float64, extraData map[string]any) EventOption {
return func(e *EventRecord) {
if e.Condition == nil {
e.Condition = make(map[string]any)
}
e.Condition["real_time_value"] = realTimeValue
maps.Copy(e.Condition, extraData)
}
}

View File

@ -27,8 +27,8 @@ func init() {
globalComputeState = NewMeasComputeState() globalComputeState = NewMeasComputeState()
} }
// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info // StartComputingRealTimeDataLimit define func to start compute real time data up or down limit process goroutines by measurement info
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) { func StartComputingRealTimeDataLimit(ctx context.Context, measurements []orm.Measurement) {
for _, measurement := range measurements { for _, measurement := range measurements {
enableValue, exist := measurement.EventPlan["enable"] enableValue, exist := measurement.EventPlan["enable"]
enable, ok := enableValue.(bool) enable, ok := enableValue.(bool)
@ -57,7 +57,7 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr) enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
conf.StopGchan = make(chan struct{}) conf.StopGchan = make(chan struct{})
globalComputeState.Store(uuidStr, conf) globalComputeState.Store(uuidStr, conf)
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID) logger.Info(ctx, "starting computing real time data limit for measurement", "measurement_uuid", measurement.ComponentUUID)
go continuousComputation(enrichedCtx, conf) go continuousComputation(enrichedCtx, conf)
} }
} }