From f8b9a702508e64eef164f5e85de5d098cbc96f45 Mon Sep 17 00:00:00 2001 From: douxu Date: Fri, 20 Dec 2024 16:06:42 +0800 Subject: [PATCH] add replace anchor point api and optimize code of anchor param data polling function points --- config/anchor_param_config.go | 41 +++++- constant/error.go | 3 + diagram/anchor_set.go | 41 ++++++ handler/anchor_point_replace.go | 157 ++++++++++++++++++++++ main.go | 3 + network/anchor_point_replace_request.go | 8 ++ pool/concurrency_anchor_parse.go | 167 +++++++++++++----------- pool/concurrency_model_parse.go | 27 ++-- 8 files changed, 358 insertions(+), 89 deletions(-) create mode 100644 diagram/anchor_set.go create mode 100644 handler/anchor_point_replace.go create mode 100644 network/anchor_point_replace_request.go diff --git a/config/anchor_param_config.go b/config/anchor_param_config.go index 1cd3044..f32a4f6 100644 --- a/config/anchor_param_config.go +++ b/config/anchor_param_config.go @@ -1,6 +1,18 @@ // Package config define config struct of model runtime service package config +import ( + "modelRT/constant" +) + +// AnchorParamListConfig define anchor params list config struct +type AnchorParamListConfig struct { + AnchorName string + FuncType string // 函数类型 + UpperLimit float32 // 比较值上限 + LowerLimit float32 // 比较值下限 +} + // AnchorParamBaseConfig define anchor params base config struct type AnchorParamBaseConfig struct { UUID string @@ -12,11 +24,32 @@ type AnchorParamBaseConfig struct { // AnchorParamConfig define anchor params config struct type AnchorParamConfig struct { AnchorParamBaseConfig - CalculateFunc func(args ...float32) float32 // 计算函数 - CalculateParams []float32 // 计算参数 + CalculateFunc func(archorValue float32, args ...float32) float32 // 计算函数 + CalculateParams []float32 // 计算参数 } -// TODO 完成内容细化 -func SelectFuncAndParamsByAnchorName(anchorName string) (func(args ...float32) float32, []float32) { +var baseVoltageFunc = func(archorValue float32, args ...float32) float32 { + voltage := archorValue + resistance := args[1] + return voltage / resistance +} + +var baseCurrentFunc = func(archorValue float32, args ...float32) float32 { + current := archorValue + resistance := args[1] + return current * resistance +} + +// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data +func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float32, args ...float32) float32, []float32) { + if componentType == constant.DemoType { + if anchorName == "voltage" { + resistance := componentData["resistance"].(float32) + return baseVoltageFunc, []float32{resistance} + } else if anchorName == "current" { + resistance := componentData["resistance"].(float32) + return baseCurrentFunc, []float32{resistance} + } + } return nil, []float32{} } diff --git a/constant/error.go b/constant/error.go index 71eea83..cb48f93 100644 --- a/constant/error.go +++ b/constant/error.go @@ -11,6 +11,9 @@ var ErrUpdateRowZero = errors.New("update affected rows is zero") // ErrDeleteRowZero define error of delete affected row zero var ErrDeleteRowZero = errors.New("delete affected rows is zero") +// ErrQueryRowZero define error of query affected row zero +var ErrQueryRowZero = errors.New("query affected rows is zero") + // ErrInsertRowUnexpected define error of insert affected row not reach expected number var ErrInsertRowUnexpected = errors.New("the number of inserted data rows don't reach the expected value") diff --git a/diagram/anchor_set.go b/diagram/anchor_set.go new file mode 100644 index 0000000..ac548be --- /dev/null +++ b/diagram/anchor_set.go @@ -0,0 +1,41 @@ +package diagram + +import ( + "errors" + "fmt" + "sync" +) + +// anchorValueOverview define struct of storage all anchor value +var anchorValueOverview sync.Map + +// GetAnchorValue define func of get circuit diagram data by global uuid +func GetAnchorValue(uuid string) (string, error) { + value, ok := diagramsOverview.Load(uuid) + if !ok { + return "", fmt.Errorf("can not find anchor value by global uuid:%s", uuid) + } + anchorValue, ok := value.(string) + if !ok { + return "", errors.New("convert to string failed") + } + return anchorValue, nil +} + +// UpdateAnchorValue define func of update anchor value by global uuid and anchor name +func UpdateAnchorValue(uuid string, anchorValue string) bool { + _, result := anchorValueOverview.Swap(uuid, anchorValue) + return result +} + +// StoreAnchorValue define func of store anchor value with global uuid and anchor name +func StoreAnchorValue(uuid string, anchorValue string) { + anchorValueOverview.Store(uuid, anchorValue) + return +} + +// DeleteAnchorValue define func of delete anchor value with global uuid +func DeleteAnchorValue(uuid string) { + anchorValueOverview.Delete(uuid) + return +} diff --git a/handler/anchor_point_replace.go b/handler/anchor_point_replace.go new file mode 100644 index 0000000..9d10401 --- /dev/null +++ b/handler/anchor_point_replace.go @@ -0,0 +1,157 @@ +package handler + +import ( + "context" + "fmt" + "net/http" + "time" + + "modelRT/config" + "modelRT/constant" + "modelRT/database" + "modelRT/diagram" + "modelRT/log" + "modelRT/model" + "modelRT/network" + "modelRT/orm" + realtimedata "modelRT/real-time-data" + + "github.com/bitly/go-simplejson" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// ComponentAnchorReplaceHandler define component anchor point replace process API +func ComponentAnchorReplaceHandler(c *gin.Context) { + var uuid, anchorName string + logger := log.GetLoggerInstance() + pgClient := database.GetPostgresDBClient() + + cancelCtx, cancel := context.WithTimeout(c, 5*time.Second) + defer cancel() + + var request network.ComponetAnchorReplaceRequest + if err := c.ShouldBindJSON(&request); err != nil { + logger.Error("unmarshal component anchor point replace info failed", zap.Error(err)) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + uuid = request.UUID + anchorName = request.AnchorName + + var componentInfo *orm.Component + result := pgClient.WithContext(cancelCtx).Model(&orm.Component{}).Where("global_uuid = ?", uuid).Find(componentInfo) + if result.Error != nil { + logger.Error("query component detail info failed", zap.Error(result.Error)) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: result.Error.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + if componentInfo == nil { + err := fmt.Errorf("query component detail info by uuid failed:%w", constant.ErrQueryRowZero) + logger.Error("query component detail info from table is empty", zap.String("table_name", componentInfo.TableName())) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + cancelCtx, cancel = context.WithTimeout(c, 5*time.Second) + defer cancel() + unmarshalMap := make(map[string]interface{}) + tableName := model.SelectModelNameByType(componentInfo.ComponentType) + result = pgClient.WithContext(cancelCtx).Table(tableName).Where("global_uuid = ?", uuid).Find(&unmarshalMap) + if result.Error != nil { + logger.Error("query model detail info failed", zap.Error(result.Error)) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: result.Error.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + if unmarshalMap == nil { + err := fmt.Errorf("query model detail info by uuid failed:%w", constant.ErrQueryRowZero) + logger.Error("query model detail info from table is empty", zap.String("table_name", tableName)) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + configsStr, exist := unmarshalMap["anchor_configs_list"] + if !exist { + logger.Error("can not find anchor config list in model model detail info") + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find anchor config list in model model detail info"} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string))) + if err != nil { + logger.Error("formmat anchor configs json failed", zap.Error(err)) + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + var existFlag bool + var anchorIndex int + paramsList := anchorConfigsJSON.Get("params_list").MustArray() + for index := range paramsList { + paramAnchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("anchor_name").MustString() + if anchorName == paramAnchorName { + existFlag = true + anchorIndex = index + } + } + + if !existFlag { + header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: "can not find new anchor name in model anchor point list"} + resp := network.FailureResponse{ + FailResponseHeader: header, + } + c.JSON(http.StatusOK, resp) + return + } + + diagram.UpdateAnchorValue(uuid, anchorName) + + anchorParam := config.AnchorParamConfig{ + AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + UUID: uuid, + AnchorName: anchorName, + CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64()), + CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64()), + }, + } + anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap) + realtimedata.AnchorParamsChan <- anchorParam + + resp := network.SuccessResponse{ + SuccessResponseHeader: network.SuccessResponseHeader{Status: http.StatusOK}, + PayLoad: map[string]interface{}{ + "uuid": request.UUID, + }, + } + c.JSON(http.StatusOK, resp) +} diff --git a/main.go b/main.go index 0e374bf..454bf46 100644 --- a/main.go +++ b/main.go @@ -112,6 +112,9 @@ func main() { engine.POST("/model/diagram_update", handler.CircuitDiagramUpdateHandler) engine.POST("/model/diagram_delete", handler.CircuitDiagramDeleteHandler) + // anchor api + engine.POST("/model/anchor_replace", handler.ComponentAnchorReplaceHandler) + // dashborad api dashboard := engine.Group("/dashboard", limiter.Middleware) { diff --git a/network/anchor_point_replace_request.go b/network/anchor_point_replace_request.go new file mode 100644 index 0000000..2b6dec9 --- /dev/null +++ b/network/anchor_point_replace_request.go @@ -0,0 +1,8 @@ +// Package network define struct of network operation +package network + +// ComponetAnchorReplaceRequest defines request params of component anchor point replace api +type ComponetAnchorReplaceRequest struct { + UUID string `json:"uuid"` + AnchorName string `json:"anchor_name"` +} diff --git a/pool/concurrency_anchor_parse.go b/pool/concurrency_anchor_parse.go index 4e8bb9a..56aea01 100644 --- a/pool/concurrency_anchor_parse.go +++ b/pool/concurrency_anchor_parse.go @@ -3,14 +3,9 @@ package pool import ( "fmt" - "io/ioutil" - "log" - "net/http" - "strings" - "sync" - "time" "modelRT/config" + "modelRT/diagram" "go.uber.org/zap" ) @@ -23,78 +18,98 @@ var AnchorFunc = func(anchorConfig interface{}) { logger.Error("conversion model anchor param config type failed") return } - fmt.Println(paramConfig) - // TODO 解析 paramConfig 轮询 dataRT http 接口 -} + calculateFunc := paramConfig.CalculateFunc + for { + // TODO 通过 api 循环获取 vlaue 实时值 + var value float32 + anchorName, err := diagram.GetAnchorValue(paramConfig.UUID) + if err != nil { + logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err)) + continue + } -type APIEndpoint struct { - URL string `json:"url"` - Method string `json:"method"` // HTTP 方法,如 "GET", "POST" - Headers map[string]string `json:"headers"` - QueryParams map[string]string `json:"query_params"` - Body string `json:"body"` // 对于 POST 请求等,可能需要一个请求体 - Interval int `json:"interval"` // 轮询间隔时间(秒) -} + if anchorName != paramConfig.AnchorName { + logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName)) + continue + } -// fetchAPI 执行 HTTP 请求并返回响应体(作为字符串)或错误 -func fetchAPI(endpoint APIEndpoint) (string, error) { - client := &http.Client{} - - // 构建请求 - req, err := http.NewRequest(endpoint.Method, endpoint.URL, nil) - if err != nil { - return "", err - } - - // 设置请求头 - for key, value := range endpoint.Headers { - req.Header.Set(key, value) - } - - // 设置查询参数(如果需要) - q := req.URL.Query() - for key, value := range endpoint.QueryParams { - q.Set(key, value) - } - req.URL.RawQuery = q.Encode() - - // 设置请求体(如果需要,例如 POST 请求) - if endpoint.Method == "POST" || endpoint.Method == "PUT" { - req.Body = ioutil.NopCloser(strings.NewReader(endpoint.Body)) - req.Header.Set("Content-Type", "application/json") // 假设是 JSON 请求体 - } - - // 执行请求 - resp, err := client.Do(req) - if err != nil { - return "", err - } - defer resp.Body.Close() - - // 读取响应体 - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - - return string(body), nil -} - -// pollAPIEndpoints 轮询 API 端点列表,并根据指定的间隔时间执行请求 -func pollAPIEndpoints(endpoints []APIEndpoint, interval int, wg *sync.WaitGroup, results chan<- string) { - defer wg.Done() - for _, endpoint := range endpoints { - for { - body, err := fetchAPI(endpoint) - if err != nil { - log.Printf("Error fetching from %s: %v", endpoint.URL, err) - } else { - results <- fmt.Sprintf("Response from %s: %s", endpoint.URL, body) - } - time.Sleep(time.Duration(interval) * time.Second) - // 注意:这里使用了 endpoint.Interval 而不是传入的 interval, - // 但为了示例简单,我们统一使用传入的 interval。 - // 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。 + upperLimitVal := paramConfig.CompareValUpperLimit + lowerlimitVal := paramConfig.CompareValLowerLimit + compareValue := calculateFunc(value, paramConfig.CalculateParams...) + if compareValue > upperLimitVal || compareValue < lowerlimitVal { + fmt.Println("log warning") } } } + +// type APIEndpoint struct { +// URL string `json:"url"` +// Method string `json:"method"` // HTTP 方法,如 "GET", "POST" +// Headers map[string]string `json:"headers"` +// QueryParams map[string]string `json:"query_params"` +// Body string `json:"body"` // 对于 POST 请求等,可能需要一个请求体 +// Interval int `json:"interval"` // 轮询间隔时间(秒) +// } + +// // fetchAPI 执行 HTTP 请求并返回响应体(作为字符串)或错误 +// func fetchAPI(endpoint APIEndpoint) (string, error) { +// client := &http.Client{} + +// // 构建请求 +// req, err := http.NewRequest(endpoint.Method, endpoint.URL, nil) +// if err != nil { +// return "", err +// } + +// // 设置请求头 +// for key, value := range endpoint.Headers { +// req.Header.Set(key, value) +// } + +// // 设置查询参数(如果需要) +// q := req.URL.Query() +// for key, value := range endpoint.QueryParams { +// q.Set(key, value) +// } +// req.URL.RawQuery = q.Encode() + +// // 设置请求体(如果需要,例如 POST 请求) +// if endpoint.Method == "POST" || endpoint.Method == "PUT" { +// req.Body = ioutil.NopCloser(strings.NewReader(endpoint.Body)) +// req.Header.Set("Content-Type", "application/json") // 假设是 JSON 请求体 +// } + +// // 执行请求 +// resp, err := client.Do(req) +// if err != nil { +// return "", err +// } +// defer resp.Body.Close() + +// // 读取响应体 +// body, err := ioutil.ReadAll(resp.Body) +// if err != nil { +// return "", err +// } + +// return string(body), nil +// } + +// // pollAPIEndpoints 轮询 API 端点列表,并根据指定的间隔时间执行请求 +// func pollAPIEndpoints(endpoints []APIEndpoint, interval int, wg *sync.WaitGroup, results chan<- string) { +// defer wg.Done() +// for _, endpoint := range endpoints { +// for { +// body, err := fetchAPI(endpoint) +// if err != nil { +// log.Printf("Error fetching from %s: %v", endpoint.URL, err) +// } else { +// results <- fmt.Sprintf("Response from %s: %s", endpoint.URL, body) +// } +// time.Sleep(time.Duration(interval) * time.Second) +// // 注意:这里使用了 endpoint.Interval 而不是传入的 interval, +// // 但为了示例简单,我们统一使用传入的 interval。 +// // 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。 +// } +// } +// } diff --git a/pool/concurrency_model_parse.go b/pool/concurrency_model_parse.go index d68c308..4eddbd1 100644 --- a/pool/concurrency_model_parse.go +++ b/pool/concurrency_model_parse.go @@ -34,11 +34,14 @@ var ParseFunc = func(parseConfig interface{}) { result := pgClient.WithContext(cancelCtx).Table(tableName).Where("global_uuid = ?", modelParseConfig.ComponentInfo.GlobalUUID).Find(&unmarshalMap) if result.Error != nil { logger.Error("query component detail info failed", zap.Error(result.Error)) + return } else if result.RowsAffected == 0 { logger.Error("query component detail info from table is empty", zap.String("table_name", tableName)) + return } - // TODO 判断对应 model 是否有锚定参量 + uuid := modelParseConfig.ComponentInfo.GlobalUUID.String() + configsStr, exist := unmarshalMap["anchor_configs_list"] if exist { anchorConfigsJSON, err := simplejson.NewJson([]byte(configsStr.(string))) @@ -48,18 +51,24 @@ var ParseFunc = func(parseConfig interface{}) { } paramsList := anchorConfigsJSON.Get("params_list").MustArray() - for _, param := range paramsList { - if baseConfig, ok := param.(config.AnchorParamBaseConfig); ok { - anchorParam := config.AnchorParamConfig{ - AnchorParamBaseConfig: baseConfig, - } - anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectFuncAndParamsByAnchorName(baseConfig.AnchorName) - realtimedata.AnchorParamsChan <- anchorParam + for index := range paramsList { + anchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustString() + + anchorParam := config.AnchorParamConfig{ + AnchorParamBaseConfig: config.AnchorParamBaseConfig{ + UUID: uuid, + AnchorName: anchorName, + CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64()), + CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64()), + }, } + anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap) + diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName) + realtimedata.AnchorParamsChan <- anchorParam + } } - uuid := modelParseConfig.ComponentInfo.GlobalUUID.String() unmarshalMap["id"] = modelParseConfig.ComponentInfo.ID unmarshalMap["uuid"] = uuid unmarshalMap["created_time"] = modelParseConfig.ComponentInfo.VisibleID