design an alert event management structure

This commit is contained in:
douxu 2025-01-22 16:38:46 +08:00
parent 2b967450eb
commit 65f71348d6
11 changed files with 200 additions and 192 deletions

60
alert/init.go Normal file
View File

@ -0,0 +1,60 @@
package alert
import (
"sync"
"modelRT/constant"
)
var (
once sync.Once
_globalManagerMu sync.RWMutex
_globalManager *EventManager
)
// Event define alert event struct
type Event struct {
ComponentID int64
AnchorName string
Level constant.AlertLevel
Message string
StartTime int64
}
// EventManager define store and manager alert event struct
type EventManager struct {
events map[constant.AlertLevel][]Event
}
// AddEvent 添加一个新的报警事件
func (am *EventManager) AddEvent(event Event) {
am.events[event.Level] = append(am.events[event.Level], event)
}
// GetEventsByLevel 根据报警等级查找报警事件
func (am *EventManager) GetEventsByLevel(level constant.AlertLevel) []Event {
return am.events[level]
}
// InitAlertEventManager define new alert event manager
func InitAlertEventManager() *EventManager {
return &EventManager{
events: make(map[constant.AlertLevel][]Event),
}
}
// InitAlertManagerInstance return instance of zap logger
func InitAlertManagerInstance() *EventManager {
once.Do(func() {
_globalManager = InitAlertEventManager()
})
return _globalManager
}
// GetAlertMangerInstance returns the global alert manager instance It's safe for concurrent use.
func GetAlertMangerInstance() *EventManager {
_globalManagerMu.RLock()
manager := _globalManager
_globalManagerMu.RUnlock()
return manager
}

View File

