write code for polling real time data from dataRT service

This commit is contained in:
douxu 2024-12-26 15:03:20 +08:00
parent 39e380ee1e
commit 5a9fa5cc4d
8 changed files with 133 additions and 55 deletions

View File

@ -9,47 +9,51 @@ import (
type AnchorParamListConfig struct {
AnchorName string
FuncType string // 函数类型
UpperLimit float32 // 比较值上限
LowerLimit float32 // 比较值下限
UpperLimit float64 // 比较值上限
LowerLimit float64 // 比较值下限
}
// AnchorParamBaseConfig define anchor params base config struct
type AnchorParamBaseConfig struct {
UUID string
StationID string // component表 station_id
ComponentID string // component表 ID
UUID string // component表 global_uuid
AnchorName string // 锚定参量名称
CompareValUpperLimit float32 // 比较值上限
CompareValLowerLimit float32 // 比较值下限
CompareValUpperLimit float64 // 比较值上限
CompareValLowerLimit float64 // 比较值下限
}
// AnchorParamConfig define anchor params config struct
type AnchorParamConfig struct {
AnchorParamBaseConfig
CalculateFunc func(archorValue float32, args ...float32) float32 // 计算函数
CalculateParams []float32 // 计算参数
CalculateFunc func(archorValue float64, args ...float64) float64 // 计算函数
CalculateParams []float64 // 计算参数
APIURL string // API URL
APIMethod string // API Method
}
var baseVoltageFunc = func(archorValue float32, args ...float32) float32 {
var baseVoltageFunc = func(archorValue float64, args ...float64) float64 {
voltage := archorValue
resistance := args[1]
return voltage / resistance
}
var baseCurrentFunc = func(archorValue float32, args ...float32) float32 {
var baseCurrentFunc = func(archorValue float64, args ...float64) float64 {
current := archorValue
resistance := args[1]
return current * resistance
}
// SelectAnchorCalculateFuncAndParams define select anchor func and anchor calculate value by component type 、 anchor name and component data
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float32, args ...float32) float32, []float32) {
func SelectAnchorCalculateFuncAndParams(componentType int, anchorName string, componentData map[string]interface{}) (func(archorValue float64, args ...float64) float64, []float64) {
if componentType == constant.DemoType {
if anchorName == "voltage" {
resistance := componentData["resistance"].(float32)
return baseVoltageFunc, []float32{resistance}
resistance := componentData["resistance"].(float64)
return baseVoltageFunc, []float64{resistance}
} else if anchorName == "current" {
resistance := componentData["resistance"].(float32)
return baseCurrentFunc, []float32{resistance}
resistance := componentData["resistance"].(float64)
return baseCurrentFunc, []float64{resistance}
}
}
return nil, []float32{}
return nil, []float64{}
}

View File

@ -46,6 +46,15 @@ type LoggerConfig struct {
// AntsConfig define config stuct of ants pool config
type AntsConfig struct {
ParseConcurrentQuantity int `mapstructure:"parse_concurrent_quantity"` // parse comtrade file concurrent quantity
PollingConcurrentQuantity int `mapstructure:"polling_concurrent_quantity"` // polling real time data concurrent quantity
}
// DataRTConfig define config stuct of data runtime server api config
type DataRTConfig struct {
Host string `mapstructure:"host"`
Port int64 `mapstructure:"port"`
PollingAPI string `mapstructure:"polling_api"`
Method string `mapstructure:"polling_api_method"`
}
// ModelRTConfig define config stuct of model runtime server
@ -55,6 +64,7 @@ type ModelRTConfig struct {
KafkaConfig `mapstructure:"kafka"`
LoggerConfig `mapstructure:"logger"`
AntsConfig `mapstructure:"ants"`
DataRTConfig `mapstructure:"dataRT"`
PostgresDBURI string `mapstructure:"-"`
}
@ -71,12 +81,12 @@ func ReadAndInitConfig(configDir, configName, configType string) (modelRTConfig
panic(err)
}
rtConfig := ModelRTConfig{}
if err := config.Unmarshal(&rtConfig); err != nil {
modelRTconf := ModelRTConfig{}
if err := config.Unmarshal(&modelRTConfig); err != nil {
panic(fmt.Sprintf("unmarshal modelRT config failed:%s\n", err.Error()))
}
modelRTConfig.PostgresDBURI = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", rtConfig.Host, rtConfig.Port, rtConfig.User, rtConfig.Password, rtConfig.DataBase)
modelRTConfig.PostgresDBURI = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", modelRTconf.PostgresConfig.Host, modelRTconf.PostgresConfig.Port, modelRTconf.PostgresConfig.User, modelRTconf.PostgresConfig.Password, modelRTconf.PostgresConfig.DataBase)
return modelRTConfig
}

View File

@ -33,9 +33,17 @@ logger:
# ants config
ants:
parse_concurrent_quantity: 10
polling_concurrent_quantity: 10
# modelRT base config
base:
grid_id: 1
zone_id: 1
station_id: 1
# dataRT api config
dataRT:
host: "http://127.0.0.1"
port: 8888
polling_api: "datart/getPointData"
polling_api_method: "GET"

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"
"modelRT/config"
@ -139,10 +140,12 @@ func ComponentAnchorReplaceHandler(c *gin.Context) {
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
StationID: strconv.FormatInt(componentInfo.StationID, 10),
ComponentID: strconv.FormatInt(componentInfo.ID, 10),
UUID: uuid,
AnchorName: anchorName,
CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64()),
CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64()),
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(),
},
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap)

View File

