optimize struct of rabbitmq event
This commit is contained in:
parent
4b52e5f3c6
commit
898beaeec4
|
|
@ -0,0 +1,10 @@
|
||||||
|
// Package common define common error variables
|
||||||
|
package common
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
// ErrUnknowEventActionCommand define error of unknown event action command
|
||||||
|
var ErrUnknowEventActionCommand = errors.New("unknown action command")
|
||||||
|
|
||||||
|
// ErrExecEventActionFailed define error of execute event action failed
|
||||||
|
var ErrExecEventActionFailed = errors.New("exec event action func failed")
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
// Package constants define constant variable
|
// Package common define common error variables
|
||||||
package constants
|
package common
|
||||||
|
|
||||||
import "errors"
|
import "errors"
|
||||||
|
|
||||||
|
|
@ -73,11 +73,20 @@ const (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EventUpDownRoutingKey define routing key for up or down limit event alarm message
|
// EventUpDownRoutingKey define routing key for up or down limit event alarm message
|
||||||
EventUpDownRoutingKey = "event-up-down-routing-key"
|
EventUpDownRoutingKey = "event.#"
|
||||||
// EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message
|
// EventUpDownDeadRoutingKey define dead letter routing key for up or down limit event alarm message
|
||||||
EventUpDownDeadRoutingKey = "event-up-down-dead-letter-routing-key"
|
EventUpDownDeadRoutingKey = "event.#"
|
||||||
// EventUpDownQueueName define queue name for up or down limit event alarm message
|
// EventUpDownQueueName define queue name for up or down limit event alarm message
|
||||||
EventUpDownQueueName = "event-up-down-queue"
|
EventUpDownQueueName = "event-up-down-queue"
|
||||||
// EventUpDownDeadQueueName define dead letter queue name for event alarm message
|
// EventUpDownDeadQueueName define dead letter queue name for event alarm message
|
||||||
EventUpDownDeadQueueName = "event-dead-letter-queue"
|
EventUpDownDeadQueueName = "event-dead-letter-queue"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// EventGeneralUpDownLimitCategroy define category for general up and down limit event
|
||||||
|
EventGeneralUpDownLimitCategroy = "event.general.updown.limit"
|
||||||
|
// EventWarnUpDownLimitCategroy define category for warn up and down limit event
|
||||||
|
EventWarnUpDownLimitCategroy = "event.warn.updown.limit"
|
||||||
|
// EventCriticalUpDownLimitCategroy define category for critical up and down limit event
|
||||||
|
EventCriticalUpDownLimitCategroy = "event.critical.updown.limit"
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/common"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -16,7 +16,7 @@ func AttrDeleteHandler(c *gin.Context) {
|
||||||
var request network.AttrDeleteRequest
|
var request network.AttrDeleteRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/common"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -17,7 +17,7 @@ func AttrGetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/common"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/network"
|
"modelRT/network"
|
||||||
|
|
@ -17,7 +17,7 @@ func AttrSetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
|
|
@ -43,7 +44,7 @@ func DiagramNodeLinkHandler(c *gin.Context) {
|
||||||
var request network.DiagramNodeLinkRequest
|
var request network.DiagramNodeLinkRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
@ -167,7 +168,7 @@ func processLinkSetData(ctx context.Context, action string, level int, prevLinkS
|
||||||
err2 = prevLinkSet.SREM(prevMember)
|
err2 = prevLinkSet.SREM(prevMember)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err := constants.ErrUnsupportedLinkAction
|
err := common.ErrUnsupportedLinkAction
|
||||||
logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err)
|
logger.Error(ctx, "unsupport diagram node link process action", "action", action, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/common"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -19,7 +19,7 @@ func MeasurementGetHandler(c *gin.Context) {
|
||||||
|
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
|
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/diagram"
|
"modelRT/diagram"
|
||||||
|
|
@ -20,7 +21,7 @@ func MeasurementLinkHandler(c *gin.Context) {
|
||||||
var request network.MeasurementLinkRequest
|
var request network.MeasurementLinkRequest
|
||||||
clientToken := c.GetString("client_token")
|
clientToken := c.GetString("client_token")
|
||||||
if clientToken == "" {
|
if clientToken == "" {
|
||||||
err := constants.ErrGetClientToken
|
err := common.ErrGetClientToken
|
||||||
logger.Error(c, "failed to get client token from context", "error", err)
|
logger.Error(c, "failed to get client token from context", "error", err)
|
||||||
c.JSON(http.StatusOK, network.FailureResponse{
|
c.JSON(http.StatusOK, network.FailureResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
|
|
@ -93,7 +94,7 @@ func MeasurementLinkHandler(c *gin.Context) {
|
||||||
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
logger.Error(c, "del measurement link process operation failed", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err = constants.ErrUnsupportedLinkAction
|
err = common.ErrUnsupportedLinkAction
|
||||||
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
logger.Error(c, "unsupport measurement link process action", "measurement_id", measurementID, "action", action, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"maps"
|
"maps"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/database"
|
"modelRT/database"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
|
@ -177,7 +178,7 @@ func RealTimeSubHandler(c *gin.Context) {
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedSubAction, request.Action)
|
err := fmt.Errorf("%w: request action is %s", common.ErrUnsupportedSubAction, request.Action)
|
||||||
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
logger.Error(c, "unsupported action of real time data subscription request", "error", err)
|
||||||
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
requestTargetsCount := processRealTimeRequestCount(request.Measurements)
|
||||||
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
|
results := processRealTimeRequestTargets(request.Measurements, requestTargetsCount, constants.CodeUnsupportSubOperation, err)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -61,7 +62,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
|
||||||
switch prefix {
|
switch prefix {
|
||||||
case constants.ChannelPrefixTelemetry:
|
case constants.ChannelPrefixTelemetry:
|
||||||
if number > 10 {
|
if number > 10 {
|
||||||
return "", constants.ErrExceedsLimitType
|
return "", common.ErrExceedsLimitType
|
||||||
}
|
}
|
||||||
var builder strings.Builder
|
var builder strings.Builder
|
||||||
numberStr := strconv.Itoa(number)
|
numberStr := strconv.Itoa(number)
|
||||||
|
|
@ -86,7 +87,7 @@ func generateChannelName(prefix string, number int, suffix string) (string, erro
|
||||||
channelName := builder.String()
|
channelName := builder.String()
|
||||||
return channelName, nil
|
return channelName, nil
|
||||||
default:
|
default:
|
||||||
return "", constants.ErrUnsupportedChannelPrefixType
|
return "", common.ErrUnsupportedChannelPrefixType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -164,14 +165,14 @@ func (m MeasurementDataSource) GetIOAddress() (IOAddress, error) {
|
||||||
if addr, ok := m.IOAddress.(CL3611Address); ok {
|
if addr, ok := m.IOAddress.(CL3611Address); ok {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
return nil, constants.ErrInvalidAddressType
|
return nil, common.ErrInvalidAddressType
|
||||||
case constants.DataSourceTypePower104:
|
case constants.DataSourceTypePower104:
|
||||||
if addr, ok := m.IOAddress.(Power104Address); ok {
|
if addr, ok := m.IOAddress.(Power104Address); ok {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
return nil, constants.ErrInvalidAddressType
|
return nil, common.ErrInvalidAddressType
|
||||||
default:
|
default:
|
||||||
return nil, constants.ErrUnknownDataType
|
return nil, common.ErrUnknownDataType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,13 +3,12 @@ package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/mq"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type actionHandler func(ctx context.Context, content string, ops ...EventOption) error
|
type actionHandler func(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error)
|
||||||
|
|
||||||
// actionDispatchMap define variable to store all action handler into map
|
// actionDispatchMap define variable to store all action handler into map
|
||||||
var actionDispatchMap = map[string]actionHandler{
|
var actionDispatchMap = map[string]actionHandler{
|
||||||
|
|
@ -21,79 +20,63 @@ var actionDispatchMap = map[string]actionHandler{
|
||||||
}
|
}
|
||||||
|
|
||||||
// TriggerEventAction define func to trigger event by action in compute config
|
// TriggerEventAction define func to trigger event by action in compute config
|
||||||
func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) {
|
func TriggerEventAction(ctx context.Context, command string, eventName string, ops ...EventOption) (*EventRecord, error) {
|
||||||
handler, exists := actionDispatchMap[command]
|
handler, exists := actionDispatchMap[command]
|
||||||
if !exists {
|
if !exists {
|
||||||
logger.Error(ctx, "unknown action command", "command", command)
|
logger.Error(ctx, "unknown action command", "command", command)
|
||||||
return
|
return nil, common.ErrUnknowEventActionCommand
|
||||||
}
|
}
|
||||||
err := handler(ctx, eventName, ops...)
|
|
||||||
|
eventRecord, err := handler(ctx, eventName, ops...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "action handler failed", "command", command, "event_name", eventName, "error", err)
|
logger.Error(ctx, "action event handler failed", "error", err)
|
||||||
return
|
return nil, common.ErrExecEventActionFailed
|
||||||
}
|
}
|
||||||
logger.Info(ctx, "action handler success", "command", command, "event_name", eventName)
|
return eventRecord, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
func handleInfoAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
||||||
|
logger.Info(ctx, "trigger info event", "event_name", eventName)
|
||||||
eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
|
eventRecord, err := NewGeneralPlatformSoftRecord(eventName, ops...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "generate info event record failed", "error", err)
|
logger.Error(ctx, "generate info event record failed", "error", err)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordBytes, err := json.Marshal(eventRecord)
|
return eventRecord, nil
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
mq.MsgChan <- recordBytes
|
|
||||||
logger.Info(ctx, "trigger info event", "event_name", eventName)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
func handleWarningAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
||||||
|
logger.Info(ctx, "trigger warning event", "event_name", eventName)
|
||||||
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
|
eventRecord, err := NewWarnPlatformSoftRecord(eventName, ops...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "generate warning event record failed", "error", err)
|
logger.Error(ctx, "generate warning event record failed", "error", err)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordBytes, err := json.Marshal(eventRecord)
|
return eventRecord, nil
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
mq.MsgChan <- recordBytes
|
|
||||||
logger.Info(ctx, "trigger warning event", "event_name", eventName)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) error {
|
func handleErrorAction(ctx context.Context, eventName string, ops ...EventOption) (*EventRecord, error) {
|
||||||
|
logger.Info(ctx, "trigger error event", "event_name", eventName)
|
||||||
eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
|
eventRecord, err := NewCriticalPlatformSoftRecord(eventName, ops...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "generate error event record failed", "error", err)
|
logger.Error(ctx, "generate error event record failed", "error", err)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordBytes, err := json.Marshal(eventRecord)
|
return eventRecord, nil
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
mq.MsgChan <- recordBytes
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) error {
|
func handleCriticalAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
|
||||||
// 实际执行发送警告、记录日志等操作
|
// 实际执行发送警告、记录日志等操作
|
||||||
actionParams := content
|
actionParams := content
|
||||||
// ... logic to send critical level event using actionParams ...
|
// ... logic to send critical level event using actionParams ...
|
||||||
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
logger.Warn(ctx, "trigger critical event", "message", actionParams)
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) error {
|
func handleExceptionAction(ctx context.Context, content string, ops ...EventOption) (*EventRecord, error) {
|
||||||
// 实际执行发送警告、记录日志等操作
|
// 实际执行发送警告、记录日志等操作
|
||||||
actionParams := content
|
actionParams := content
|
||||||
// ... logic to send except level event using actionParams ...
|
// ... logic to send except level event using actionParams ...
|
||||||
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
logger.Warn(ctx, "trigger except event", "message", actionParams)
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
@ -3,19 +3,21 @@ package mq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
|
"modelRT/mq/event"
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
|
// MsgChan define variable of channel to store messages that need to be sent to rabbitMQ
|
||||||
var MsgChan chan []byte
|
var MsgChan chan *event.EventRecord
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
MsgChan = make(chan []byte, 10000)
|
MsgChan = make(chan *event.EventRecord, 10000)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
||||||
|
|
@ -25,30 +27,34 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
||||||
channel, err = GetConn().Channel()
|
channel, err = GetConn().Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
|
err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
|
_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
logger.Error(ctx, "declare event dead letter queue failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = channel.QueueBind(constants.EventUpDownDeadQueueName, constants.EventUpDownDeadRoutingKey, constants.EventDeadExchangeName, false, nil)
|
err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
|
err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "declare event exchange failed", "error", err)
|
logger.Error(ctx, "declare event exchange failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
args := amqp.Table{
|
args := amqp.Table{
|
||||||
// messages that accumulate to the maximum number will be automatically transferred to the dead letter queue
|
|
||||||
"x-max-length": int32(50),
|
"x-max-length": int32(50),
|
||||||
"x-dead-letter-exchange": constants.EventDeadExchangeName,
|
"x-dead-letter-exchange": constants.EventDeadExchangeName,
|
||||||
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
|
"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
|
||||||
|
|
@ -56,21 +62,24 @@ func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
||||||
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
|
_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "declare event queue failed", "error", err)
|
logger.Error(ctx, "declare event queue failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
|
err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err)
|
logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := channel.Confirm(false); err != nil {
|
if err := channel.Confirm(false); err != nil {
|
||||||
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return channel, nil
|
return channel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
|
// PushUpDownLimitEventToRabbitMQ define func to push up and down limit event message to rabbitMQ
|
||||||
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) {
|
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *event.EventRecord) {
|
||||||
channel, err := initUpDownLimitEventChannel(ctx)
|
channel, err := initUpDownLimitEventChannel(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
|
||||||
|
|
@ -101,28 +110,36 @@ func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan []byte) {
|
||||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
|
||||||
channel.Close()
|
channel.Close()
|
||||||
return
|
return
|
||||||
case msg, ok := <-msgChan:
|
case eventRecord, ok := <-msgChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
|
||||||
channel.Close()
|
channel.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO 将消息的序列化移动到发送之前,以便使用eventRecord的category来作为routing key
|
||||||
|
recordBytes, err := json.Marshal(eventRecord)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// send event alarm message to rabbitMQ queue
|
// send event alarm message to rabbitMQ queue
|
||||||
|
routingKey := eventRecord.Category
|
||||||
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
err = channel.PublishWithContext(pubCtx,
|
err = channel.PublishWithContext(pubCtx,
|
||||||
constants.EventExchangeName, // exchange
|
constants.EventExchangeName, // exchange
|
||||||
constants.EventUpDownRoutingKey, // routing key
|
routingKey, // routing key
|
||||||
false, // mandatory
|
false, // mandatory
|
||||||
false, // immediate
|
false, // immediate
|
||||||
amqp.Publishing{
|
amqp.Publishing{
|
||||||
ContentType: "text/plain",
|
ContentType: "text/plain",
|
||||||
Body: msg,
|
Body: recordBytes,
|
||||||
})
|
})
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", msg, "error", err)
|
logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", recordBytes, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"modelRT/common"
|
||||||
"modelRT/common/errcode"
|
"modelRT/common/errcode"
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/orm"
|
"modelRT/orm"
|
||||||
|
|
@ -64,10 +65,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
switch info.ChangeType {
|
switch info.ChangeType {
|
||||||
case constants.UUIDFromChangeType:
|
case constants.UUIDFromChangeType:
|
||||||
if info.NewUUIDFrom == info.OldUUIDFrom {
|
if info.NewUUIDFrom == info.OldUUIDFrom {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT1)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT1)
|
||||||
}
|
}
|
||||||
if info.NewUUIDTo != info.OldUUIDTo {
|
if info.NewUUIDTo != info.OldUUIDTo {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT1)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT1)
|
||||||
}
|
}
|
||||||
|
|
||||||
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
||||||
|
|
@ -90,10 +91,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
UUIDChangeInfo.NewUUIDTo = OldUUIDTo
|
UUIDChangeInfo.NewUUIDTo = OldUUIDTo
|
||||||
case constants.UUIDToChangeType:
|
case constants.UUIDToChangeType:
|
||||||
if info.NewUUIDFrom != info.OldUUIDFrom {
|
if info.NewUUIDFrom != info.OldUUIDFrom {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT2)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT2)
|
||||||
}
|
}
|
||||||
if info.NewUUIDTo == info.OldUUIDTo {
|
if info.NewUUIDTo == info.OldUUIDTo {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT2)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT2)
|
||||||
}
|
}
|
||||||
|
|
||||||
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
oldUUIDFrom, err := uuid.FromString(info.OldUUIDFrom)
|
||||||
|
|
@ -116,10 +117,10 @@ func ParseUUID(info TopologicChangeInfo) (TopologicUUIDChangeInfos, error) {
|
||||||
UUIDChangeInfo.NewUUIDTo = newUUIDTo
|
UUIDChangeInfo.NewUUIDTo = newUUIDTo
|
||||||
case constants.UUIDAddChangeType:
|
case constants.UUIDAddChangeType:
|
||||||
if info.OldUUIDFrom != "" {
|
if info.OldUUIDFrom != "" {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDFromCheckT3)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDFromCheckT3)
|
||||||
}
|
}
|
||||||
if info.OldUUIDTo != "" {
|
if info.OldUUIDTo != "" {
|
||||||
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", constants.ErrUUIDToCheckT3)
|
return UUIDChangeInfo, fmt.Errorf("topologic change data check failed:%w", common.ErrUUIDToCheckT3)
|
||||||
}
|
}
|
||||||
|
|
||||||
newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom)
|
newUUIDFrom, err := uuid.FromString(info.NewUUIDFrom)
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,8 @@ import (
|
||||||
|
|
||||||
"modelRT/constants"
|
"modelRT/constants"
|
||||||
"modelRT/logger"
|
"modelRT/logger"
|
||||||
"modelRT/real-time-data/event"
|
"modelRT/mq"
|
||||||
|
"modelRT/mq/event"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
||||||
|
|
@ -131,7 +132,7 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE
|
||||||
opts := []event.EventOption{
|
opts := []event.EventOption{
|
||||||
event.WithConditionValue(triggerValues, conf.Cause),
|
event.WithConditionValue(triggerValues, conf.Cause),
|
||||||
event.WithTEAnalysisResult(firstBreachType),
|
event.WithTEAnalysisResult(firstBreachType),
|
||||||
event.WithCategory(constants.EventUpDownRoutingKey),
|
event.WithCategory(constants.EventWarnUpDownLimitCategroy),
|
||||||
// TODO 生成 operations并考虑如何放入 event 中
|
// TODO 生成 operations并考虑如何放入 event 中
|
||||||
// event.WithOperations(nil)
|
// event.WithOperations(nil)
|
||||||
}
|
}
|
||||||
|
|
@ -149,8 +150,12 @@ func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teE
|
||||||
// trigger Action
|
// trigger Action
|
||||||
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
|
command, mainBody := genTEEventCommandAndMainBody(ctx, conf.Action)
|
||||||
eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
|
eventName := fmt.Sprintf("telemetry_%s_%s_Breach_Event", mainBody, breachType)
|
||||||
event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
|
eventRecord, err := event.TriggerEventAction(ctx, command, eventName, trigger.eventOpts...)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "trigger event action failed", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mq.MsgChan <- eventRecord
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,7 +325,12 @@ func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiE
|
||||||
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "main_body", mainBody)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
event.TriggerEventAction(ctx, command, mainBody)
|
eventRecord, err := event.TriggerEventAction(ctx, command, mainBody)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "trigger event action failed", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mq.MsgChan <- eventRecord
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue