diff --git a/common/event_action_errors.go b/common/event_action_errors.go new file mode 100644 index 0000000..3df4a39 --- /dev/null +++ b/common/event_action_errors.go @@ -0,0 +1,10 @@ +// Package common define common error variables +package common + +import "errors" + +// ErrUnknowEventActionCommand define error of unknown event action command +var ErrUnknowEventActionCommand = errors.New("unknown action command") + +// ErrExecEventActionFailed define error of execute event action failed +var ErrExecEventActionFailed = errors.New("exec event action func failed") diff --git a/constants/error.go b/common/uuid_errors.go similarity index 97% rename from constants/error.go rename to common/uuid_errors.go index da0b91e..bca1b32 100644 --- a/constants/error.go +++ b/common/uuid_errors.go @@ -1,5 +1,5 @@ -// Package constants define constant variable -package constants +// Package common define common error variables +package common import "errors" diff --git a/constants/event.go b/constants/event.go index 435cb2b..e4ea62e 100644 --- a/constants/event.go +++ b/constants/event.go @@ -73,11 +73,20 @@ const ( const ( // EventUpDownRoutingKey define routing key for up or down limit event alarm message - EventUpDownRoutingKey = "event-up-down-routing-key" + EventUpDownRoutingKey = "event.#" // EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message - EventUpDownDeadRoutingKey = "event-up-down-dead-letter-routing-key" + EventUpDownDeadRoutingKey = "event.#" // EventUpDownQueueName define queue name for up or down limit event alarm message EventUpDownQueueName = "event-up-down-queue" // EventUpDownDeadQueueName define dead letter queue name for event alarm message EventUpDownDeadQueueName = "event-dead-letter-queue" ) + +const ( + // EventGeneralUpDownLimitCategroy define category for general up and down limit event + EventGeneralUpDownLimitCategroy = "event.general.updown.limit" + // EventWarnUpDownLimitCategroy define category for warn up and down limit event + EventWarnUpDownLimitCategroy = "event.warn.updown.limit" + // EventCriticalUpDownLimitCategroy define category for critical up and down limit event + EventCriticalUpDownLimitCategroy = "event.critical.updown.limit" +) diff --git a/handler/attr_delete.go b/handler/attr_delete.go index b78e46c..fedb49a 100644 --- a/handler/attr_delete.go +++ b/handler/attr_delete.go @@ -3,7 +3,7 @@ package handler import ( "net/http" - "modelRT/constants" + "modelRT/common" "modelRT/diagram" "modelRT/logger" "modelRT/network" @@ -16,7 +16,7 @@ func AttrDeleteHandler(c *gin.Context) { var request network.AttrDeleteRequest clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ diff --git a/handler/attr_load.go b/handler/attr_load.go index 3a50be6..0c07744 100644 --- a/handler/attr_load.go +++ b/handler/attr_load.go @@ -3,7 +3,7 @@ package handler import ( "net/http" - "modelRT/constants" + "modelRT/common" "modelRT/database" "modelRT/logger" "modelRT/network" @@ -17,7 +17,7 @@ func AttrGetHandler(c *gin.Context) { clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ diff --git a/handler/attr_update.go b/handler/attr_update.go index 589164e..8a57599 100644 --- a/handler/attr_update.go +++ b/handler/attr_update.go @@ -3,7 +3,7 @@ package handler import ( "net/http" - "modelRT/constants" + "modelRT/common" "modelRT/diagram" "modelRT/logger" "modelRT/network" @@ -17,7 +17,7 @@ func AttrSetHandler(c *gin.Context) { clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ diff --git a/handler/diagram_node_link.go b/handler/diagram_node_link.go index 80850e7..8f484a5 100644 --- a/handler/diagram_node_link.go +++ b/handler/diagram_node_link.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" + "modelRT/common" "modelRT/constants" "modelRT/database" "modelRT/diagram" @@ -43,7 +44,7 @@ func DiagramNodeLinkHandler(c *gin.Context) { var request network.DiagramNodeLinkRequest clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, @@ -167,7 +168,7 @@ func processLinkSetData(ctx context.Context, action string, level int, prevLinkS err2 = prevLinkSet.SREM(prevMember) } default: - err := constants.ErrUnsupportedLinkAction + err := common.ErrUnsupportedLinkAction logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err) return err } diff --git a/handler/measurement_load.go b/handler/measurement_load.go index 065b9f5..26dbbed 100644 --- a/handler/measurement_load.go +++ b/handler/measurement_load.go @@ -4,7 +4,7 @@ package handler import ( "net/http" - "modelRT/constants" + "modelRT/common" "modelRT/database" "modelRT/diagram" "modelRT/logger" @@ -19,7 +19,7 @@ func MeasurementGetHandler(c *gin.Context) { clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ diff --git a/handler/mesurement_link.go b/handler/mesurement_link.go index b45e8bb..1a878f6 100644 --- a/handler/mesurement_link.go +++ b/handler/mesurement_link.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" + "modelRT/common" "modelRT/constants" "modelRT/database" "modelRT/diagram" @@ -20,7 +21,7 @@ func MeasurementLinkHandler(c *gin.Context) { var request network.MeasurementLinkRequest clientToken := c.GetString("client_token") if clientToken == "" { - err := constants.ErrGetClientToken + err := common.ErrGetClientToken logger.Error(c, "failed to get client token from context", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, @@ -93,7 +94,7 @@ func MeasurementLinkHandler(c *gin.Context) { logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err) } default: - err = constants.ErrUnsupportedLinkAction + err = common.ErrUnsupportedLinkAction logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err) } diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 3f171cf..340f48b 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -7,6 +7,7 @@ import ( "maps" "sync" + "modelRT/common" "modelRT/constants" "modelRT/database" "modelRT/logger" @@ -177,7 +178,7 @@ func RealTimeSubHandler(c *gin.Context) { }) return default: - err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action) + err := fmt.Errorf("%w: request action is %s", common.ErrUnsupportedSubAction, request.Action) logger.Error(c, "unsupported action of real time data subscription request", "error", err) requestTargetsCount := processRealTimeRequestCount(request.Measurements) results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err) diff --git a/model/measurement_protol_model.go b/model/measurement_protol_model.go index 9a5446b..7fdce12 100644 --- a/model/measurement_protol_model.go +++ b/model/measurement_protol_model.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "modelRT/common" "modelRT/constants" ) @@ -61,7 +62,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro switch prefix { case constants.ChannelPrefixTelemetry: if number > 10 { - return "", constants.ErrExceedsLimitType + return "", common.ErrExceedsLimitType } var builder strings.Builder numberStr := strconv.Itoa(number) @@ -86,7 +87,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro channelName := builder.String() return channelName, nil default: - return "", constants.ErrUnsupportedChannelPrefixType + return "", common.ErrUnsupportedChannelPrefixType } } @@ -164,14 +165,14 @@ func (m MeasurementDataSource) GetIOAddress() (IOAddress, error) { if addr, ok := m.IOAddress.(CL3611Address); ok { return addr, nil } - return nil, constants.ErrInvalidAddressType + return nil, common.ErrInvalidAddressType case constants.DataSourceTypePower104: if addr, ok := m.IOAddress.(Power104Address); ok { return addr, nil } - return nil, constants.ErrInvalidAddressType + return nil, common.ErrInvalidAddressType default: - return nil, constants.ErrUnknownDataType + return nil, common.ErrUnknownDataType } } diff --git a/real-time-data/event/event.go b/mq/event/event.go similarity index 100% rename from real-time-data/event/event.go rename to mq/event/event.go diff --git a/real-time-data/event/event_handlers.go b/mq/event/event_handlers.go similarity index 64% rename from real-time-data/event/event_handlers.go rename to mq/event/event_handlers.go index 841b06e..e37e195 100644 --- a/real-time-data/event/event_handlers.go +++ b/mq/event/event_handlers.go @@ -3,13 +3,12 @@ package event import ( "context" - "encoding/json" + "modelRT/common" "modelRT/logger" - "modelRT/mq" ) -type actionHandler func(ctx context.Context, content string, ops ...EventOption) error +type actionHandler func(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) // actionDispatchMap define variable to store all action handler into map var actionDispatchMap = map[string]actionHandler{ @@ -21,79 +20,63 @@ var actionDispatchMap = map[string]actionHandler{ } // TriggerEventAction define func to trigger event by action in compute config -func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) { +func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) (*EventRecord, error) { handler, exists := actionDispatchMap[command] if !exists { logger.Error(ctx, "unknown action command", "command", command) - return + return nil, common.ErrUnknowEventActionCommand } - err := handler(ctx, eventName, ops...) + + eventRecord, err := handler(ctx, eventName, ops...) if err != nil { - logger.Error(ctx, "action handler failed", "command", command, "event_name", eventName, "error", err) - return + logger.Error(ctx, "action event handler failed", "error", err) + return nil, common.ErrExecEventActionFailed } - logger.Info(ctx, "action handler success", "command", command, "event_name", eventName) + return eventRecord, nil } -func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) error { +func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) { + logger.Info(ctx, "trigger info event", "event_name", eventName) eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...) if err != nil { logger.Error(ctx, "generate info event record failed", "error", err) - return err + return nil, err } - recordBytes, err := json.Marshal(eventRecord) - if err != nil { - logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) - return err - } - mq.MsgChan <- recordBytes - logger.Info(ctx, "trigger info event", "event_name", eventName) - return nil + return eventRecord, nil } -func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error { +func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) { + logger.Info(ctx, "trigger warning event", "event_name", eventName) eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...) if err != nil { logger.Error(ctx, "generate warning event record failed", "error", err) - return err + return nil, err } - recordBytes, err := json.Marshal(eventRecord) - if err != nil { - logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) - return err - } - mq.MsgChan <- recordBytes - logger.Info(ctx, "trigger warning event", "event_name", eventName) - return nil + return eventRecord, nil } -func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) error { +func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) { + logger.Info(ctx, "trigger error event", "event_name", eventName) eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...) if err != nil { logger.Error(ctx, "generate error event record failed", "error", err) - return err + return nil, err } - recordBytes, err := json.Marshal(eventRecord) - if err != nil { - logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) - return err - } - mq.MsgChan <- recordBytes - return nil + return eventRecord, nil } -func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) error { +func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send critical level event using actionParams ... logger.Warn(ctx, "trigger critical event", "message", actionParams) - return nil + return nil, nil } -func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) error { +func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) { // 实际执行发送警告、记录日志等操作 actionParams := content // ... logic to send except level event using actionParams ... logger.Warn(ctx, "trigger except event", "message", actionParams) - return nil + return nil, nil } diff --git a/real-time-data/event/event_options.go b/mq/event/event_options.go similarity index 100% rename from real-time-data/event/event_options.go rename to mq/event/event_options.go diff --git a/real-time-data/event/gen_event.go b/mq/event/gen_event.go similarity index 100% rename from real-time-data/event/gen_event.go rename to mq/event/gen_event.go diff --git a/mq/publish_up_down_limit_event.go b/mq/publish_up_down_limit_event.go index e1f0bd1..9d4bbbf 100644 --- a/mq/publish_up_down_limit_event.go +++ b/mq/publish_up_down_limit_event.go @@ -3,19 +3,21 @@ package mq import ( "context" + "encoding/json" "time" "modelRT/constants" "modelRT/logger" + "modelRT/mq/event" amqp "github.com/rabbitmq/amqp091-go" ) // MsgChan define variable of channel to store messages that need to be sent to rabbitMQ -var MsgChan chan []byte +var MsgChan chan *event.EventRecord func init() { - MsgChan = make(chan []byte, 10000) + MsgChan = make(chan *event.EventRecord, 10000) } func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { @@ -25,30 +27,34 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { channel, err = GetConn().Channel() if err != nil { logger.Error(ctx, "open rabbitMQ server channel failed", "error", err) + return nil, err } err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter exchange failed", "error", err) + return nil, err } _, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event dead letter queue failed", "error", err) + return nil, err } - err = channel.QueueBind(constants.EventUpDownDeadQueueName, constants.EventUpDownDeadRoutingKey, constants.EventDeadExchangeName, false, nil) + err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err) + return nil, err } err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare event exchange failed", "error", err) + return nil, err } args := amqp.Table{ - // messages that accumulate to the maximum number will be automatically transferred to the dead letter queue "x-max-length": int32(50), "x-dead-letter-exchange": constants.EventDeadExchangeName, "x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey, @@ -56,21 +62,24 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) { _, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args) if err != nil { logger.Error(ctx, "declare event queue failed", "error", err) + return nil, err } err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil) if err != nil { - logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err) + logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err) + return nil, err } if err := channel.Confirm(false); err != nil { logger.Error(ctx, "channel could not be put into confirm mode", "error", err) + return nil, err } return channel, nil } // PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ -func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { +func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) { channel, err := initUpDownLimitEventChannel(ctx) if err != nil { logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) @@ -101,28 +110,36 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) { logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel") channel.Close() return - case msg, ok := <-msgChan: + case eventRecord, ok := <-msgChan: if !ok { logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop") channel.Close() return } + // TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key + recordBytes, err := json.Marshal(eventRecord) + if err != nil { + logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err) + continue + } + // send event alarm message to rabbitMQ queue + routingKey := eventRecord.Category pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err = channel.PublishWithContext(pubCtx, - constants.EventExchangeName, // exchange - constants.EventUpDownRoutingKey, // routing key - false, // mandatory - false, // immediate + constants.EventExchangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "text/plain", - Body: msg, + Body: recordBytes, }) cancel() if err != nil { - logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", msg, "error", err) + logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err) } } } diff --git a/network/circuit_diagram_update_request.go b/network/circuit_diagram_update_request.go index 06966cd..86b1a76 100644 --- a/network/circuit_diagram_update_request.go +++ b/network/circuit_diagram_update_request.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "modelRT/common" "modelRT/common/errcode" "modelRT/constants" "modelRT/orm" @@ -64,10 +65,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) { switch info.ChangeType { case constants.UUIDFromChangeType: if info.NewUUIDFrom == info.OldUUIDFrom { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT1) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT1) } if info.NewUUIDTo != info.OldUUIDTo { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT1) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT1) } oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom) @@ -90,10 +91,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) { UUIDChangeInfo.NewUUIDTo = OldUUIDTo case constants.UUIDToChangeType: if info.NewUUIDFrom != info.OldUUIDFrom { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT2) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT2) } if info.NewUUIDTo == info.OldUUIDTo { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT2) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT2) } oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom) @@ -116,10 +117,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) { UUIDChangeInfo.NewUUIDTo = newUUIDTo case constants.UUIDAddChangeType: if info.OldUUIDFrom != "" { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT3) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT3) } if info.OldUUIDTo != "" { - return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT3) + return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT3) } newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom) diff --git a/real-time-data/compute_analyzer.go b/real-time-data/compute_analyzer.go index e1c5166..d65a343 100644 --- a/real-time-data/compute_analyzer.go +++ b/real-time-data/compute_analyzer.go @@ -9,7 +9,8 @@ import ( "modelRT/constants" "modelRT/logger" - "modelRT/real-time-data/event" + "modelRT/mq" + "modelRT/mq/event" ) // RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering @@ -131,7 +132,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE opts := []event.EventOption{ event.WithConditionValue(triggerValues, conf.Cause), event.WithTEAnalysisResult(firstBreachType), - event.WithCategory(constants.EventUpDownRoutingKey), + event.WithCategory(constants.EventWarnUpDownLimitCategroy), // TODO 生成 operations并考虑如何放入 event 中 // event.WithOperations(nil) } @@ -149,8 +150,12 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE // trigger Action command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action) eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType) - event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...) - + eventRecord, err := event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...) + if err != nil { + logger.Error(ctx, "trigger event action failed", "error", err) + return + } + mq.MsgChan <- eventRecord } } @@ -320,7 +325,12 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody) return } - event.TriggerEventAction(ctx, command, mainBody) + eventRecord, err := event.TriggerEventAction(ctx, command, mainBody) + if err != nil { + logger.Error(ctx, "trigger event action failed", "error", err) + return + } + mq.MsgChan <- eventRecord return } }