@ -72,8 +72,7 @@ func main() {
defer parsePool.Release()
// init data polling ants pool
// TODO 优化轮询池初始化参数定义方式,改为从 config 中获取
pollingPool, err := ants.NewPoolWithFunc(1000, pool.AnchorFunc)
pollingPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc)
if err != nil {
zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err))
panic(err)

View File

@ -6,11 +6,8 @@ import (
"io"
"net/http"
"strings"
"time"
"modelRT/logger"
"go.uber.org/zap"
"github.com/bitly/go-simplejson"
)
// APIEndpoint defines an api endpoint struct to poll data from dataRT service
@ -61,18 +58,27 @@ func fetchAPI(endpoint APIEndpoint) (string, error) {
return string(body), nil
}
// pollAPIEndpoints defines unmarshal polling data from http request
func pollAPIEndpoints(endpoint APIEndpoint) {
logger := logger.GetLoggerInstance()
// PollAPIEndpoints defines unmarshal polling data from http request
func PollAPIEndpoints(endpoint APIEndpoint) ([]float64, error) {
var valueSlice []float64
respStr, err := fetchAPI(endpoint)
if err != nil {
logger.Error("unmarshal component anchor point replace info failed", zap.Error(err))
return
return valueSlice, fmt.Errorf("fetch api failed:%w", err)
}
fmt.Println(respStr)
time.Sleep(time.Duration(endpoint.Interval) * time.Second)
// 注意:这里使用了 endpoint.Interval 而不是传入的 interval
// 但为了示例简单,我们统一使用传入的 interval。
// 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。
realDataJSON, err := simplejson.NewJson([]byte(respStr))
if err != nil {
return valueSlice, fmt.Errorf("format real time data failed:%w", err)
}
simplejson.New().UnmarshalJSON([]byte(respStr))
code := realDataJSON.Get("code").MustInt()
if code != 0 {
return valueSlice, fmt.Errorf("polling data api status error:%s", realDataJSON.Get("msg").MustString())
}
dataLen := len(realDataJSON.Get("data").MustArray())
for i := 0; i < dataLen; i++ {
valueSlice = append(valueSlice, realDataJSON.Get("data").GetIndex(i).Get("value").MustFloat64())
}
return valueSlice, nil
}

View File

@ -3,25 +3,66 @@ package pool
import (
"fmt"
"strconv"
"time"
"modelRT/config"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
"go.uber.org/zap"
)
var AnchorFunc = func(anchorConfig interface{}) {
logger := zap.L()
logger := logger.GetLoggerInstance()
var firstTimePolling bool
paramConfig, ok := anchorConfig.(config.AnchorParamConfig)
if !ok {
logger.Error("conversion model anchor param config type failed")
return
}
calculateFunc := paramConfig.CalculateFunc
for {
// TODO 通过 api 循环获取 vlaue 实时值
var value float32
var beginUnixTime, endUnixTime int64
if firstTimePolling {
milliUnixTime := time.Now().UnixMilli()
endUnixTime = milliUnixTime
beginUnixTime = milliUnixTime - 1000*60
firstTimePolling = false
} else {
// 判断时间差值是否小于10s如果小于则重新获取时间
endUnixTime = time.Now().UnixMilli()
if endUnixTime-beginUnixTime < 1000*10 {
time.Sleep(time.Duration(1 * time.Second))
endUnixTime = time.Now().UnixMilli()
}
}
pollingAPI := network.APIEndpoint{
URL: paramConfig.APIURL,
Method: paramConfig.APIMethod,
QueryParams: map[string]string{
"station": paramConfig.StationID,
"component": paramConfig.ComponentID,
"point": paramConfig.AnchorName,
"begin": strconv.FormatInt(beginUnixTime, 10),
"end": strconv.FormatInt(endUnixTime, 10),
},
}
if !firstTimePolling {
beginUnixTime = time.Now().UnixMilli()
}
valueSlice, err := network.PollAPIEndpoints(pollingAPI)
if err != nil {
logger.Error("polling real time data from dataRT service failed", zap.Error(err))
continue
}
for _, value := range valueSlice {
anchorName, err := diagram.GetAnchorValue(paramConfig.UUID)
if err != nil {
logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err))
@ -35,9 +76,11 @@ var AnchorFunc = func(anchorConfig interface{}) {
upperLimitVal := paramConfig.CompareValUpperLimit
lowerlimitVal := paramConfig.CompareValLowerLimit
compareValue := calculateFunc(value, paramConfig.CalculateParams...)
compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...)
if compareValue > upperLimitVal || compareValue < lowerlimitVal {
// TODO 选择报警方式
fmt.Println("log warning")
}
}
}
}

View File

@ -3,11 +3,13 @@ package pool
import (
"context"
"strconv"
"time"
"modelRT/config"
"modelRT/database"
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
realtimedata "modelRT/real-time-data"
@ -15,8 +17,9 @@ import (
"go.uber.org/zap"
)
// ParseFunc defines func that parses the model data from postgres
var ParseFunc = func(parseConfig interface{}) {
logger := zap.L()
logger := logger.GetLoggerInstance()
modelParseConfig, ok := parseConfig.(config.ModelParseConfig)
if !ok {
@ -56,10 +59,12 @@ var ParseFunc = func(parseConfig interface{}) {
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
StationID: strconv.FormatInt(modelParseConfig.ComponentInfo.StationID, 10),
ComponentID: strconv.FormatInt(modelParseConfig.ComponentInfo.ID, 10),
UUID: uuid,
AnchorName: anchorName,
CompareValUpperLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64()),
CompareValLowerLimit: float32(anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64()),
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(),
},
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap)