diff --git a/config/anchor_chan_config.go b/config/anchor_chan_config.go new file mode 100644 index 0000000..c565a39 --- /dev/null +++ b/config/anchor_chan_config.go @@ -0,0 +1,10 @@ +package config + +import "context" + +// AnchorChanConfig define anchor params channel config struct +type AnchorChanConfig struct { + Ctx context.Context // 结束 context + AnchorChan chan AnchorParamConfig // 锚定参量实时值传递通道 + ReadyChan chan struct{} // 就绪通知通道 +} diff --git a/config/anchor_param_config.go b/config/anchor_param_config.go index 6142837..989b870 100644 --- a/config/anchor_param_config.go +++ b/config/anchor_param_config.go @@ -15,12 +15,11 @@ type AnchorParamListConfig struct { // AnchorParamBaseConfig define anchor params base config struct type AnchorParamBaseConfig struct { - StationID string // component表 station_id - ComponentID string // component表 ID - UUID string // component表 global_uuid - AnchorName string // 锚定参量名称 - CompareValUpperLimit float64 // 比较值上限 - CompareValLowerLimit float64 // 比较值下限 + ComponentID int64 // component表 ID + AnchorName string // 锚定参量名称 + CompareValUpperLimit float64 // 比较值上限 + CompareValLowerLimit float64 // 比较值下限 + AnchorRealTimeData []float64 // 锚定参数实时值 } // AnchorParamConfig define anchor params config struct @@ -28,8 +27,6 @@ type AnchorParamConfig struct { AnchorParamBaseConfig CalculateFunc func(archorValue float64, args ...float64) float64 // 计算函数 CalculateParams []float64 // 计算参数 - APIURL string // API URL - APIMethod string // API Method } var baseVoltageFunc = func(archorValue float64, args ...float64) float64 { diff --git a/database/query_component.go b/database/query_component.go index 8ee154b..18471a9 100644 --- a/database/query_component.go +++ b/database/query_component.go @@ -3,14 +3,12 @@ package database import ( "context" - "fmt" - "strconv" "time" "modelRT/config" - "modelRT/diagram" "modelRT/orm" + "github.com/gofrs/uuid" "github.com/panjf2000/ants/v2" "go.uber.org/zap" "gorm.io/gorm" @@ -19,18 +17,18 @@ import ( // QueryCircuitDiagramComponentFromDB return the result of query circuit diagram component info order by page id from postgresDB func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool *ants.PoolWithFunc, logger *zap.Logger) error { - var Components []orm.Component + var components []orm.Component // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - // TODO 将 for update 操作放到事务中 - result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components) + + result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&components) if result.Error != nil { logger.Error("query circuit diagram component info failed", zap.Error(result.Error)) return result.Error } - for _, component := range Components { + for _, component := range components { pool.Invoke(config.ModelParseConfig{ ComponentInfo: component, Context: ctx, @@ -39,27 +37,16 @@ func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool * return nil } -// QueryElectricalEquipmentUUID return the result of query electrical equipment uuid from postgresDB by circuit diagram id info -func QueryElectricalEquipmentUUID(ctx context.Context, diagramID int64, logger *zap.Logger) error { - var uuids []string +// QueryComponentByUUID return the result of query circuit diagram component info by uuid from postgresDB +func QueryComponentByUUID(ctx context.Context, tx *gorm.DB, uuid uuid.UUID) (orm.Component, error) { + var component orm.Component // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - tableName := "circuit_diagram_" + strconv.FormatInt(diagramID, 10) - result := _globalPostgresClient.Table(tableName).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select("uuid").Find(&uuids) + + result := tx.WithContext(cancelCtx).Where("global_uuid = ? ", uuid).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&component) if result.Error != nil { - logger.Error("query circuit diagram overview info failed", zap.Error(result.Error)) - return result.Error + return orm.Component{}, result.Error } - - for _, uuid := range uuids { - diagramParamsMap, err := diagram.GetComponentMap(uuid) - if err != nil { - logger.Error("get electrical circuit diagram overview info failed", zap.Error(result.Error)) - return result.Error - } - fmt.Println(diagramParamsMap, err) - } - - return nil + return component, nil } diff --git a/diagram/component_chan_set.go b/diagram/component_chan_set.go new file mode 100644 index 0000000..775d186 --- /dev/null +++ b/diagram/component_chan_set.go @@ -0,0 +1,56 @@ +package diagram + +import ( + "context" + "sync" + + "modelRT/config" + + "github.com/panjf2000/ants/v2" +) + +func init() { + globalComponentChanSet = &ComponentChanSet{ + AnchorChans: make([]chan config.AnchorParamConfig, 100), + } +} + +var globalComponentChanSet *ComponentChanSet + +// ComponentChanSet defines component anchor real time data process channel set +type ComponentChanSet struct { + sync.RWMutex + AnchorChans []chan config.AnchorParamConfig +} + +func GetComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig { + globalComponentChanSet.RLock() + componentChan := globalComponentChanSet.AnchorChans[componentID] + if componentChan == nil { + globalComponentChanSet.RUnlock() + globalComponentChanSet.Lock() + defer globalComponentChanSet.Unlock() + return CreateComponentChan(ctx, componentID, pool) + } + globalComponentChanSet.RUnlock() + return componentChan +} + +func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig { + componentChan := globalComponentChanSet.AnchorChans[componentID] + if componentChan == nil { + componentChan = make(chan config.AnchorParamConfig, 100) + globalComponentChanSet.AnchorChans[componentID] = componentChan + + readyChan := make(chan struct{}) + chanConfig := config.AnchorChanConfig{ + Ctx: ctx, + AnchorChan: componentChan, + ReadyChan: readyChan, + } + pool.Invoke(chanConfig) + <-readyChan + return componentChan + } + return componentChan +} diff --git a/diagram/component_set.go b/diagram/component_set.go index 9f753a5..adf497f 100644 --- a/diagram/component_set.go +++ b/diagram/component_set.go @@ -9,11 +9,11 @@ import ( // diagramsOverview define struct of storage all circuit diagram data var diagramsOverview sync.Map -// GetComponentMap define func of get circuit diagram data by global uuid -func GetComponentMap(uuid string) (map[string]interface{}, error) { - value, ok := diagramsOverview.Load(uuid) +// GetComponentMap define func of get circuit diagram data by component id +func GetComponentMap(componentID int64) (map[string]interface{}, error) { + value, ok := diagramsOverview.Load(componentID) if !ok { - return nil, fmt.Errorf("can not find graph by global uuid:%s", uuid) + return nil, fmt.Errorf("can not find graph by global uuid:%d", componentID) } paramsMap, ok := value.(map[string]interface{}) if !ok { @@ -22,20 +22,20 @@ func GetComponentMap(uuid string) (map[string]interface{}, error) { return paramsMap, nil } -// UpdateComponentMap define func of update circuit diagram data by global uuid and component info -func UpdateComponentMap(uuid string, componentInfo map[string]interface{}) bool { - _, result := diagramsOverview.Swap(uuid, componentInfo) +// UpdateComponentMap define func of update circuit diagram data by component id and component info +func UpdateComponentMap(componentID int64, componentInfo map[string]interface{}) bool { + _, result := diagramsOverview.Swap(componentID, componentInfo) return result } -// StoreComponentMap define func of store circuit diagram data with global uuid and component info -func StoreComponentMap(uuid string, componentInfo map[string]interface{}) { - diagramsOverview.Store(uuid, componentInfo) +// StoreComponentMap define func of store circuit diagram data with component id and component info +func StoreComponentMap(componentID int64, componentInfo map[string]interface{}) { + diagramsOverview.Store(componentID, componentInfo) return } -// DeleteComponentMap define func of delete circuit diagram data with global uuid -func DeleteComponentMap(uuid string) { - diagramsOverview.Delete(uuid) +// DeleteComponentMap define func of delete circuit diagram data with component id +func DeleteComponentMap(componentID int64) { + diagramsOverview.Delete(componentID) return } diff --git a/handler/anchor_point_replace.go b/handler/anchor_point_replace.go index 9c92b21..34c0c90 100644 --- a/handler/anchor_point_replace.go +++ b/handler/anchor_point_replace.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "net/http" - "strconv" "time" "modelRT/config" @@ -17,8 +16,6 @@ import ( "modelRT/network" "modelRT/orm" - realtimedata "modelRT/real-time-data" - "github.com/bitly/go-simplejson" "github.com/gin-gonic/gin" "go.uber.org/zap" @@ -139,18 +136,18 @@ func ComponentAnchorReplaceHandler(c *gin.Context) { diagram.UpdateAnchorValue(uuid, anchorName) + // TODO 检查 anchorParam 配置 anchorParam := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ - StationID: componentInfo.StationID, - ComponentID: strconv.FormatInt(componentInfo.ID, 10), - UUID: uuid, + ComponentID: componentInfo.ID, AnchorName: anchorName, CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(), CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(), }, } anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap) - realtimedata.AnchorParamsChan <- anchorParam + // TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig + // realtimedata.AnchorParamsChan <- anchorParam resp := network.SuccessResponse{ SuccessResponseHeader: network.SuccessResponseHeader{Status: http.StatusOK}, diff --git a/handler/circuit_diagram_create.go b/handler/circuit_diagram_create.go index 40edce7..9a06ac3 100644 --- a/handler/circuit_diagram_create.go +++ b/handler/circuit_diagram_create.go @@ -100,7 +100,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { graph.AddEdge(topologicCreateInfo.UUIDFrom, topologicCreateInfo.UUIDTo) } - for _, componentInfo := range request.ComponentInfos { + for index, componentInfo := range request.ComponentInfos { componentID, err := database.CreateComponentIntoDB(c, tx, componentInfo) if err != nil { tx.Rollback() @@ -116,6 +116,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { c.JSON(http.StatusOK, resp) return } + request.ComponentInfos[index].ID = componentID err = database.CreateModelIntoDB(c, tx, componentID, componentInfo.ComponentType, componentInfo.Params) if err != nil { @@ -169,7 +170,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) { c.JSON(http.StatusOK, resp) return } - diagram.StoreComponentMap(componentInfo.UUID, componentMap) + diagram.StoreComponentMap(componentInfo.ID, componentMap) } if len(request.FreeVertexs) > 0 { diff --git a/handler/circuit_diagram_delete.go b/handler/circuit_diagram_delete.go index 69993dc..504adfc 100644 --- a/handler/circuit_diagram_delete.go +++ b/handler/circuit_diagram_delete.go @@ -210,7 +210,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) { c.JSON(http.StatusOK, resp) return } - diagram.DeleteComponentMap(componentInfo.UUID) + diagram.DeleteComponentMap(component.ID) } if len(request.FreeVertexs) > 0 { diff --git a/handler/circuit_diagram_load.go b/handler/circuit_diagram_load.go index 9deb1da..e29d432 100644 --- a/handler/circuit_diagram_load.go +++ b/handler/circuit_diagram_load.go @@ -5,6 +5,7 @@ import ( "net/http" "strconv" + "modelRT/database" "modelRT/diagram" "modelRT/logger" "modelRT/network" @@ -25,6 +26,8 @@ import ( // @Router /model/diagram_load/{page_id} [get] func CircuitDiagramLoadHandler(c *gin.Context) { logger := logger.GetLoggerInstance() + pgClient := database.GetPostgresDBClient() + pageID, err := strconv.ParseInt(c.Query("page_id"), 10, 64) if err != nil { logger.Error("get pageID from url param failed", zap.Error(err)) @@ -59,26 +62,55 @@ func CircuitDiagramLoadHandler(c *gin.Context) { componentParamMap := make(map[string]any) for _, VerticeLink := range topologicInfo.VerticeLinks { for _, componentUUID := range VerticeLink { - UUIDStr := componentUUID.String() - componentParams, err := diagram.GetComponentMap(UUIDStr) + component, err := database.QueryComponentByUUID(c, pgClient, componentUUID) + if err != nil { + logger.Error("get component id info from DB by uuid failed", zap.Error(err)) + + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + PayLoad: map[string]interface{}{ + "uuid": componentUUID, + }, + } + c.JSON(http.StatusOK, resp) + return + } + + componentParams, err := diagram.GetComponentMap(component.ID) if err != nil { logger.Error("get component data from set by uuid failed", zap.Error(err)) header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} resp := network.FailureResponse{ FailResponseHeader: header, PayLoad: map[string]interface{}{ - "uuid": UUIDStr, + "uuid": componentUUID, }, } c.JSON(http.StatusOK, resp) return } - componentParamMap[UUIDStr] = componentParams + componentParamMap[componentUUID.String()] = componentParams } } rootVertexUUID := topologicInfo.RootVertex.String() - rootComponentParam, err := diagram.GetComponentMap(rootVertexUUID) + rootComponent, err := database.QueryComponentByUUID(c, pgClient, topologicInfo.RootVertex) + if err != nil { + logger.Error("get component id info from DB by uuid failed", zap.Error(err)) + + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + PayLoad: map[string]interface{}{ + "uuid": topologicInfo.RootVertex, + }, + } + c.JSON(http.StatusOK, resp) + return + } + + rootComponentParam, err := diagram.GetComponentMap(rootComponent.ID) if err != nil { logger.Error("get component data from set by uuid failed", zap.Error(err)) header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} diff --git a/handler/circuit_diagram_update.go b/handler/circuit_diagram_update.go index 90e85fb..60e0804 100644 --- a/handler/circuit_diagram_update.go +++ b/handler/circuit_diagram_update.go @@ -99,7 +99,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { } } - for _, componentInfo := range request.ComponentInfos { + for index, componentInfo := range request.ComponentInfos { componentID, err := database.UpdateComponentIntoDB(c, tx, componentInfo) if err != nil { logger.Error("udpate component info into DB failed", zap.Error(err)) @@ -115,6 +115,8 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { return } + request.ComponentInfos[index].ID = componentID + err = database.UpdateModelIntoDB(c, tx, componentID, componentInfo.ComponentType, componentInfo.Params) if err != nil { logger.Error("udpate component model info into DB failed", zap.Error(err)) @@ -161,7 +163,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) { c.JSON(http.StatusOK, resp) return } - diagram.UpdateComponentMap(componentInfo.UUID, componentMap) + diagram.UpdateComponentMap(componentInfo.ID, componentMap) } if len(request.FreeVertexs) > 0 { diff --git a/main.go b/main.go index 9afd52e..351d8ac 100644 --- a/main.go +++ b/main.go @@ -72,21 +72,20 @@ func main() { } defer parsePool.Release() - // init data polling ants pool - pollingPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc) + // init anchor param ants pool + // TODO 修改PollingConcurrentQuantity 名称 + anchorPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc) if err != nil { zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err)) panic(err) } - defer pollingPool.Release() + defer anchorPool.Release() // init cancel context cancelCtx, cancel := context.WithCancel(ctx) defer cancel() - // init anchor channel - go realtimedata.AnchorParamChangeChan(cancelCtx, pollingPool) // init real time data receive channel - go realtimedata.ReceiveChan(cancelCtx) + go realtimedata.ReceiveChan(cancelCtx, anchorPool) postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres diff --git a/network/circuit_diagram_create_request.go b/network/circuit_diagram_create_request.go index aebe0cf..fdbabb4 100644 --- a/network/circuit_diagram_create_request.go +++ b/network/circuit_diagram_create_request.go @@ -21,6 +21,7 @@ type TopologicUUIDCreateInfo struct { // ComponentCreateInfo defines circuit diagram component create index info type ComponentCreateInfo struct { + ID int64 `json:"id"` UUID string `json:"uuid"` Name string `json:"name"` Context string `json:"context"` diff --git a/network/circuit_diagram_update_request.go b/network/circuit_diagram_update_request.go index 934f3bf..2b3e3ba 100644 --- a/network/circuit_diagram_update_request.go +++ b/network/circuit_diagram_update_request.go @@ -33,6 +33,7 @@ type TopologicUUIDChangeInfos struct { // ComponentUpdateInfo defines circuit diagram component params info type ComponentUpdateInfo struct { + ID int64 `json:"id"` UUID string `json:"uuid"` Name string `json:"name"` Context string `json:"context"` diff --git a/pool/concurrency_anchor_parse.go b/pool/concurrency_anchor_parse.go index 3766048..a4cb67c 100644 --- a/pool/concurrency_anchor_parse.go +++ b/pool/concurrency_anchor_parse.go @@ -3,84 +3,102 @@ package pool import ( "fmt" - "strconv" - "time" "modelRT/config" - "modelRT/diagram" "modelRT/logger" - "modelRT/network" - - "go.uber.org/zap" ) -var AnchorFunc = func(anchorConfig interface{}) { +// AnchorFunc defines func that process the real time data of component anchor params +var AnchorFunc = func(anchorChan interface{}) { + var firstStart bool logger := logger.GetLoggerInstance() - var firstTimePolling bool - paramConfig, ok := anchorConfig.(config.AnchorParamConfig) + fmt.Println(logger) + fmt.Println(anchorChan) + anchorChanConfig, ok := anchorChan.(config.AnchorChanConfig) if !ok { - logger.Error("conversion model anchor param config type failed") + logger.Error("conversion component anchor chan type failed") return } - for { - var beginUnixTime, endUnixTime int64 - if firstTimePolling { - milliUnixTime := time.Now().UnixMilli() - endUnixTime = milliUnixTime - beginUnixTime = milliUnixTime - 1000*60 - firstTimePolling = false - } else { - // 判断时间差值是否小于10s,如果小于则sleep0.5s后重新获取时间 - endUnixTime = time.Now().UnixMilli() - if endUnixTime-beginUnixTime < 1000 { - time.Sleep(time.Duration(500 * time.Millisecond)) - endUnixTime = time.Now().UnixMilli() - } - } - - pollingAPI := network.APIEndpoint{ - URL: paramConfig.APIURL, - Method: paramConfig.APIMethod, - QueryParams: map[string]string{ - "station": paramConfig.StationID, - "component": paramConfig.ComponentID, - "point": paramConfig.AnchorName, - "begin": strconv.FormatInt(beginUnixTime, 10), - "end": strconv.FormatInt(endUnixTime, 10), - }, - } - - if !firstTimePolling { - beginUnixTime = time.Now().UnixMilli() - } - - valueSlice, err := network.PollAPIEndpoints(pollingAPI) - if err != nil { - logger.Error("polling real time data from dataRT service failed", zap.Error(err)) - continue - } - - for _, value := range valueSlice { - anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) - if err != nil { - logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) - continue - } - - if anchorName != paramConfig.AnchorName { - logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) - continue - } - - upperLimitVal := paramConfig.CompareValUpperLimit - lowerlimitVal := paramConfig.CompareValLowerLimit - compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...) - if compareValue > upperLimitVal || compareValue < lowerlimitVal { - // TODO 选择报警方式 - fmt.Println("log warning") + select { + case <-anchorChanConfig.Ctx.Done(): + return + case anchorParaConfig := <-anchorChanConfig.AnchorChan: + if firstStart { + close(anchorChanConfig.ReadyChan) + firstStart = false } + // TODO 重写anchorParaConfig 处理逻辑 + fmt.Println(anchorParaConfig) + default: } } + // var firstTimePolling bool + + // paramConfig, ok := anchorConfig.(config.AnchorParamConfig) + // if !ok { + // logger.Error("conversion model anchor param config type failed") + // return + // } + + // for { + // var beginUnixTime, endUnixTime int64 + // if firstTimePolling { + // milliUnixTime := time.Now().UnixMilli() + // endUnixTime = milliUnixTime + // beginUnixTime = milliUnixTime - 1000*60 + // firstTimePolling = false + // } else { + // // 判断时间差值是否小于10s,如果小于则sleep0.5s后重新获取时间 + // endUnixTime = time.Now().UnixMilli() + // if endUnixTime-beginUnixTime < 1000 { + // time.Sleep(time.Duration(500 * time.Millisecond)) + // endUnixTime = time.Now().UnixMilli() + // } + // } + + // pollingAPI := network.APIEndpoint{ + // URL: paramConfig.APIURL, + // Method: paramConfig.APIMethod, + // QueryParams: map[string]string{ + // "station": paramConfig.StationID, + // "component": paramConfig.ComponentID, + // "point": paramConfig.AnchorName, + // "begin": strconv.FormatInt(beginUnixTime, 10), + // "end": strconv.FormatInt(endUnixTime, 10), + // }, + // } + + // if !firstTimePolling { + // beginUnixTime = time.Now().UnixMilli() + // } + + // valueSlice, err := network.PollAPIEndpoints(pollingAPI) + // if err != nil { + // logger.Error("polling real time data from dataRT service failed", zap.Error(err)) + // continue + // } + + // for _, value := range valueSlice { + // anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) + // if err != nil { + // logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) + // continue + // } + + // if anchorName != paramConfig.AnchorName { + // logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) + // continue + // } + + // upperLimitVal := paramConfig.CompareValUpperLimit + // lowerlimitVal := paramConfig.CompareValLowerLimit + // compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...) + // if compareValue > upperLimitVal || compareValue < lowerlimitVal { + // // TODO 选择报警方式 + // fmt.Println("log warning") + // } + // } + // } } diff --git a/pool/concurrency_model_parse.go b/pool/concurrency_model_parse.go index 81e2893..19914ae 100644 --- a/pool/concurrency_model_parse.go +++ b/pool/concurrency_model_parse.go @@ -3,7 +3,6 @@ package pool import ( "context" - "strconv" "time" "modelRT/config" @@ -11,7 +10,6 @@ import ( "modelRT/diagram" "modelRT/logger" "modelRT/model" - realtimedata "modelRT/real-time-data" "github.com/bitly/go-simplejson" "go.uber.org/zap" @@ -57,11 +55,11 @@ var ParseFunc = func(parseConfig interface{}) { for index := range paramsList { anchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustString() + // TODO 检查anchorParam 配置是否正确 anchorParam := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ - StationID: modelParseConfig.ComponentInfo.StationID, - ComponentID: strconv.FormatInt(modelParseConfig.ComponentInfo.ID, 10), - UUID: uuid, + ComponentID: modelParseConfig.ComponentInfo.ID, + AnchorName: anchorName, CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(), CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(), @@ -69,7 +67,8 @@ var ParseFunc = func(parseConfig interface{}) { } anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap) diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName) - realtimedata.AnchorParamsChan <- anchorParam + // TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig + // realtimedata.AnchorParamsChan <- anchorParam } } @@ -86,6 +85,6 @@ var ParseFunc = func(parseConfig interface{}) { unmarshalMap["op"] = modelParseConfig.ComponentInfo.Op unmarshalMap["ts"] = modelParseConfig.ComponentInfo.Ts - diagram.StoreComponentMap(uuid, unmarshalMap) + diagram.StoreComponentMap(modelParseConfig.ComponentInfo.ID, unmarshalMap) return } diff --git a/real-time-data/anchor_param.go b/real-time-data/anchor_param.go deleted file mode 100644 index db149fa..0000000 --- a/real-time-data/anchor_param.go +++ /dev/null @@ -1,30 +0,0 @@ -// Package realtimedata define real time data operation functions -package realtimedata - -import ( - "context" - - "modelRT/config" - - "github.com/panjf2000/ants/v2" -) - -// AnchorParamsChan define channel of component anchor param change -var AnchorParamsChan chan config.AnchorParamConfig - -func init() { - AnchorParamsChan = make(chan config.AnchorParamConfig, 100) -} - -// AnchorParamChangeChan define func of component anchor param change notification process -func AnchorParamChangeChan(ctx context.Context, pool *ants.PoolWithFunc) { - for { - select { - case <-ctx.Done(): - return - case anchorParam := <-AnchorParamsChan: - pool.Invoke(anchorParam) - default: - } - } -} diff --git a/real-time-data/real_time_data_receive.go b/real-time-data/real_time_data_receive.go index 28abf78..3ceb033 100644 --- a/real-time-data/real_time_data_receive.go +++ b/real-time-data/real_time_data_receive.go @@ -3,9 +3,15 @@ package realtimedata import ( "context" - "fmt" + "modelRT/config" + "modelRT/constant" + "modelRT/diagram" + "modelRT/logger" "modelRT/network" + + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" ) // RealTimeDataChan define channel of real time data receive @@ -16,13 +22,64 @@ func init() { } // ReceiveChan define func of real time data receive and process -func ReceiveChan(ctx context.Context) { +func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) { + logger := logger.GetLoggerInstance() + for { select { case <-ctx.Done(): return case realTimeData := <-RealTimeDataChan: - fmt.Println(realTimeData.PayLoad.ComponentID) + // TODO 根据 componentID 缓存到来的实时数据 + componentID := realTimeData.PayLoad.ComponentID + component, err := diagram.GetComponentMap(componentID) + if err != nil { + logger.Error("query component info from diagram map by componet id failed", zap.Int64("component_id", componentID), zap.Error(err)) + continue + } + + componentType := component["component_type"].(int) + if componentType != constant.DemoType { + logger.Error("can not process real time data of component type not equal DemoType", zap.Int64("component_id", componentID)) + continue + } + + var anchorName string + var compareValUpperLimit, compareValLowerLimit float64 + var anchorRealTimeData []float64 + var calculateFunc func(archorValue float64, args ...float64) float64 + if anchoringV := component["anchor_v"].(bool); anchoringV { + anchorName = "voltage" + compareValUpperLimit = component["uv_alarm"].(float64) + compareValLowerLimit = component["ov_alarm"].(float64) + } else { + anchorName = "current" + compareValUpperLimit = component["ui_alarm"].(float64) + compareValLowerLimit = component["oi_alarm"].(float64) + } + + componentData := map[string]interface{}{ + "resistance": component["resistance"].(float64), + } + + calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData) + + for _, param := range realTimeData.PayLoad.Values { + anchorRealTimeData = append(anchorRealTimeData, param.Value) + } + + anchorConfig := config.AnchorParamConfig{ + AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + ComponentID: componentID, + AnchorName: anchorName, + CompareValUpperLimit: compareValUpperLimit, + CompareValLowerLimit: compareValLowerLimit, + AnchorRealTimeData: anchorRealTimeData, + }, + CalculateFunc: calculateFunc, + CalculateParams: params, + } + diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig default: } }