@ -46,7 +46,7 @@ type LoggerConfig struct {
// AntsConfig define config stuct of ants pool config
type AntsConfig struct {
ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity
PollingConcurrentQuantity int `mapstructure:"polling_concurrent_quantity"` // polling real time data concurrent quantity
RTDReceiveConcurrentQuantity int `mapstructure:"rtd_receive_concurrent_quantity"` // polling real time data concurrent quantity
}
// DataRTConfig define config stuct of data runtime server api config

View File

@ -33,7 +33,7 @@ logger:
# ants config
ants:
parse_concurrent_quantity: 10
polling_concurrent_quantity: 10
rtd_receive_concurrent_quantity: 10
# modelRT base config
base:

35
constant/alert.go Normal file
View File

@ -0,0 +1,35 @@
// Package constant define alert level constant
package constant
// AlertLevel define alert level type
type AlertLevel int
const (
// AllAlertLevel define all alert level
AllAlertLevel AlertLevel = iota
// InfoAlertLevel define info alert level
InfoAlertLevel
// WarningAlertLevel define warning alert level
WarningAlertLevel
// ErrorAlertLevel define error alert level
ErrorAlertLevel
// FatalAlertLevel define fatal alert level
FatalAlertLevel
)
func (a AlertLevel) String() string {
switch a {
case AllAlertLevel:
return "ALL"
case InfoAlertLevel:
return "INFO"
case WarningAlertLevel:
return "WARNING"
case ErrorAlertLevel:
return "ERROR"
case FatalAlertLevel:
return "FATAL"
default:
return "Unknown"
}
}

View File

@ -9,11 +9,11 @@ import (
// anchorValueOverview define struct of storage all anchor value
var anchorValueOverview sync.Map
// GetAnchorValue define func of get circuit diagram data by global uuid
func GetAnchorValue(uuid string) (string, error) {
value, ok := diagramsOverview.Load(uuid)
// GetAnchorValue define func of get circuit diagram data by componentID
func GetAnchorValue(componentID int64) (string, error) {
value, ok := diagramsOverview.Load(componentID)
if !ok {
return "", fmt.Errorf("can not find anchor value by global uuid:%s", uuid)
return "", fmt.Errorf("can not find anchor value by componentID:%d", componentID)
}
anchorValue, ok := value.(string)
if !ok {
@ -22,20 +22,20 @@ func GetAnchorValue(uuid string) (string, error) {
return anchorValue, nil
}
// UpdateAnchorValue define func of update anchor value by global uuid and anchor name
func UpdateAnchorValue(uuid string, anchorValue string) bool {
_, result := anchorValueOverview.Swap(uuid, anchorValue)
// UpdateAnchorValue define func of update anchor value by componentID and anchor name
func UpdateAnchorValue(componentID int64, anchorValue string) bool {
_, result := anchorValueOverview.Swap(componentID, anchorValue)
return result
}
// StoreAnchorValue define func of store anchor value with global uuid and anchor name
func StoreAnchorValue(uuid string, anchorValue string) {
anchorValueOverview.Store(uuid, anchorValue)
// StoreAnchorValue define func of store anchor value with componentID and anchor name
func StoreAnchorValue(componentID int64, anchorValue string) {
anchorValueOverview.Store(componentID, anchorValue)
return
}
// DeleteAnchorValue define func of delete anchor value with global uuid
func DeleteAnchorValue(uuid string) {
anchorValueOverview.Delete(uuid)
// DeleteAnchorValue define func of delete anchor value with componentID
func DeleteAnchorValue(componentID int64) {
anchorValueOverview.Delete(componentID)
return
}

View File

@ -7,7 +7,6 @@ import (
"net/http"
"time"
"modelRT/config"
"modelRT/constant"
"modelRT/database"
"modelRT/diagram"
@ -16,7 +15,6 @@ import (
"modelRT/network"
"modelRT/orm"
"github.com/bitly/go-simplejson"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
@ -92,62 +90,11 @@ func ComponentAnchorReplaceHandler(c *gin.Context) {
return
}
configsStr, exist := unmarshalMap["anchor_configs_list"]
if !exist {
logger.Error("can not find anchor config list in model model detail info")
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find anchor config list in model model detail info"}
resp := network.FailureResponse{
FailResponseHeader: header,
componentType := unmarshalMap["component_type"].(int)
if componentType != constant.DemoType {
logger.Error("can not process real time data of component type not equal DemoType", zap.Int64("component_id", componentInfo.ID))
}
c.JSON(http.StatusOK, resp)
return
}
anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string)))
if err != nil {
logger.Error("formmat anchor configs json failed", zap.Error(err))
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()}
resp := network.FailureResponse{
FailResponseHeader: header,
}
c.JSON(http.StatusOK, resp)
return
}
var existFlag bool
var anchorIndex int
paramsList := anchorConfigsJSON.Get("params_list").MustArray()
for index := range paramsList {
paramAnchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("anchor_name").MustString()
if anchorName == paramAnchorName {
existFlag = true
anchorIndex = index
}
}
if !existFlag {
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find new anchor name in model anchor point list"}
resp := network.FailureResponse{
FailResponseHeader: header,
}
c.JSON(http.StatusOK, resp)
return
}
diagram.UpdateAnchorValue(uuid, anchorName)
// TODO 检查 anchorParam 配置
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
ComponentID: componentInfo.ID,
AnchorName: anchorName,
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(),
},
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap)
// TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
// realtimedata.AnchorParamsChan <- anchorParam
diagram.UpdateAnchorValue(componentInfo.ID, anchorName)
resp := network.SuccessResponse{
SuccessResponseHeader: network.SuccessResponseHeader{Status: http.StatusOK},

16
main.go
View File

@ -6,6 +6,7 @@ import (
"flag"
"time"
"modelRT/alert"
"modelRT/config"
"modelRT/database"
_ "modelRT/docs"
@ -41,6 +42,7 @@ var (
modelRTConfig config.ModelRTConfig
postgresDBClient *gorm.DB
zapLogger *zap.Logger
alertManager *alert.EventManager
)
// TODO 使用 wire 依赖注入管理 DVIE 面板注册的 panel
@ -64,6 +66,9 @@ func main() {
zapLogger = logger.InitLoggerInstance(modelRTConfig.LoggerConfig)
defer zapLogger.Sync()
// init alert manager
_ = alert.InitAlertEventManager()
// init model parse ants pool
parsePool, err := ants.NewPoolWithFunc(modelRTConfig.ParseConcurrentQuantity, pool.ParseFunc)
if err != nil {
@ -73,23 +78,22 @@ func main() {
defer parsePool.Release()
// init anchor param ants pool
// TODO 修改PollingConcurrentQuantity 名称
anchorPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc)
anchorRealTimePool, err := pool.AnchorPoolInit(modelRTConfig.RTDReceiveConcurrentQuantity)
if err != nil {
zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err))
zapLogger.Error("init concurrent anchor param task pool failed", zap.Error(err))
panic(err)
}
defer anchorPool.Release()
defer anchorRealTimePool.Release()
// init cancel context
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
// init real time data receive channel
go realtimedata.ReceiveChan(cancelCtx, anchorPool)
go realtimedata.ReceiveChan(cancelCtx)
postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres
err := database.QueryCircuitDiagramComponentFromDB(ctx, tx, parsePool, zapLogger)
err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool, zapLogger)
if err != nil {
zapLogger.Error("load circuit diagrams from postgres failed", zap.Error(err))
panic(err)

View File

@ -1,12 +1,10 @@
package diagram
package pool
import (
"context"
"sync"
"modelRT/config"
"github.com/panjf2000/ants/v2"
)
func init() {
@ -23,20 +21,20 @@ type ComponentChanSet struct {
AnchorChans []chan config.AnchorParamConfig
}
func GetComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig {
func GetComponentChan(ctx context.Context, componentID int64) chan config.AnchorParamConfig {
globalComponentChanSet.RLock()
componentChan := globalComponentChanSet.AnchorChans[componentID]
if componentChan == nil {
globalComponentChanSet.RUnlock()
globalComponentChanSet.Lock()
defer globalComponentChanSet.Unlock()
return CreateComponentChan(ctx, componentID, pool)
return CreateNewComponentChan(ctx, componentID)
}
globalComponentChanSet.RUnlock()
return componentChan
}
func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig {
func CreateNewComponentChan(ctx context.Context, componentID int64) chan config.AnchorParamConfig {
componentChan := globalComponentChanSet.AnchorChans[componentID]
if componentChan == nil {
componentChan = make(chan config.AnchorParamConfig, 100)
@ -48,7 +46,9 @@ func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.Pool
AnchorChan: componentChan,
ReadyChan: readyChan,
}
pool.Invoke(chanConfig)
AnchorRealTimePool.Invoke(chanConfig)
<-readyChan
return componentChan
}

View File

@ -3,23 +3,43 @@ package pool
import (
"fmt"
"time"
"modelRT/alert"
"modelRT/config"
"modelRT/constant"
"modelRT/diagram"
"modelRT/logger"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// AnchorRealTimePool define anchor param pool of real time data
var AnchorRealTimePool *ants.PoolWithFunc
// AnchorPoolInit define anchor param pool init
func AnchorPoolInit(concurrentQuantity int) (pool *ants.PoolWithFunc, err error) {
// init anchor param ants pool
AnchorRealTimePool, err = ants.NewPoolWithFunc(concurrentQuantity, AnchorFunc)
if err != nil {
return nil, err
}
return AnchorRealTimePool, nil
}
// AnchorFunc defines func that process the real time data of component anchor params
var AnchorFunc = func(anchorChan interface{}) {
var AnchorFunc = func(poolConfig interface{}) {
var firstStart bool
logger := logger.GetLoggerInstance()
alertManager := alert.GetAlertMangerInstance()
fmt.Println(logger)
fmt.Println(anchorChan)
anchorChanConfig, ok := anchorChan.(config.AnchorChanConfig)
anchorChanConfig, ok := poolConfig.(config.AnchorChanConfig)
if !ok {
logger.Error("conversion component anchor chan type failed")
return
}
for {
select {
case <-anchorChanConfig.Ctx.Done():
@ -29,76 +49,39 @@ var AnchorFunc = func(anchorChan interface{}) {
close(anchorChanConfig.ReadyChan)
firstStart = false
}
// TODO 重写anchorParaConfig 处理逻辑
fmt.Println(anchorParaConfig)
componentID := anchorParaConfig.ComponentID
anchorRealTimeDatas := anchorParaConfig.AnchorRealTimeData
for _, value := range anchorRealTimeDatas {
anchorName, err := diagram.GetAnchorValue(componentID)
if err != nil {
logger.Error("can not get anchor value from map by uuid", zap.Int64("component_id", componentID), zap.Error(err))
continue
}
if anchorName != anchorParaConfig.AnchorName {
logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", anchorParaConfig.AnchorName))
continue
}
upperLimitVal := anchorParaConfig.CompareValUpperLimit
lowerlimitVal := anchorParaConfig.CompareValLowerLimit
compareValue := anchorParaConfig.CalculateFunc(value, anchorParaConfig.CalculateParams...)
if compareValue > upperLimitVal || compareValue < lowerlimitVal {
message := fmt.Sprintf("anchor param %s value out of range, value:%f, upper limit:%f, lower limit:%f", anchorName,
compareValue, upperLimitVal, lowerlimitVal)
event := alert.Event{
ComponentID: componentID,
AnchorName: anchorName,
Level: constant.InfoAlertLevel,
Message: message,
StartTime: time.Now().Unix(),
}
alertManager.AddEvent(event)
}
}
default:
}
}
// var firstTimePolling bool
// paramConfig, ok := anchorConfig.(config.AnchorParamConfig)
// if !ok {
// logger.Error("conversion model anchor param config type failed")
// return
// }
// for {
// var beginUnixTime, endUnixTime int64
// if firstTimePolling {
// milliUnixTime := time.Now().UnixMilli()
// endUnixTime = milliUnixTime
// beginUnixTime = milliUnixTime - 1000*60
// firstTimePolling = false
// } else {
// // 判断时间差值是否小于10s如果小于则sleep0.5s后重新获取时间
// endUnixTime = time.Now().UnixMilli()
// if endUnixTime-beginUnixTime < 1000 {
// time.Sleep(time.Duration(500 * time.Millisecond))
// endUnixTime = time.Now().UnixMilli()
// }
// }
// pollingAPI := network.APIEndpoint{
// URL: paramConfig.APIURL,
// Method: paramConfig.APIMethod,
// QueryParams: map[string]string{
// "station": paramConfig.StationID,
// "component": paramConfig.ComponentID,
// "point": paramConfig.AnchorName,
// "begin": strconv.FormatInt(beginUnixTime, 10),
// "end": strconv.FormatInt(endUnixTime, 10),
// },
// }
// if !firstTimePolling {
// beginUnixTime = time.Now().UnixMilli()
// }
// valueSlice, err := network.PollAPIEndpoints(pollingAPI)
// if err != nil {
// logger.Error("polling real time data from dataRT service failed", zap.Error(err))
// continue
// }
// for _, value := range valueSlice {
// anchorName, err := diagram.GetAnchorValue(paramConfig.UUID)
// if err != nil {
// logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err))
// continue
// }
// if anchorName != paramConfig.AnchorName {
// logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName))
// continue
// }
// upperLimitVal := paramConfig.CompareValUpperLimit
// lowerlimitVal := paramConfig.CompareValLowerLimit
// compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...)
// if compareValue > upperLimitVal || compareValue < lowerlimitVal {
// // TODO 选择报警方式
// fmt.Println("log warning")
// }
// }
// }
}

View File

@ -11,7 +11,6 @@ import (
"modelRT/logger"
"modelRT/model"
"github.com/bitly/go-simplejson"
"go.uber.org/zap"
)
@ -41,37 +40,17 @@ var ParseFunc = func(parseConfig interface{}) {
return
}
var anchorName string
if anchoringV := unmarshalMap["anchor_v"].(bool); anchoringV {
anchorName = "voltage"
} else {
anchorName = "current"
}
diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.ID, anchorName)
GetComponentChan(modelParseConfig.Context, modelParseConfig.ComponentInfo.ID)
uuid := modelParseConfig.ComponentInfo.GlobalUUID.String()
configsStr, exist := unmarshalMap["anchor_configs_list"]
if exist {
anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string)))
if err != nil {
logger.Error("formmat anchor configs json failed", zap.Error(err))
return
}
paramsList := anchorConfigsJSON.Get("params_list").MustArray()
for index := range paramsList {
anchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustString()
// TODO 检查anchorParam 配置是否正确
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
ComponentID: modelParseConfig.ComponentInfo.ID,
AnchorName: anchorName,
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(),
},
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap)
diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName)
// TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
// realtimedata.AnchorParamsChan <- anchorParam
}
}
unmarshalMap["id"] = modelParseConfig.ComponentInfo.ID
unmarshalMap["uuid"] = uuid
unmarshalMap["uuid"] = modelParseConfig.ComponentInfo.Tag

View File

@ -9,8 +9,8 @@ import (
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
"modelRT/pool"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
@ -22,7 +22,7 @@ func init() {
}
// ReceiveChan define func of real time data receive and process
func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) {
func ReceiveChan(ctx context.Context) {
logger := logger.GetLoggerInstance()
for {
@ -79,7 +79,7 @@ func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) {
CalculateFunc: calculateFunc,
CalculateParams: params,
}
diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
pool.GetComponentChan(ctx, componentID) <- anchorConfig
default:
}
}