diff --git a/alert/init.go b/alert/init.go new file mode 100644 index 0000000..e70e4a0 --- /dev/null +++ b/alert/init.go @@ -0,0 +1,60 @@ +package alert + +import ( + "sync" + + "modelRT/constant" +) + +var ( + once sync.Once + _globalManagerMu sync.RWMutex + _globalManager *EventManager +) + +// Event define alert event struct +type Event struct { + ComponentID int64 + AnchorName string + Level constant.AlertLevel + Message string + StartTime int64 +} + +// EventManager define store and manager alert event struct +type EventManager struct { + events map[constant.AlertLevel][]Event +} + +// AddEvent 添加一个新的报警事件 +func (am *EventManager) AddEvent(event Event) { + am.events[event.Level] = append(am.events[event.Level], event) +} + +// GetEventsByLevel 根据报警等级查找报警事件 +func (am *EventManager) GetEventsByLevel(level constant.AlertLevel) []Event { + return am.events[level] +} + +// InitAlertEventManager define new alert event manager +func InitAlertEventManager() *EventManager { + return &EventManager{ + events: make(map[constant.AlertLevel][]Event), + } +} + +// InitAlertManagerInstance return instance of zap logger +func InitAlertManagerInstance() *EventManager { + once.Do(func() { + _globalManager = InitAlertEventManager() + }) + return _globalManager +} + +// GetAlertMangerInstance returns the global alert manager instance It's safe for concurrent use. +func GetAlertMangerInstance() *EventManager { + _globalManagerMu.RLock() + manager := _globalManager + _globalManagerMu.RUnlock() + return manager +} diff --git a/config/config.go b/config/config.go index 4e36ae5..0915713 100644 --- a/config/config.go +++ b/config/config.go @@ -45,8 +45,8 @@ type LoggerConfig struct { // AntsConfig define config stuct of ants pool config type AntsConfig struct { - ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity - PollingConcurrentQuantity int `mapstructure:"polling_concurrent_quantity"` // polling real time data concurrent quantity + ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity + RTDReceiveConcurrentQuantity int `mapstructure:"rtd_receive_concurrent_quantity"` // polling real time data concurrent quantity } // DataRTConfig define config stuct of data runtime server api config diff --git a/config/config.yaml b/config/config.yaml index a42f9af..9df949b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -33,7 +33,7 @@ logger: # ants config ants: parse_concurrent_quantity: 10 - polling_concurrent_quantity: 10 + rtd_receive_concurrent_quantity: 10 # modelRT base config base: diff --git a/constant/alert.go b/constant/alert.go new file mode 100644 index 0000000..35739d5 --- /dev/null +++ b/constant/alert.go @@ -0,0 +1,35 @@ +// Package constant define alert level constant +package constant + +// AlertLevel define alert level type +type AlertLevel int + +const ( + // AllAlertLevel define all alert level + AllAlertLevel AlertLevel = iota + // InfoAlertLevel define info alert level + InfoAlertLevel + // WarningAlertLevel define warning alert level + WarningAlertLevel + // ErrorAlertLevel define error alert level + ErrorAlertLevel + // FatalAlertLevel define fatal alert level + FatalAlertLevel +) + +func (a AlertLevel) String() string { + switch a { + case AllAlertLevel: + return "ALL" + case InfoAlertLevel: + return "INFO" + case WarningAlertLevel: + return "WARNING" + case ErrorAlertLevel: + return "ERROR" + case FatalAlertLevel: + return "FATAL" + default: + return "Unknown" + } +} diff --git a/diagram/anchor_set.go b/diagram/anchor_set.go index ac548be..6b52b23 100644 --- a/diagram/anchor_set.go +++ b/diagram/anchor_set.go @@ -9,11 +9,11 @@ import ( // anchorValueOverview define struct of storage all anchor value var anchorValueOverview sync.Map -// GetAnchorValue define func of get circuit diagram data by global uuid -func GetAnchorValue(uuid string) (string, error) { - value, ok := diagramsOverview.Load(uuid) +// GetAnchorValue define func of get circuit diagram data by componentID +func GetAnchorValue(componentID int64) (string, error) { + value, ok := diagramsOverview.Load(componentID) if !ok { - return "", fmt.Errorf("can not find anchor value by global uuid:%s", uuid) + return "", fmt.Errorf("can not find anchor value by componentID:%d", componentID) } anchorValue, ok := value.(string) if !ok { @@ -22,20 +22,20 @@ func GetAnchorValue(uuid string) (string, error) { return anchorValue, nil } -// UpdateAnchorValue define func of update anchor value by global uuid and anchor name -func UpdateAnchorValue(uuid string, anchorValue string) bool { - _, result := anchorValueOverview.Swap(uuid, anchorValue) +// UpdateAnchorValue define func of update anchor value by componentID and anchor name +func UpdateAnchorValue(componentID int64, anchorValue string) bool { + _, result := anchorValueOverview.Swap(componentID, anchorValue) return result } -// StoreAnchorValue define func of store anchor value with global uuid and anchor name -func StoreAnchorValue(uuid string, anchorValue string) { - anchorValueOverview.Store(uuid, anchorValue) +// StoreAnchorValue define func of store anchor value with componentID and anchor name +func StoreAnchorValue(componentID int64, anchorValue string) { + anchorValueOverview.Store(componentID, anchorValue) return } -// DeleteAnchorValue define func of delete anchor value with global uuid -func DeleteAnchorValue(uuid string) { - anchorValueOverview.Delete(uuid) +// DeleteAnchorValue define func of delete anchor value with componentID +func DeleteAnchorValue(componentID int64) { + anchorValueOverview.Delete(componentID) return } diff --git a/handler/anchor_point_replace.go b/handler/anchor_point_replace.go index 34c0c90..eeef2d5 100644 --- a/handler/anchor_point_replace.go +++ b/handler/anchor_point_replace.go @@ -7,7 +7,6 @@ import ( "net/http" "time" - "modelRT/config" "modelRT/constant" "modelRT/database" "modelRT/diagram" @@ -16,7 +15,6 @@ import ( "modelRT/network" "modelRT/orm" - "github.com/bitly/go-simplejson" "github.com/gin-gonic/gin" "go.uber.org/zap" ) @@ -92,62 +90,11 @@ func ComponentAnchorReplaceHandler(c *gin.Context) { return } - configsStr, exist := unmarshalMap["anchor_configs_list"] - if !exist { - logger.Error("can not find anchor config list in model model detail info") - header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find anchor config list in model model detail info"} - resp := network.FailureResponse{ - FailResponseHeader: header, - } - c.JSON(http.StatusOK, resp) - return + componentType := unmarshalMap["component_type"].(int) + if componentType != constant.DemoType { + logger.Error("can not process real time data of component type not equal DemoType", zap.Int64("component_id", componentInfo.ID)) } - - anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string))) - if err != nil { - logger.Error("formmat anchor configs json failed", zap.Error(err)) - header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} - resp := network.FailureResponse{ - FailResponseHeader: header, - } - c.JSON(http.StatusOK, resp) - return - } - - var existFlag bool - var anchorIndex int - paramsList := anchorConfigsJSON.Get("params_list").MustArray() - for index := range paramsList { - paramAnchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("anchor_name").MustString() - if anchorName == paramAnchorName { - existFlag = true - anchorIndex = index - } - } - - if !existFlag { - header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find new anchor name in model anchor point list"} - resp := network.FailureResponse{ - FailResponseHeader: header, - } - c.JSON(http.StatusOK, resp) - return - } - - diagram.UpdateAnchorValue(uuid, anchorName) - - // TODO 检查 anchorParam 配置 - anchorParam := config.AnchorParamConfig{ - AnchorParamBaseConfig: config.AnchorParamBaseConfig{ - ComponentID: componentInfo.ID, - AnchorName: anchorName, - CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(), - CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(), - }, - } - anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap) - // TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig - // realtimedata.AnchorParamsChan <- anchorParam + diagram.UpdateAnchorValue(componentInfo.ID, anchorName) resp := network.SuccessResponse{ SuccessResponseHeader: network.SuccessResponseHeader{Status: http.StatusOK}, diff --git a/main.go b/main.go index 351d8ac..b3b17db 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "flag" "time" + "modelRT/alert" "modelRT/config" "modelRT/database" _ "modelRT/docs" @@ -41,6 +42,7 @@ var ( modelRTConfig config.ModelRTConfig postgresDBClient *gorm.DB zapLogger *zap.Logger + alertManager *alert.EventManager ) // TODO 使用 wire 依赖注入管理 DVIE 面板注册的 panel @@ -64,6 +66,9 @@ func main() { zapLogger = logger.InitLoggerInstance(modelRTConfig.LoggerConfig) defer zapLogger.Sync() + // init alert manager + _ = alert.InitAlertEventManager() + // init model parse ants pool parsePool, err := ants.NewPoolWithFunc(modelRTConfig.ParseConcurrentQuantity, pool.ParseFunc) if err != nil { @@ -73,23 +78,22 @@ func main() { defer parsePool.Release() // init anchor param ants pool - // TODO 修改PollingConcurrentQuantity 名称 - anchorPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc) + anchorRealTimePool, err := pool.AnchorPoolInit(modelRTConfig.RTDReceiveConcurrentQuantity) if err != nil { - zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err)) + zapLogger.Error("init concurrent anchor param task pool failed", zap.Error(err)) panic(err) } - defer anchorPool.Release() + defer anchorRealTimePool.Release() // init cancel context cancelCtx, cancel := context.WithCancel(ctx) defer cancel() // init real time data receive channel - go realtimedata.ReceiveChan(cancelCtx, anchorPool) + go realtimedata.ReceiveChan(cancelCtx) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres - err := database.QueryCircuitDiagramComponentFromDB(ctx, tx, parsePool, zapLogger) + err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool, zapLogger) if err != nil { zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err)) panic(err) diff --git a/diagram/component_chan_set.go b/pool/component_chan_set.go similarity index 75% rename from diagram/component_chan_set.go rename to pool/component_chan_set.go index 775d186..d615f4f 100644 --- a/diagram/component_chan_set.go +++ b/pool/component_chan_set.go @@ -1,12 +1,10 @@ -package diagram +package pool import ( "context" "sync" "modelRT/config" - - "github.com/panjf2000/ants/v2" ) func init() { @@ -23,20 +21,20 @@ type ComponentChanSet struct { AnchorChans []chan config.AnchorParamConfig } -func GetComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig { +func GetComponentChan(ctx context.Context, componentID int64) chan config.AnchorParamConfig { globalComponentChanSet.RLock() componentChan := globalComponentChanSet.AnchorChans[componentID] if componentChan == nil { globalComponentChanSet.RUnlock() globalComponentChanSet.Lock() defer globalComponentChanSet.Unlock() - return CreateComponentChan(ctx, componentID, pool) + return CreateNewComponentChan(ctx, componentID) } globalComponentChanSet.RUnlock() return componentChan } -func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig { +func CreateNewComponentChan(ctx context.Context, componentID int64) chan config.AnchorParamConfig { componentChan := globalComponentChanSet.AnchorChans[componentID] if componentChan == nil { componentChan = make(chan config.AnchorParamConfig, 100) @@ -48,7 +46,9 @@ func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.Pool AnchorChan: componentChan, ReadyChan: readyChan, } - pool.Invoke(chanConfig) + + AnchorRealTimePool.Invoke(chanConfig) + <-readyChan return componentChan } diff --git a/pool/concurrency_anchor_parse.go b/pool/concurrency_anchor_parse.go index a4cb67c..8e2cb68 100644 --- a/pool/concurrency_anchor_parse.go +++ b/pool/concurrency_anchor_parse.go @@ -3,23 +3,43 @@ package pool import ( "fmt" + "time" + "modelRT/alert" "modelRT/config" + "modelRT/constant" + "modelRT/diagram" "modelRT/logger" + + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" ) +// AnchorRealTimePool define anchor param pool of real time data +var AnchorRealTimePool *ants.PoolWithFunc + +// AnchorPoolInit define anchor param pool init +func AnchorPoolInit(concurrentQuantity int) (pool *ants.PoolWithFunc, err error) { + // init anchor param ants pool + AnchorRealTimePool, err = ants.NewPoolWithFunc(concurrentQuantity, AnchorFunc) + if err != nil { + return nil, err + } + return AnchorRealTimePool, nil +} + // AnchorFunc defines func that process the real time data of component anchor params -var AnchorFunc = func(anchorChan interface{}) { +var AnchorFunc = func(poolConfig interface{}) { var firstStart bool logger := logger.GetLoggerInstance() + alertManager := alert.GetAlertMangerInstance() - fmt.Println(logger) - fmt.Println(anchorChan) - anchorChanConfig, ok := anchorChan.(config.AnchorChanConfig) + anchorChanConfig, ok := poolConfig.(config.AnchorChanConfig) if !ok { logger.Error("conversion component anchor chan type failed") return } + for { select { case <-anchorChanConfig.Ctx.Done(): @@ -29,76 +49,39 @@ var AnchorFunc = func(anchorChan interface{}) { close(anchorChanConfig.ReadyChan) firstStart = false } - // TODO 重写anchorParaConfig 处理逻辑 - fmt.Println(anchorParaConfig) + + componentID := anchorParaConfig.ComponentID + anchorRealTimeDatas := anchorParaConfig.AnchorRealTimeData + + for _, value := range anchorRealTimeDatas { + anchorName, err := diagram.GetAnchorValue(componentID) + if err != nil { + logger.Error("can not get anchor value from map by uuid", zap.Int64("component_id", componentID), zap.Error(err)) + continue + } + + if anchorName != anchorParaConfig.AnchorName { + logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", anchorParaConfig.AnchorName)) + continue + } + + upperLimitVal := anchorParaConfig.CompareValUpperLimit + lowerlimitVal := anchorParaConfig.CompareValLowerLimit + compareValue := anchorParaConfig.CalculateFunc(value, anchorParaConfig.CalculateParams...) + if compareValue > upperLimitVal || compareValue < lowerlimitVal { + message := fmt.Sprintf("anchor param %s value out of range, value:%f, upper limit:%f, lower limit:%f", anchorName, + compareValue, upperLimitVal, lowerlimitVal) + event := alert.Event{ + ComponentID: componentID, + AnchorName: anchorName, + Level: constant.InfoAlertLevel, + Message: message, + StartTime: time.Now().Unix(), + } + alertManager.AddEvent(event) + } + } default: } } - // var firstTimePolling bool - - // paramConfig, ok := anchorConfig.(config.AnchorParamConfig) - // if !ok { - // logger.Error("conversion model anchor param config type failed") - // return - // } - - // for { - // var beginUnixTime, endUnixTime int64 - // if firstTimePolling { - // milliUnixTime := time.Now().UnixMilli() - // endUnixTime = milliUnixTime - // beginUnixTime = milliUnixTime - 1000*60 - // firstTimePolling = false - // } else { - // // 判断时间差值是否小于10s,如果小于则sleep0.5s后重新获取时间 - // endUnixTime = time.Now().UnixMilli() - // if endUnixTime-beginUnixTime < 1000 { - // time.Sleep(time.Duration(500 * time.Millisecond)) - // endUnixTime = time.Now().UnixMilli() - // } - // } - - // pollingAPI := network.APIEndpoint{ - // URL: paramConfig.APIURL, - // Method: paramConfig.APIMethod, - // QueryParams: map[string]string{ - // "station": paramConfig.StationID, - // "component": paramConfig.ComponentID, - // "point": paramConfig.AnchorName, - // "begin": strconv.FormatInt(beginUnixTime, 10), - // "end": strconv.FormatInt(endUnixTime, 10), - // }, - // } - - // if !firstTimePolling { - // beginUnixTime = time.Now().UnixMilli() - // } - - // valueSlice, err := network.PollAPIEndpoints(pollingAPI) - // if err != nil { - // logger.Error("polling real time data from dataRT service failed", zap.Error(err)) - // continue - // } - - // for _, value := range valueSlice { - // anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) - // if err != nil { - // logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) - // continue - // } - - // if anchorName != paramConfig.AnchorName { - // logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) - // continue - // } - - // upperLimitVal := paramConfig.CompareValUpperLimit - // lowerlimitVal := paramConfig.CompareValLowerLimit - // compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...) - // if compareValue > upperLimitVal || compareValue < lowerlimitVal { - // // TODO 选择报警方式 - // fmt.Println("log warning") - // } - // } - // } } diff --git a/pool/concurrency_model_parse.go b/pool/concurrency_model_parse.go index 19914ae..b0a4fa6 100644 --- a/pool/concurrency_model_parse.go +++ b/pool/concurrency_model_parse.go @@ -11,7 +11,6 @@ import ( "modelRT/logger" "modelRT/model" - "github.com/bitly/go-simplejson" "go.uber.org/zap" ) @@ -41,37 +40,17 @@ var ParseFunc = func(parseConfig interface{}) { return } - uuid := modelParseConfig.ComponentInfo.GlobalUUID.String() - - configsStr, exist := unmarshalMap["anchor_configs_list"] - if exist { - anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string))) - if err != nil { - logger.Error("formmat anchor configs json failed", zap.Error(err)) - return - } - - paramsList := anchorConfigsJSON.Get("params_list").MustArray() - for index := range paramsList { - anchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustString() - - // TODO 检查anchorParam 配置是否正确 - anchorParam := config.AnchorParamConfig{ - AnchorParamBaseConfig: config.AnchorParamBaseConfig{ - ComponentID: modelParseConfig.ComponentInfo.ID, - - AnchorName: anchorName, - CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(), - CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(), - }, - } - anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap) - diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName) - // TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig - // realtimedata.AnchorParamsChan <- anchorParam - } + var anchorName string + if anchoringV := unmarshalMap["anchor_v"].(bool); anchoringV { + anchorName = "voltage" + } else { + anchorName = "current" } + diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.ID, anchorName) + GetComponentChan(modelParseConfig.Context, modelParseConfig.ComponentInfo.ID) + + uuid := modelParseConfig.ComponentInfo.GlobalUUID.String() unmarshalMap["id"] = modelParseConfig.ComponentInfo.ID unmarshalMap["uuid"] = uuid unmarshalMap["uuid"] = modelParseConfig.ComponentInfo.Tag diff --git a/real-time-data/real_time_data_receive.go b/real-time-data/real_time_data_receive.go index 3ceb033..2d70b5f 100644 --- a/real-time-data/real_time_data_receive.go +++ b/real-time-data/real_time_data_receive.go @@ -9,8 +9,8 @@ import ( "modelRT/diagram" "modelRT/logger" "modelRT/network" + "modelRT/pool" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -22,7 +22,7 @@ func init() { } // ReceiveChan define func of real time data receive and process -func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) { +func ReceiveChan(ctx context.Context) { logger := logger.GetLoggerInstance() for { @@ -79,7 +79,7 @@ func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) { CalculateFunc: calculateFunc, CalculateParams: params, } - diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig + pool.GetComponentChan(ctx, componentID) <- anchorConfig default: } }