From 2ececc38d90ca0ad66629e88bf78f17b5324a585 Mon Sep 17 00:00:00 2001 From: douxu Date: Wed, 25 Feb 2026 17:14:25 +0800 Subject: [PATCH] optimzie code organization structure of rabbitmq event --- alert/event_options.go | 39 ---- handler/alert_event_query.go | 2 +- handler/history_data_query.go | 2 +- main.go | 4 +- mq/publish_event.go | 7 + pool/concurrency_anchor_parse.go | 2 +- {alert => real-time-data/alert}/init.go | 0 real-time-data/compute_analyzer.go | 13 +- {alert => real-time-data/event}/event.go | 4 +- real-time-data/event/event_handlers.go | 38 ++-- real-time-data/event/event_options.go | 46 +++++ {alert => real-time-data/event}/gen_event.go | 4 +- real-time-data/real_time_data_computing.go | 178 ------------------- 13 files changed, 96 insertions(+), 243 deletions(-) delete mode 100644 alert/event_options.go rename {alert => real-time-data/alert}/init.go (100%) rename {alert => real-time-data/event}/event.go (94%) create mode 100644 real-time-data/event/event_options.go rename {alert => real-time-data/event}/gen_event.go (97%) diff --git a/alert/event_options.go b/alert/event_options.go deleted file mode 100644 index e114222..0000000 --- a/alert/event_options.go +++ /dev/null @@ -1,39 +0,0 @@ -// Package alert define alert event struct of modelRT project -package alert - -// EventOption 定义选项函数的类型 -type EventOption func(*EventRecord) - -// WithCondition 设置事件场景描述 -func WithCondition(cond map[string]any) EventOption { - return func(e *EventRecord) { - if cond != nil { - e.Condition = cond - } - } -} - -// WithSubscriptions 设置订阅信息 -func WithSubscriptions(subs []any) EventOption { - return func(e *EventRecord) { - if subs != nil { - e.AttachedSubscriptions = subs - } - } -} - -// WithOperations 设置操作记录 -func WithOperations(ops []OperationRecord) EventOption { - return func(e *EventRecord) { - if ops != nil { - e.Operations = ops - } - } -} - -// WithCategory 设置可选分类 -func WithCategory(cat string) EventOption { - return func(e *EventRecord) { - e.Category = cat - } -} diff --git a/handler/alert_event_query.go b/handler/alert_event_query.go index 58e1613..b49d954 100644 --- a/handler/alert_event_query.go +++ b/handler/alert_event_query.go @@ -5,10 +5,10 @@ import ( "net/http" "strconv" - "modelRT/alert" "modelRT/constants" "modelRT/logger" "modelRT/network" + "modelRT/real-time-data/alert" "github.com/gin-gonic/gin" ) diff --git a/handler/history_data_query.go b/handler/history_data_query.go index 294f4b9..d8718e1 100644 --- a/handler/history_data_query.go +++ b/handler/history_data_query.go @@ -6,10 +6,10 @@ import ( "net/http" "strconv" - "modelRT/alert" "modelRT/constants" "modelRT/logger" "modelRT/network" + "modelRT/real-time-data/alert" "github.com/gin-gonic/gin" ) diff --git a/main.go b/main.go index 8cc8a76..2c1c4b8 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,6 @@ import ( "path/filepath" "syscall" - "modelRT/alert" "modelRT/config" "modelRT/constants" "modelRT/database" @@ -22,6 +21,7 @@ import ( "modelRT/model" "modelRT/mq" "modelRT/pool" + "modelRT/real-time-data/alert" "modelRT/router" "modelRT/util" @@ -152,6 +152,8 @@ func main() { // init rabbitmq connection mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig) + // async push event to rabbitMQ + go mq.PushEventToRabbitMQ(ctx, mq.MsgChan) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres diff --git a/mq/publish_event.go b/mq/publish_event.go index 2ea438a..9270e93 100644 --- a/mq/publish_event.go +++ b/mq/publish_event.go @@ -10,6 +10,13 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) +// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ +var MsgChan chan string + +func init() { + MsgChan = make(chan string, 10000) +} + const ( routingKey = "event-alarm-routing-key" exchangeName = "event-alarm-exchange" diff --git a/pool/concurrency_anchor_parse.go b/pool/concurrency_anchor_parse.go index 443f1a7..85fa225 100644 --- a/pool/concurrency_anchor_parse.go +++ b/pool/concurrency_anchor_parse.go @@ -5,11 +5,11 @@ import ( "fmt" "time" - "modelRT/alert" "modelRT/config" "modelRT/constants" "modelRT/diagram" "modelRT/logger" + "modelRT/real-time-data/alert" "github.com/panjf2000/ants/v2" ) diff --git a/alert/init.go b/real-time-data/alert/init.go similarity index 100% rename from alert/init.go rename to real-time-data/alert/init.go diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index 1525734..62a7ecc 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -124,9 +124,15 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE } } } - + // TODO 记录触发trigger的type即firstValueBreachType,并考虑如何融合更多错误信息 if eventTriggered { command, content := genTEEventCommandAndContent(ctx, conf.Action) + // TODO 生成 condition 并考虑如何放入 event 中 + event.WithCondition(conf.Cause) + // TODO 生成 result 并考虑如何放入 event 中 + event.WithResult(map[string]any{"real_time_values": realTimeValues}) + // TODO 生成 operations并考虑如何放入 event 中 + event.WithOperations(nil) // TODO 考虑 content 是否可以为空,先期不允许 if command == "" || content == "" { logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content) @@ -211,11 +217,12 @@ func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) { // getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string { - if t.edge == constants.TelesignalRaising { + switch t.edge { + case constants.TelesignalRaising: if previousValue == 0.0 && currentValue == 1.0 { return constants.TIBreachTriggerType } - } else if t.edge == constants.TelesignalFalling { + case constants.TelesignalFalling: if previousValue == 1.0 && currentValue == 0.0 { return constants.TIBreachTriggerType } diff --git a/alert/event.go b/real-time-data/event/event.go similarity index 94% rename from alert/event.go rename to real-time-data/event/event.go index ca2ea81..6740251 100644 --- a/alert/event.go +++ b/real-time-data/event/event.go @@ -1,5 +1,5 @@ -// Package alert define alert event struct of modelRT project -package alert +// Package event define real time data evnet operation functions +package event // EventRecord define struct for CIM event record type EventRecord struct { diff --git a/real-time-data/event/event_handlers.go b/real-time-data/event/event_handlers.go index a161a7d..8370c0c 100644 --- a/real-time-data/event/event_handlers.go +++ b/real-time-data/event/event_handlers.go @@ -3,11 +3,13 @@ package event import ( "context" + "fmt" "modelRT/logger" + "modelRT/mq" ) -type actionHandler func(ctx context.Context, content string) error +type actionHandler func(ctx context.Context, content string, ops ...EventOption) error // actionDispatchMap define variable to store all action handler into map var actionDispatchMap = map[string]actionHandler{ @@ -19,13 +21,13 @@ var actionDispatchMap = map[string]actionHandler{ } // TriggerEventAction define func to trigger event by action in compute config -func TriggerEventAction(ctx context.Context, command string, content string) { +func TriggerEventAction(ctx context.Context, command string, content string, ops ...EventOption) { handler, exists := actionDispatchMap[command] if !exists { logger.Error(ctx, "unknown action command", "command", command) return } - err := handler(ctx, content) + err := handler(ctx, content, ops...) if err != nil { logger.Error(ctx, "action handler failed", "command", command, "content", content, "error", err) return @@ -33,7 +35,7 @@ func TriggerEventAction(ctx context.Context, command string, content string) { logger.Info(ctx, "action handler success", "command", command, "content", content) } -func handleInfoAction(ctx context.Context, content string) error { +func handleInfoAction(ctx context.Context, content string, ops ...EventOption) error { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send info level event using actionParams ... @@ -41,23 +43,29 @@ func handleInfoAction(ctx context.Context, content string) error { return nil } -func handleWarningAction(ctx context.Context, content string) error { - // 实际执行发送警告、记录日志等操作 - actionParams := content - // ... logic to send warning level event using actionParams ... - logger.Warn(ctx, "trigger warning event", "message", actionParams) +func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error { + eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...) + if err != nil { + logger.Error(ctx, "failed to create event record", "error", err) + return err + } + mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) + logger.Info(ctx, "trigger warning event", "event_name", eventName) return nil } -func handleErrorAction(ctx context.Context, content string) error { - // 实际执行发送警告、记录日志等操作 +func handleErrorAction(ctx context.Context, content string, ops ...EventOption) error { actionParams := content - // ... logic to send error level event using actionParams ... - logger.Warn(ctx, "trigger error event", "message", actionParams) + eventRecord, err := NewCriticalPlatformSoftRecord("ErrorEvent", WithCondition(map[string]any{"message": actionParams})) + if err != nil { + logger.Error(ctx, "failed to create event record", "error", err) + return err + } + mq.MsgChan <- fmt.Sprintf("Generated event record: %+v", eventRecord) return nil } -func handleCriticalAction(ctx context.Context, content string) error { +func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) error { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send critical level event using actionParams ... @@ -65,7 +73,7 @@ func handleCriticalAction(ctx context.Context, content string) error { return nil } -func handleExceptionAction(ctx context.Context, content string) error { +func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) error { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send except level event using actionParams ... diff --git a/real-time-data/event/event_options.go b/real-time-data/event/event_options.go new file mode 100644 index 0000000..fda0c59 --- /dev/null +++ b/real-time-data/event/event_options.go @@ -0,0 +1,46 @@ +// Package event define real time data evnet operation functions +package event + +// EventOption define option function type for event record creation +type EventOption func(*EventRecord) + +// WithCondition define option function to set event condition description +func WithCondition(cond map[string]any) EventOption { + return func(e *EventRecord) { + if cond != nil { + e.Condition = cond + } + } +} + +// WithSubscriptions define option function to set event attached subscription information +func WithSubscriptions(subs []any) EventOption { + return func(e *EventRecord) { + if subs != nil { + e.AttachedSubscriptions = subs + } + } +} + +// WithOperations define option function to set event operation records +func WithOperations(ops []OperationRecord) EventOption { + return func(e *EventRecord) { + if ops != nil { + e.Operations = ops + } + } +} + +// WithCategory define option function to set event category +func WithCategory(cat string) EventOption { + return func(e *EventRecord) { + e.Category = cat + } +} + +// WithResult define option function to set event analysis result +func WithResult(result map[string]any) EventOption { + return func(e *EventRecord) { + e.Result = result + } +} diff --git a/alert/gen_event.go b/real-time-data/event/gen_event.go similarity index 97% rename from alert/gen_event.go rename to real-time-data/event/gen_event.go index d8b467e..7b4b094 100644 --- a/alert/gen_event.go +++ b/real-time-data/event/gen_event.go @@ -1,5 +1,5 @@ -// Package alert define alert event struct of modelRT project -package alert +// Package event define real time data evnet operation functions +package event import ( "fmt" diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 8b4fe45..b41446d 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -227,181 +227,3 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) { } } } - -// // ReceiveChan define func to real time data receive and process -// func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) { -// consumer, err := kafka.NewConsumer(consumerConfig) -// if err != nil { -// logger.Error(ctx, "create kafka consumer failed", "error", err) -// return -// } -// defer consumer.Close() - -// err = consumer.SubscribeTopics(topics, nil) -// if err != nil { -// logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err) -// return -// } - -// batchSize := 100 -// batchTimeout := util.SecondsToDuration(duration) -// messages := make([]*kafka.Message, 0, batchSize) -// lastCommit := time.Now() -// logger.Info(ctx, "start consuming from kafka", "topic", topics) -// for { -// select { -// case <-ctx.Done(): -// logger.Info(ctx, "stop real time data computing by context cancel") -// return -// case realTimeData := <-RealTimeDataChan: -// componentUUID := realTimeData.PayLoad.ComponentUUID -// component, err := diagram.GetComponentMap(componentUUID) -// if err != nil { -// logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err) -// continue -// } - -// componentType := component.Type -// if componentType != constants.DemoType { -// logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID) -// continue -// } - -// var anchorName string -// var compareValUpperLimit, compareValLowerLimit float64 -// var anchorRealTimeData []float64 -// var calculateFunc func(archorValue float64, args ...float64) float64 - -// // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData) - -// for _, param := range realTimeData.PayLoad.Values { -// anchorRealTimeData = append(anchorRealTimeData, param.Value) -// } - -// anchorConfig := config.AnchorParamConfig{ -// AnchorParamBaseConfig: config.AnchorParamBaseConfig{ -// ComponentUUID: componentUUID, -// AnchorName: anchorName, -// CompareValUpperLimit: compareValUpperLimit, -// CompareValLowerLimit: compareValLowerLimit, -// AnchorRealTimeData: anchorRealTimeData, -// }, -// CalculateFunc: calculateFunc, -// CalculateParams: []float64{}, -// } -// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) -// if err != nil { -// logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err) -// continue -// } -// anchorChan <- anchorConfig -// default: -// msg, err := consumer.ReadMessage(batchTimeout) -// if err != nil { -// if err.(kafka.Error).Code() == kafka.ErrTimedOut { -// // process accumulated messages when timeout -// if len(messages) > 0 { -// processMessageBatch(ctx, messages) -// consumer.Commit() -// messages = messages[:0] -// } -// continue -// } -// logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg) -// continue -// } - -// messages = append(messages, msg) -// // process messages when batch size or timeout period is reached -// if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout { -// processMessageBatch(ctx, messages) -// consumer.Commit() -// messages = messages[:0] -// lastCommit = time.Now() -// } -// } -// } -// } - -// type realTimeDataPayload struct { -// ComponentUUID string -// Values []float64 -// } - -// type realTimeData struct { -// Payload realTimeDataPayload -// } - -// func parseKafkaMessage(msgValue []byte) (*realTimeData, error) { -// var realTimeData realTimeData -// err := json.Unmarshal(msgValue, &realTimeData) -// if err != nil { -// return nil, fmt.Errorf("unmarshal real time data failed: %w", err) -// } -// return &realTimeData, nil -// } - -// func processRealTimeData(ctx context.Context, realTimeData *realTimeData) { -// componentUUID := realTimeData.Payload.ComponentUUID -// component, err := diagram.GetComponentMap(componentUUID) -// if err != nil { -// logger.Error(ctx, "query component info from diagram map by component id failed", -// "component_uuid", componentUUID, "error", err) -// return -// } - -// componentType := component.Type -// if componentType != constants.DemoType { -// logger.Error(ctx, "can not process real time data of component type not equal DemoType", -// "component_uuid", componentUUID) -// return -// } - -// var anchorName string -// var compareValUpperLimit, compareValLowerLimit float64 -// var anchorRealTimeData []float64 -// var calculateFunc func(archorValue float64, args ...float64) float64 - -// for _, param := range realTimeData.Payload.Values { -// anchorRealTimeData = append(anchorRealTimeData, param) -// } - -// anchorConfig := config.AnchorParamConfig{ -// AnchorParamBaseConfig: config.AnchorParamBaseConfig{ -// ComponentUUID: componentUUID, -// AnchorName: anchorName, -// CompareValUpperLimit: compareValUpperLimit, -// CompareValLowerLimit: compareValLowerLimit, -// AnchorRealTimeData: anchorRealTimeData, -// }, -// CalculateFunc: calculateFunc, -// CalculateParams: []float64{}, -// } - -// anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) -// if err != nil { -// logger.Error(ctx, "get anchor param chan failed", -// "component_uuid", componentUUID, "error", err) -// return -// } - -// select { -// case anchorChan <- anchorConfig: -// case <-ctx.Done(): -// logger.Info(ctx, "context done while sending to anchor chan") -// case <-time.After(5 * time.Second): -// logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID) -// } -// } - -// // processMessageBatch define func to bathc process kafka message -// func processMessageBatch(ctx context.Context, messages []*kafka.Message) { -// for _, msg := range messages { -// realTimeData, err := parseKafkaMessage(msg.Value) -// if err != nil { -// logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg) -// continue -// } -// go processRealTimeData(ctx, realTimeData) -// } -// }