From 5a9fa5cc4db78ce7d0ce4b6a0d197e1546b8e25a Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 26 Dec 2024 15:03:20 +0800 Subject: [PATCH] write code for polling real time data from dataRT service --- config/anchor_param_config.go | 34 ++++++++------- config/config.go | 18 ++++++-- config/config.yaml | 8 ++++ handler/anchor_point_replace.go | 7 ++- main.go | 3 +- network/api_endpoint.go | 34 +++++++++------ pool/concurrency_anchor_parse.go | 73 +++++++++++++++++++++++++------- pool/concurrency_model_parse.go | 11 +++-- 8 files changed, 133 insertions(+), 55 deletions(-) diff --git a/config/anchor_param_config.go b/config/anchor_param_config.go index f32a4f6..6142837 100644 --- a/config/anchor_param_config.go +++ b/config/anchor_param_config.go @@ -9,47 +9,51 @@ import ( type AnchorParamListConfig struct { AnchorName string FuncType string // 函数类型 - UpperLimit float32 // 比较值上限 - LowerLimit float32 // 比较值下限 + UpperLimit float64 // 比较值上限 + LowerLimit float64 // 比较值下限 } // AnchorParamBaseConfig define anchor params base config struct type AnchorParamBaseConfig struct { - UUID string + StationID string // component表 station_id + ComponentID string // component表 ID + UUID string // component表 global_uuid AnchorName string // 锚定参量名称 - CompareValUpperLimit float32 // 比较值上限 - CompareValLowerLimit float32 // 比较值下限 + CompareValUpperLimit float64 // 比较值上限 + CompareValLowerLimit float64 // 比较值下限 } // AnchorParamConfig define anchor params config struct type AnchorParamConfig struct { AnchorParamBaseConfig - CalculateFunc func(archorValue float32, args ...float32) float32 // 计算函数 - CalculateParams []float32 // 计算参数 + CalculateFunc func(archorValue float64, args ...float64) float64 // 计算函数 + CalculateParams []float64 // 计算参数 + APIURL string // API URL + APIMethod string // API Method } -var baseVoltageFunc = func(archorValue float32, args ...float32) float32 { +var baseVoltageFunc = func(archorValue float64, args ...float64) float64 { voltage := archorValue resistance := args[1] return voltage / resistance } -var baseCurrentFunc = func(archorValue float32, args ...float32) float32 { +var baseCurrentFunc = func(archorValue float64, args ...float64) float64 { current := archorValue resistance := args[1] return current * resistance } // SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data -func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float32, args ...float32) float32, []float32) { +func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) { if componentType == constant.DemoType { if anchorName == "voltage" { - resistance := componentData["resistance"].(float32) - return baseVoltageFunc, []float32{resistance} + resistance := componentData["resistance"].(float64) + return baseVoltageFunc, []float64{resistance} } else if anchorName == "current" { - resistance := componentData["resistance"].(float32) - return baseCurrentFunc, []float32{resistance} + resistance := componentData["resistance"].(float64) + return baseCurrentFunc, []float64{resistance} } } - return nil, []float32{} + return nil, []float64{} } diff --git a/config/config.go b/config/config.go index 0fe55a9..7fcf590 100644 --- a/config/config.go +++ b/config/config.go @@ -45,7 +45,16 @@ type LoggerConfig struct { // AntsConfig define config stuct of ants pool config type AntsConfig struct { - ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity + ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity + PollingConcurrentQuantity int `mapstructure:"polling_concurrent_quantity"` // polling real time data concurrent quantity +} + +// DataRTConfig define config stuct of data runtime server api config +type DataRTConfig struct { + Host string `mapstructure:"host"` + Port int64 `mapstructure:"port"` + PollingAPI string `mapstructure:"polling_api"` + Method string `mapstructure:"polling_api_method"` } // ModelRTConfig define config stuct of model runtime server @@ -55,6 +64,7 @@ type ModelRTConfig struct { KafkaConfig `mapstructure:"kafka"` LoggerConfig `mapstructure:"logger"` AntsConfig `mapstructure:"ants"` + DataRTConfig `mapstructure:"dataRT"` PostgresDBURI string `mapstructure:"-"` } @@ -71,12 +81,12 @@ func ReadAndInitConfig(configDir, configName, configType string) (modelRTConfig panic(err) } - rtConfig := ModelRTConfig{} - if err := config.Unmarshal(&rtConfig); err != nil { + modelRTconf := ModelRTConfig{} + if err := config.Unmarshal(&modelRTConfig); err != nil { panic(fmt.Sprintf("unmarshal modelRT config failed:%s\n", err.Error())) } - modelRTConfig.PostgresDBURI = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", rtConfig.Host, rtConfig.Port, rtConfig.User, rtConfig.Password, rtConfig.DataBase) + modelRTConfig.PostgresDBURI = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", modelRTconf.PostgresConfig.Host, modelRTconf.PostgresConfig.Port, modelRTconf.PostgresConfig.User, modelRTconf.PostgresConfig.Password, modelRTconf.PostgresConfig.DataBase) return modelRTConfig } diff --git a/config/config.yaml b/config/config.yaml index a804833..7f77f0d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -33,9 +33,17 @@ logger: # ants config ants: parse_concurrent_quantity: 10 + polling_concurrent_quantity: 10 # modelRT base config base: grid_id: 1 zone_id: 1 station_id: 1 + +# dataRT api config +dataRT: + host: "http://127.0.0.1" + port: 8888 + polling_api: "datart/getPointData" + polling_api_method: "GET" diff --git a/handler/anchor_point_replace.go b/handler/anchor_point_replace.go index ccd92ad..f113d05 100644 --- a/handler/anchor_point_replace.go +++ b/handler/anchor_point_replace.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strconv" "time" "modelRT/config" @@ -139,10 +140,12 @@ func ComponentAnchorReplaceHandler(c *gin.Context) { anchorParam := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + StationID: strconv.FormatInt(componentInfo.StationID, 10), + ComponentID: strconv.FormatInt(componentInfo.ID, 10), UUID: uuid, AnchorName: anchorName, - CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64()), - CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64()), + CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(), + CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(), }, } anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap) diff --git a/main.go b/main.go index 3f5aaef..5851833 100644 --- a/main.go +++ b/main.go @@ -72,8 +72,7 @@ func main() { defer parsePool.Release() // init data polling ants pool - // TODO 优化轮询池初始化参数定义方式,改为从 config 中获取 - pollingPool, err := ants.NewPoolWithFunc(1000, pool.AnchorFunc) + pollingPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc) if err != nil { zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err)) panic(err) diff --git a/network/api_endpoint.go b/network/api_endpoint.go index dd7ba78..6f15665 100644 --- a/network/api_endpoint.go +++ b/network/api_endpoint.go @@ -6,11 +6,8 @@ import ( "io" "net/http" "strings" - "time" - "modelRT/logger" - - "go.uber.org/zap" + "github.com/bitly/go-simplejson" ) // APIEndpoint defines an api endpoint struct to poll data from dataRT service @@ -61,18 +58,27 @@ func fetchAPI(endpoint APIEndpoint) (string, error) { return string(body), nil } -// pollAPIEndpoints defines unmarshal polling data from http request -func pollAPIEndpoints(endpoint APIEndpoint) { - logger := logger.GetLoggerInstance() +// PollAPIEndpoints defines unmarshal polling data from http request +func PollAPIEndpoints(endpoint APIEndpoint) ([]float64, error) { + var valueSlice []float64 respStr, err := fetchAPI(endpoint) if err != nil { - logger.Error("unmarshal component anchor point replace info failed", zap.Error(err)) - return + return valueSlice, fmt.Errorf("fetch api failed:%w", err) } - fmt.Println(respStr) - time.Sleep(time.Duration(endpoint.Interval) * time.Second) - // 注意:这里使用了 endpoint.Interval 而不是传入的 interval, - // 但为了示例简单,我们统一使用传入的 interval。 - // 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。 + realDataJSON, err := simplejson.NewJson([]byte(respStr)) + if err != nil { + return valueSlice, fmt.Errorf("format real time data failed:%w", err) + } + simplejson.New().UnmarshalJSON([]byte(respStr)) + code := realDataJSON.Get("code").MustInt() + if code != 0 { + return valueSlice, fmt.Errorf("polling data api status error:%s", realDataJSON.Get("msg").MustString()) + } + + dataLen := len(realDataJSON.Get("data").MustArray()) + for i := 0; i < dataLen; i++ { + valueSlice = append(valueSlice, realDataJSON.Get("data").GetIndex(i).Get("value").MustFloat64()) + } + return valueSlice, nil } diff --git a/pool/concurrency_anchor_parse.go b/pool/concurrency_anchor_parse.go index 5a3792c..949b355 100644 --- a/pool/concurrency_anchor_parse.go +++ b/pool/concurrency_anchor_parse.go @@ -3,41 +3,84 @@ package pool import ( "fmt" + "strconv" + "time" "modelRT/config" "modelRT/diagram" + "modelRT/logger" + "modelRT/network" "go.uber.org/zap" ) var AnchorFunc = func(anchorConfig interface{}) { - logger := zap.L() + logger := logger.GetLoggerInstance() + var firstTimePolling bool paramConfig, ok := anchorConfig.(config.AnchorParamConfig) if !ok { logger.Error("conversion model anchor param config type failed") return } - calculateFunc := paramConfig.CalculateFunc + for { - // TODO 通过 api 循环获取 vlaue 实时值 - var value float32 - anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) + var beginUnixTime, endUnixTime int64 + if firstTimePolling { + milliUnixTime := time.Now().UnixMilli() + endUnixTime = milliUnixTime + beginUnixTime = milliUnixTime - 1000*60 + firstTimePolling = false + } else { + // 判断时间差值是否小于10s,如果小于则重新获取时间 + endUnixTime = time.Now().UnixMilli() + if endUnixTime-beginUnixTime < 1000*10 { + time.Sleep(time.Duration(1 * time.Second)) + endUnixTime = time.Now().UnixMilli() + } + } + + pollingAPI := network.APIEndpoint{ + URL: paramConfig.APIURL, + Method: paramConfig.APIMethod, + QueryParams: map[string]string{ + "station": paramConfig.StationID, + "component": paramConfig.ComponentID, + "point": paramConfig.AnchorName, + "begin": strconv.FormatInt(beginUnixTime, 10), + "end": strconv.FormatInt(endUnixTime, 10), + }, + } + + if !firstTimePolling { + beginUnixTime = time.Now().UnixMilli() + } + + valueSlice, err := network.PollAPIEndpoints(pollingAPI) if err != nil { - logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) + logger.Error("polling real time data from dataRT service failed", zap.Error(err)) continue } - if anchorName != paramConfig.AnchorName { - logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) - continue - } + for _, value := range valueSlice { + anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) + if err != nil { + logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) + continue + } - upperLimitVal := paramConfig.CompareValUpperLimit - lowerlimitVal := paramConfig.CompareValLowerLimit - compareValue := calculateFunc(value, paramConfig.CalculateParams...) - if compareValue > upperLimitVal || compareValue < lowerlimitVal { - fmt.Println("log warning") + if anchorName != paramConfig.AnchorName { + logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) + continue + } + + upperLimitVal := paramConfig.CompareValUpperLimit + lowerlimitVal := paramConfig.CompareValLowerLimit + compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...) + if compareValue > upperLimitVal || compareValue < lowerlimitVal { + // TODO 选择报警方式 + fmt.Println("log warning") + } } } } diff --git a/pool/concurrency_model_parse.go b/pool/concurrency_model_parse.go index 4eddbd1..49b9e32 100644 --- a/pool/concurrency_model_parse.go +++ b/pool/concurrency_model_parse.go @@ -3,11 +3,13 @@ package pool import ( "context" + "strconv" "time" "modelRT/config" "modelRT/database" "modelRT/diagram" + "modelRT/logger" "modelRT/model" realtimedata "modelRT/real-time-data" @@ -15,8 +17,9 @@ import ( "go.uber.org/zap" ) +// ParseFunc defines func that parses the model data from postgres var ParseFunc = func(parseConfig interface{}) { - logger := zap.L() + logger := logger.GetLoggerInstance() modelParseConfig, ok := parseConfig.(config.ModelParseConfig) if !ok { @@ -56,10 +59,12 @@ var ParseFunc = func(parseConfig interface{}) { anchorParam := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + StationID: strconv.FormatInt(modelParseConfig.ComponentInfo.StationID, 10), + ComponentID: strconv.FormatInt(modelParseConfig.ComponentInfo.ID, 10), UUID: uuid, AnchorName: anchorName, - CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64()), - CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64()), + CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(), + CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(), }, } anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap)