rewrite the real-time data acquisition and processing workflow

This commit is contained in:
douxu 2025-01-21 16:35:44 +08:00
parent 43dece39c1
commit 2b967450eb
17 changed files with 302 additions and 175 deletions

View File

@ -0,0 +1,10 @@
package config
import "context"
// AnchorChanConfig define anchor params channel config struct
type AnchorChanConfig struct {
Ctx context.Context // 结束 context
AnchorChan chan AnchorParamConfig // 锚定参量实时值传递通道
ReadyChan chan struct{} // 就绪通知通道
}

View File

@ -15,12 +15,11 @@ type AnchorParamListConfig struct {
// AnchorParamBaseConfig define anchor params base config struct
type AnchorParamBaseConfig struct {
StationID string // component表 station_id
ComponentID string // component表 ID
UUID string // component表 global_uuid
AnchorName string // 锚定参量名称
CompareValUpperLimit float64 // 比较值上限
CompareValLowerLimit float64 // 比较值下限
ComponentID int64 // component表 ID
AnchorName string // 锚定参量名称
CompareValUpperLimit float64 // 比较值上限
CompareValLowerLimit float64 // 比较值下限
AnchorRealTimeData []float64 // 锚定参数实时值
}
// AnchorParamConfig define anchor params config struct
@ -28,8 +27,6 @@ type AnchorParamConfig struct {
AnchorParamBaseConfig
CalculateFunc func(archorValue float64, args ...float64) float64 // 计算函数
CalculateParams []float64 // 计算参数
APIURL string // API URL
APIMethod string // API Method
}
var baseVoltageFunc = func(archorValue float64, args ...float64) float64 {

View File

@ -3,14 +3,12 @@ package database
import (
"context"
"fmt"
"strconv"
"time"
"modelRT/config"
"modelRT/diagram"
"modelRT/orm"
"github.com/gofrs/uuid"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"gorm.io/gorm"
@ -19,18 +17,18 @@ import (
// QueryCircuitDiagramComponentFromDB return the result of query circuit diagram component info order by page id from postgresDB
func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool *ants.PoolWithFunc, logger *zap.Logger) error {
var Components []orm.Component
var components []orm.Component
// ctx超时判断
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// TODO 将 for update 操作放到事务中
result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&Components)
result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&components)
if result.Error != nil {
logger.Error("query circuit diagram component info failed", zap.Error(result.Error))
return result.Error
}
for _, component := range Components {
for _, component := range components {
pool.Invoke(config.ModelParseConfig{
ComponentInfo: component,
Context: ctx,
@ -39,27 +37,16 @@ func QueryCircuitDiagramComponentFromDB(ctx context.Context, tx *gorm.DB, pool *
return nil
}
// QueryElectricalEquipmentUUID return the result of query electrical equipment uuid from postgresDB by circuit diagram id info
func QueryElectricalEquipmentUUID(ctx context.Context, diagramID int64, logger *zap.Logger) error {
var uuids []string
// QueryComponentByUUID return the result of query circuit diagram component info by uuid from postgresDB
func QueryComponentByUUID(ctx context.Context, tx *gorm.DB, uuid uuid.UUID) (orm.Component, error) {
var component orm.Component
// ctx超时判断
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
tableName := "circuit_diagram_" + strconv.FormatInt(diagramID, 10)
result := _globalPostgresClient.Table(tableName).WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Select("uuid").Find(&uuids)
result := tx.WithContext(cancelCtx).Where("global_uuid = ? ", uuid).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&component)
if result.Error != nil {
logger.Error("query circuit diagram overview info failed", zap.Error(result.Error))
return result.Error
return orm.Component{}, result.Error
}
for _, uuid := range uuids {
diagramParamsMap, err := diagram.GetComponentMap(uuid)
if err != nil {
logger.Error("get electrical circuit diagram overview info failed", zap.Error(result.Error))
return result.Error
}
fmt.Println(diagramParamsMap, err)
}
return nil
return component, nil
}

View File

@ -0,0 +1,56 @@
package diagram
import (
"context"
"sync"
"modelRT/config"
"github.com/panjf2000/ants/v2"
)
func init() {
globalComponentChanSet = &ComponentChanSet{
AnchorChans: make([]chan config.AnchorParamConfig, 100),
}
}
var globalComponentChanSet *ComponentChanSet
// ComponentChanSet defines component anchor real time data process channel set
type ComponentChanSet struct {
sync.RWMutex
AnchorChans []chan config.AnchorParamConfig
}
func GetComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig {
globalComponentChanSet.RLock()
componentChan := globalComponentChanSet.AnchorChans[componentID]
if componentChan == nil {
globalComponentChanSet.RUnlock()
globalComponentChanSet.Lock()
defer globalComponentChanSet.Unlock()
return CreateComponentChan(ctx, componentID, pool)
}
globalComponentChanSet.RUnlock()
return componentChan
}
func CreateComponentChan(ctx context.Context, componentID int64, pool *ants.PoolWithFunc) chan config.AnchorParamConfig {
componentChan := globalComponentChanSet.AnchorChans[componentID]
if componentChan == nil {
componentChan = make(chan config.AnchorParamConfig, 100)
globalComponentChanSet.AnchorChans[componentID] = componentChan
readyChan := make(chan struct{})
chanConfig := config.AnchorChanConfig{
Ctx: ctx,
AnchorChan: componentChan,
ReadyChan: readyChan,
}
pool.Invoke(chanConfig)
<-readyChan
return componentChan
}
return componentChan
}

View File

@ -9,11 +9,11 @@ import (
// diagramsOverview define struct of storage all circuit diagram data
var diagramsOverview sync.Map
// GetComponentMap define func of get circuit diagram data by global uuid
func GetComponentMap(uuid string) (map[string]interface{}, error) {
value, ok := diagramsOverview.Load(uuid)
// GetComponentMap define func of get circuit diagram data by component id
func GetComponentMap(componentID int64) (map[string]interface{}, error) {
value, ok := diagramsOverview.Load(componentID)
if !ok {
return nil, fmt.Errorf("can not find graph by global uuid:%s", uuid)
return nil, fmt.Errorf("can not find graph by global uuid:%d", componentID)
}
paramsMap, ok := value.(map[string]interface{})
if !ok {
@ -22,20 +22,20 @@ func GetComponentMap(uuid string) (map[string]interface{}, error) {
return paramsMap, nil
}
// UpdateComponentMap define func of update circuit diagram data by global uuid and component info
func UpdateComponentMap(uuid string, componentInfo map[string]interface{}) bool {
_, result := diagramsOverview.Swap(uuid, componentInfo)
// UpdateComponentMap define func of update circuit diagram data by component id and component info
func UpdateComponentMap(componentID int64, componentInfo map[string]interface{}) bool {
_, result := diagramsOverview.Swap(componentID, componentInfo)
return result
}
// StoreComponentMap define func of store circuit diagram data with global uuid and component info
func StoreComponentMap(uuid string, componentInfo map[string]interface{}) {
diagramsOverview.Store(uuid, componentInfo)
// StoreComponentMap define func of store circuit diagram data with component id and component info
func StoreComponentMap(componentID int64, componentInfo map[string]interface{}) {
diagramsOverview.Store(componentID, componentInfo)
return
}
// DeleteComponentMap define func of delete circuit diagram data with global uuid
func DeleteComponentMap(uuid string) {
diagramsOverview.Delete(uuid)
// DeleteComponentMap define func of delete circuit diagram data with component id
func DeleteComponentMap(componentID int64) {
diagramsOverview.Delete(componentID)
return
}

View File

@ -5,7 +5,6 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"
"modelRT/config"
@ -17,8 +16,6 @@ import (
"modelRT/network"
"modelRT/orm"
realtimedata "modelRT/real-time-data"
"github.com/bitly/go-simplejson"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
@ -139,18 +136,18 @@ func ComponentAnchorReplaceHandler(c *gin.Context) {
diagram.UpdateAnchorValue(uuid, anchorName)
// TODO 检查 anchorParam 配置
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
StationID: componentInfo.StationID,
ComponentID: strconv.FormatInt(componentInfo.ID, 10),
UUID: uuid,
ComponentID: componentInfo.ID,
AnchorName: anchorName,
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(anchorIndex).Get("lower_limit").MustFloat64(),
},
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(componentInfo.ComponentType, anchorName, unmarshalMap)
realtimedata.AnchorParamsChan <- anchorParam
// TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
// realtimedata.AnchorParamsChan <- anchorParam
resp := network.SuccessResponse{
SuccessResponseHeader: network.SuccessResponseHeader{Status: http.StatusOK},

View File

@ -100,7 +100,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
graph.AddEdge(topologicCreateInfo.UUIDFrom, topologicCreateInfo.UUIDTo)
}
for _, componentInfo := range request.ComponentInfos {
for index, componentInfo := range request.ComponentInfos {
componentID, err := database.CreateComponentIntoDB(c, tx, componentInfo)
if err != nil {
tx.Rollback()
@ -116,6 +116,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp)
return
}
request.ComponentInfos[index].ID = componentID
err = database.CreateModelIntoDB(c, tx, componentID, componentInfo.ComponentType, componentInfo.Params)
if err != nil {
@ -169,7 +170,7 @@ func CircuitDiagramCreateHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp)
return
}
diagram.StoreComponentMap(componentInfo.UUID, componentMap)
diagram.StoreComponentMap(componentInfo.ID, componentMap)
}
if len(request.FreeVertexs) > 0 {

View File

@ -210,7 +210,7 @@ func CircuitDiagramDeleteHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp)
return
}
diagram.DeleteComponentMap(componentInfo.UUID)
diagram.DeleteComponentMap(component.ID)
}
if len(request.FreeVertexs) > 0 {

View File

@ -5,6 +5,7 @@ import (
"net/http"
"strconv"
"modelRT/database"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
@ -25,6 +26,8 @@ import (
// @Router /model/diagram_load/{page_id} [get]
func CircuitDiagramLoadHandler(c *gin.Context) {
logger := logger.GetLoggerInstance()
pgClient := database.GetPostgresDBClient()
pageID, err := strconv.ParseInt(c.Query("page_id"), 10, 64)
if err != nil {
logger.Error("get pageID from url param failed", zap.Error(err))
@ -59,26 +62,55 @@ func CircuitDiagramLoadHandler(c *gin.Context) {
componentParamMap := make(map[string]any)
for _, VerticeLink := range topologicInfo.VerticeLinks {
for _, componentUUID := range VerticeLink {
UUIDStr := componentUUID.String()
componentParams, err := diagram.GetComponentMap(UUIDStr)
component, err := database.QueryComponentByUUID(c, pgClient, componentUUID)
if err != nil {
logger.Error("get component id info from DB by uuid failed", zap.Error(err))
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()}
resp := network.FailureResponse{
FailResponseHeader: header,
PayLoad: map[string]interface{}{
"uuid": componentUUID,
},
}
c.JSON(http.StatusOK, resp)
return
}
componentParams, err := diagram.GetComponentMap(component.ID)
if err != nil {
logger.Error("get component data from set by uuid failed", zap.Error(err))
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()}
resp := network.FailureResponse{
FailResponseHeader: header,
PayLoad: map[string]interface{}{
"uuid": UUIDStr,
"uuid": componentUUID,
},
}
c.JSON(http.StatusOK, resp)
return
}
componentParamMap[UUIDStr] = componentParams
componentParamMap[componentUUID.String()] = componentParams
}
}
rootVertexUUID := topologicInfo.RootVertex.String()
rootComponentParam, err := diagram.GetComponentMap(rootVertexUUID)
rootComponent, err := database.QueryComponentByUUID(c, pgClient, topologicInfo.RootVertex)
if err != nil {
logger.Error("get component id info from DB by uuid failed", zap.Error(err))
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()}
resp := network.FailureResponse{
FailResponseHeader: header,
PayLoad: map[string]interface{}{
"uuid": topologicInfo.RootVertex,
},
}
c.JSON(http.StatusOK, resp)
return
}
rootComponentParam, err := diagram.GetComponentMap(rootComponent.ID)
if err != nil {
logger.Error("get component data from set by uuid failed", zap.Error(err))
header := network.FailResponseHeader{Status: http.StatusBadRequest, ErrMsg: err.Error()}

View File

@ -99,7 +99,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
}
}
for _, componentInfo := range request.ComponentInfos {
for index, componentInfo := range request.ComponentInfos {
componentID, err := database.UpdateComponentIntoDB(c, tx, componentInfo)
if err != nil {
logger.Error("udpate component info into DB failed", zap.Error(err))
@ -115,6 +115,8 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
return
}
request.ComponentInfos[index].ID = componentID
err = database.UpdateModelIntoDB(c, tx, componentID, componentInfo.ComponentType, componentInfo.Params)
if err != nil {
logger.Error("udpate component model info into DB failed", zap.Error(err))
@ -161,7 +163,7 @@ func CircuitDiagramUpdateHandler(c *gin.Context) {
c.JSON(http.StatusOK, resp)
return
}
diagram.UpdateComponentMap(componentInfo.UUID, componentMap)
diagram.UpdateComponentMap(componentInfo.ID, componentMap)
}
if len(request.FreeVertexs) > 0 {

11
main.go
View File

@ -72,21 +72,20 @@ func main() {
}
defer parsePool.Release()
// init data polling ants pool
pollingPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc)
// init anchor param ants pool
// TODO 修改PollingConcurrentQuantity 名称
anchorPool, err := ants.NewPoolWithFunc(modelRTConfig.PollingConcurrentQuantity, pool.AnchorFunc)
if err != nil {
zapLogger.Error("init concurrent data polling task pool failed", zap.Error(err))
panic(err)
}
defer pollingPool.Release()
defer anchorPool.Release()
// init cancel context
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
// init anchor channel
go realtimedata.AnchorParamChangeChan(cancelCtx, pollingPool)
// init real time data receive channel
go realtimedata.ReceiveChan(cancelCtx)
go realtimedata.ReceiveChan(cancelCtx, anchorPool)
postgresDBClient.Transaction(func(tx *gorm.DB) error {
// load circuit diagram from postgres

View File

@ -21,6 +21,7 @@ type TopologicUUIDCreateInfo struct {
// ComponentCreateInfo defines circuit diagram component create index info
type ComponentCreateInfo struct {
ID int64 `json:"id"`
UUID string `json:"uuid"`
Name string `json:"name"`
Context string `json:"context"`

View File

@ -33,6 +33,7 @@ type TopologicUUIDChangeInfos struct {
// ComponentUpdateInfo defines circuit diagram component params info
type ComponentUpdateInfo struct {
ID int64 `json:"id"`
UUID string `json:"uuid"`
Name string `json:"name"`
Context string `json:"context"`

View File

@ -3,84 +3,102 @@ package pool
import (
"fmt"
"strconv"
"time"
"modelRT/config"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
"go.uber.org/zap"
)
var AnchorFunc = func(anchorConfig interface{}) {
// AnchorFunc defines func that process the real time data of component anchor params
var AnchorFunc = func(anchorChan interface{}) {
var firstStart bool
logger := logger.GetLoggerInstance()
var firstTimePolling bool
paramConfig, ok := anchorConfig.(config.AnchorParamConfig)
fmt.Println(logger)
fmt.Println(anchorChan)
anchorChanConfig, ok := anchorChan.(config.AnchorChanConfig)
if !ok {
logger.Error("conversion model anchor param config type failed")
logger.Error("conversion component anchor chan type failed")
return
}
for {
var beginUnixTime, endUnixTime int64
if firstTimePolling {
milliUnixTime := time.Now().UnixMilli()
endUnixTime = milliUnixTime
beginUnixTime = milliUnixTime - 1000*60
firstTimePolling = false
} else {
// 判断时间差值是否小于10s如果小于则sleep0.5s后重新获取时间
endUnixTime = time.Now().UnixMilli()
if endUnixTime-beginUnixTime < 1000 {
time.Sleep(time.Duration(500 * time.Millisecond))
endUnixTime = time.Now().UnixMilli()
}
}
pollingAPI := network.APIEndpoint{
URL: paramConfig.APIURL,
Method: paramConfig.APIMethod,
QueryParams: map[string]string{
"station": paramConfig.StationID,
"component": paramConfig.ComponentID,
"point": paramConfig.AnchorName,
"begin": strconv.FormatInt(beginUnixTime, 10),
"end": strconv.FormatInt(endUnixTime, 10),
},
}
if !firstTimePolling {
beginUnixTime = time.Now().UnixMilli()
}
valueSlice, err := network.PollAPIEndpoints(pollingAPI)
if err != nil {
logger.Error("polling real time data from dataRT service failed", zap.Error(err))
continue
}
for _, value := range valueSlice {
anchorName, err := diagram.GetAnchorValue(paramConfig.UUID)
if err != nil {
logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err))
continue
}
if anchorName != paramConfig.AnchorName {
logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName))
continue
}
upperLimitVal := paramConfig.CompareValUpperLimit
lowerlimitVal := paramConfig.CompareValLowerLimit
compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...)
if compareValue > upperLimitVal || compareValue < lowerlimitVal {
// TODO 选择报警方式
fmt.Println("log warning")
select {
case <-anchorChanConfig.Ctx.Done():
return
case anchorParaConfig := <-anchorChanConfig.AnchorChan:
if firstStart {
close(anchorChanConfig.ReadyChan)
firstStart = false
}
// TODO 重写anchorParaConfig 处理逻辑
fmt.Println(anchorParaConfig)
default:
}
}
// var firstTimePolling bool
// paramConfig, ok := anchorConfig.(config.AnchorParamConfig)
// if !ok {
// logger.Error("conversion model anchor param config type failed")
// return
// }
// for {
// var beginUnixTime, endUnixTime int64
// if firstTimePolling {
// milliUnixTime := time.Now().UnixMilli()
// endUnixTime = milliUnixTime
// beginUnixTime = milliUnixTime - 1000*60
// firstTimePolling = false
// } else {
// // 判断时间差值是否小于10s如果小于则sleep0.5s后重新获取时间
// endUnixTime = time.Now().UnixMilli()
// if endUnixTime-beginUnixTime < 1000 {
// time.Sleep(time.Duration(500 * time.Millisecond))
// endUnixTime = time.Now().UnixMilli()
// }
// }
// pollingAPI := network.APIEndpoint{
// URL: paramConfig.APIURL,
// Method: paramConfig.APIMethod,
// QueryParams: map[string]string{
// "station": paramConfig.StationID,
// "component": paramConfig.ComponentID,
// "point": paramConfig.AnchorName,
// "begin": strconv.FormatInt(beginUnixTime, 10),
// "end": strconv.FormatInt(endUnixTime, 10),
// },
// }
// if !firstTimePolling {
// beginUnixTime = time.Now().UnixMilli()
// }
// valueSlice, err := network.PollAPIEndpoints(pollingAPI)
// if err != nil {
// logger.Error("polling real time data from dataRT service failed", zap.Error(err))
// continue
// }
// for _, value := range valueSlice {
// anchorName, err := diagram.GetAnchorValue(paramConfig.UUID)
// if err != nil {
// logger.Error("can not get anchor value from map by uuid", zap.String("uuid", paramConfig.UUID), zap.Error(err))
// continue
// }
// if anchorName != paramConfig.AnchorName {
// logger.Error("anchor name not equal param config anchor value", zap.String("map_anchor_name", anchorName), zap.String("param_anchor_name", paramConfig.AnchorName))
// continue
// }
// upperLimitVal := paramConfig.CompareValUpperLimit
// lowerlimitVal := paramConfig.CompareValLowerLimit
// compareValue := paramConfig.CalculateFunc(value, paramConfig.CalculateParams...)
// if compareValue > upperLimitVal || compareValue < lowerlimitVal {
// // TODO 选择报警方式
// fmt.Println("log warning")
// }
// }
// }
}

View File

@ -3,7 +3,6 @@ package pool
import (
"context"
"strconv"
"time"
"modelRT/config"
@ -11,7 +10,6 @@ import (
"modelRT/diagram"
"modelRT/logger"
"modelRT/model"
realtimedata "modelRT/real-time-data"
"github.com/bitly/go-simplejson"
"go.uber.org/zap"
@ -57,11 +55,11 @@ var ParseFunc = func(parseConfig interface{}) {
for index := range paramsList {
anchorName := anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustString()
// TODO 检查anchorParam 配置是否正确
anchorParam := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
StationID: modelParseConfig.ComponentInfo.StationID,
ComponentID: strconv.FormatInt(modelParseConfig.ComponentInfo.ID, 10),
UUID: uuid,
ComponentID: modelParseConfig.ComponentInfo.ID,
AnchorName: anchorName,
CompareValUpperLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("upper_limit").MustFloat64(),
CompareValLowerLimit: anchorConfigsJSON.Get("params_list").GetIndex(index).Get("lower_limit").MustFloat64(),
@ -69,7 +67,8 @@ var ParseFunc = func(parseConfig interface{}) {
}
anchorParam.CalculateFunc, anchorParam.CalculateParams = config.SelectAnchorCalculateFuncAndParams(modelParseConfig.ComponentInfo.ComponentType, anchorName, unmarshalMap)
diagram.StoreAnchorValue(modelParseConfig.ComponentInfo.GlobalUUID.String(), anchorName)
realtimedata.AnchorParamsChan <- anchorParam
// TODO 改成启动diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
// realtimedata.AnchorParamsChan <- anchorParam
}
}
@ -86,6 +85,6 @@ var ParseFunc = func(parseConfig interface{}) {
unmarshalMap["op"] = modelParseConfig.ComponentInfo.Op
unmarshalMap["ts"] = modelParseConfig.ComponentInfo.Ts
diagram.StoreComponentMap(uuid, unmarshalMap)
diagram.StoreComponentMap(modelParseConfig.ComponentInfo.ID, unmarshalMap)
return
}

View File

@ -1,30 +0,0 @@
// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"modelRT/config"
"github.com/panjf2000/ants/v2"
)
// AnchorParamsChan define channel of component anchor param change
var AnchorParamsChan chan config.AnchorParamConfig
func init() {
AnchorParamsChan = make(chan config.AnchorParamConfig, 100)
}
// AnchorParamChangeChan define func of component anchor param change notification process
func AnchorParamChangeChan(ctx context.Context, pool *ants.PoolWithFunc) {
for {
select {
case <-ctx.Done():
return
case anchorParam := <-AnchorParamsChan:
pool.Invoke(anchorParam)
default:
}
}
}

View File

@ -3,9 +3,15 @@ package realtimedata
import (
"context"
"fmt"
"modelRT/config"
"modelRT/constant"
"modelRT/diagram"
"modelRT/logger"
"modelRT/network"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// RealTimeDataChan define channel of real time data receive
@ -16,13 +22,64 @@ func init() {
}
// ReceiveChan define func of real time data receive and process
func ReceiveChan(ctx context.Context) {
func ReceiveChan(ctx context.Context, pool *ants.PoolWithFunc) {
logger := logger.GetLoggerInstance()
for {
select {
case <-ctx.Done():
return
case realTimeData := <-RealTimeDataChan:
fmt.Println(realTimeData.PayLoad.ComponentID)
// TODO 根据 componentID 缓存到来的实时数据
componentID := realTimeData.PayLoad.ComponentID
component, err := diagram.GetComponentMap(componentID)
if err != nil {
logger.Error("query component info from diagram map by componet id failed", zap.Int64("component_id", componentID), zap.Error(err))
continue
}
componentType := component["component_type"].(int)
if componentType != constant.DemoType {
logger.Error("can not process real time data of component type not equal DemoType", zap.Int64("component_id", componentID))
continue
}
var anchorName string
var compareValUpperLimit, compareValLowerLimit float64
var anchorRealTimeData []float64
var calculateFunc func(archorValue float64, args ...float64) float64
if anchoringV := component["anchor_v"].(bool); anchoringV {
anchorName = "voltage"
compareValUpperLimit = component["uv_alarm"].(float64)
compareValLowerLimit = component["ov_alarm"].(float64)
} else {
anchorName = "current"
compareValUpperLimit = component["ui_alarm"].(float64)
compareValLowerLimit = component["oi_alarm"].(float64)
}
componentData := map[string]interface{}{
"resistance": component["resistance"].(float64),
}
calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData)
for _, param := range realTimeData.PayLoad.Values {
anchorRealTimeData = append(anchorRealTimeData, param.Value)
}
anchorConfig := config.AnchorParamConfig{
AnchorParamBaseConfig: config.AnchorParamBaseConfig{
ComponentID: componentID,
AnchorName: anchorName,
CompareValUpperLimit: compareValUpperLimit,
CompareValLowerLimit: compareValLowerLimit,
AnchorRealTimeData: anchorRealTimeData,
},
CalculateFunc: calculateFunc,
CalculateParams: params,
}
diagram.GetComponentChan(ctx, componentID, pool) <- anchorConfig
default:
}
}