From f5ea909120d6c3ffac5ce5c5475dd65b71c6cd1c Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 4 Nov 2025 17:12:15 +0800 Subject: [PATCH] optimize real time data query api --- database/fill_identity_token_model.go | 66 ++++++++++ database/query_measurement.go | 21 +++- handler/real_time_data_monitor.go | 168 +++++++++++++++++++++++--- model/attribute_model.go | 8 +- model/identity_token_model.go | 76 ++++++++++++ network/response.go | 12 ++ 6 files changed, 329 insertions(+), 22 deletions(-) create mode 100644 database/fill_identity_token_model.go create mode 100644 model/identity_token_model.go diff --git a/database/fill_identity_token_model.go b/database/fill_identity_token_model.go new file mode 100644 index 0000000..694dd89 --- /dev/null +++ b/database/fill_identity_token_model.go @@ -0,0 +1,66 @@ +// Package database define database operation functions +package database + +import ( + "context" + "strings" + + "modelRT/model" + + "gorm.io/gorm" +) + +// FillingShortTokenModel define filling short token model info +func FillingShortTokenModel(ctx context.Context, tx *gorm.DB, attrItems []string, attrModel *model.ShortAttrInfo) error { + // TODO 重新创建tokenModel 及相关sql查询函数 + // component, err := QueryComponentByLongToken(ctx, tx, attrItems[0]) + // if err != nil { + // return err + // } + // attrModel.ComponentInfo = &component + return nil +} + +// FillingLongTokenModel define filling long token model info +func FillingLongTokenModel(ctx context.Context, tx *gorm.DB, attrItems []string, attrModel *model.LongAttrInfo) error { + // TODO 重新创建tokenModel 及相关sql查询函数 + // component, err := QueryComponentByShortToken(ctx, tx, attrItems[3]) + // if err != nil { + // return err + // } + // attrModel.ComponentInfo = &component + return nil +} + +// ParseDataIdentifierToken define function to parse data identifier token function +func ParseDataIdentifierToken(ctx context.Context, tx *gorm.DB, identToken string) (model.AttrModelInterface, error) { + // TODO 使用identityToken代替ShortAttrInfo等 + attrSlice := strings.Split(identToken, ".") + attrLen := len(attrSlice) + if attrLen == 4 { + short := &model.ShortAttrInfo{ + AttrGroupName: attrSlice[2], + AttrKey: attrSlice[3], + } + err := FillingShortTokenModel(ctx, tx, attrSlice, short) + if err != nil { + return nil, err + } + + // short.AttrValue = attrValue + return short, nil + } else if attrLen == 7 { + long := &model.LongAttrInfo{ + AttrGroupName: attrSlice[5], + AttrKey: attrSlice[6], + } + err := FillingLongTokenModel(ctx, tx, attrSlice, long) + if err != nil { + return nil, err + } + + // long.AttrValue = attrValue + return long, nil + } + return nil, nil +} diff --git a/database/query_measurement.go b/database/query_measurement.go index 5b671de..69a93f7 100644 --- a/database/query_measurement.go +++ b/database/query_measurement.go @@ -13,13 +13,32 @@ import ( // QueryMeasurementByID return the result of query circuit diagram component measurement info by id from postgresDB func QueryMeasurementByID(ctx context.Context, tx *gorm.DB, id int64) (orm.Measurement, error) { - var component orm.Measurement + var measurement orm.Measurement // ctx超时判断 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result := tx.WithContext(cancelCtx). Where("id = ?", id). Clauses(clause.Locking{Strength: "UPDATE"}). + First(&measurement) + + if result.Error != nil { + return orm.Measurement{}, result.Error + } + return measurement, nil +} + +// QueryMeasurementByToken define function query circuit diagram component measurement info by token from postgresDB +func QueryMeasurementByToken(ctx context.Context, tx *gorm.DB, token string) (orm.Measurement, error) { + // TODO parse token to avoid SQL injection + + var component orm.Measurement + // ctx超时判断 + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + result := tx.WithContext(cancelCtx). + Where(" = ?", token). + Clauses(clause.Locking{Strength: "UPDATE"}). First(&component) if result.Error != nil { diff --git a/handler/real_time_data_monitor.go b/handler/real_time_data_monitor.go index 47fc38e..d228e25 100644 --- a/handler/real_time_data_monitor.go +++ b/handler/real_time_data_monitor.go @@ -2,6 +2,7 @@ package handler import ( + "context" "fmt" "net/http" "sync" @@ -63,19 +64,35 @@ func RealTimeMonitorHandler(c *gin.Context) { switch monitorAction { case constants.MonitorStartAction: - fmt.Println(monitorID) - var config RealTimeMonitorConfig - for _, component := range request.Components { - fmt.Println(config) - globalMonitorState.Get(monitorID, component.Interval) + err := globalMonitorState.CreateConfig(monitorID, request.Components) + if err != nil { + logger.Error(c, "create real time data monitor config failed", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return } + + c.JSON(http.StatusOK, network.SuccessResponse{ + Code: http.StatusOK, + Msg: "success", + PayLoad: map[string]interface{}{ + "monitor_id": monitorID, + }, + }) + return case constants.MonitorStopAction: - for _, component := range request.Components { - fmt.Println(component.Interval) - } + globalMonitorState.RemoveTargets(c, monitorID, request.Components) case constants.MonitorAppendAction: - for _, component := range request.Components { - fmt.Println(component.Interval) + err := globalMonitorState.AppendTargets(monitorID, request.Components) + if err != nil { + logger.Error(c, "append target to real time data monitor config failed", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + }) + return } default: err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action) @@ -90,8 +107,7 @@ func RealTimeMonitorHandler(c *gin.Context) { // RealTimeMonitorComponent define struct of real time monitor component type RealTimeMonitorComponent struct { - interval string - targets []string + targets []string } // RealTimeMonitorConfig define struct of real time monitor config @@ -113,14 +129,89 @@ func NewSharedMonitorState() *SharedMonitorState { } } -// Set defined function to set value in SharedMonitorState -func (s *SharedMonitorState) Set(key string, value *RealTimeMonitorConfig) { +// CreateConfig define function to create config in SharedMonitorState +func (s *SharedMonitorState) CreateConfig(monitorID string, components []network.RealTimeComponentItem) error { s.mutex.Lock() defer s.mutex.Unlock() - s.monitorMap[key] = value + + if _, exist := s.monitorMap[monitorID]; exist { + return fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID) + } + + config := &RealTimeMonitorConfig{ + noticeChan: make(chan struct{}), + components: make(map[string]*RealTimeMonitorComponent), + } + + for _, compent := range components { + config.components[compent.Interval] = &RealTimeMonitorComponent{ + targets: compent.Targets, + } + } + + s.monitorMap[monitorID] = config + return nil } -// Get defined function to get value from SharedMonitorState +// AppendTargets define function to append targets in SharedMonitorState +func (s *SharedMonitorState) AppendTargets(monitorID string, components []network.RealTimeComponentItem) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + config, exist := s.monitorMap[monitorID] + if !exist { + return fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID) + } + + for _, compent := range components { + interval := compent.Interval + comp, compExist := config.components[interval] + if !compExist { + comp = &RealTimeMonitorComponent{ + targets: comp.targets, + } + config.components[interval] = comp + + } else { + comp.targets = append(comp.targets, comp.targets...) + } + } + + config.noticeChan <- struct{}{} + return nil +} + +// UpsertTargets define function to upsert targets in SharedMonitorState +func (s *SharedMonitorState) UpsertTargets(monitorID string, interval string, newTargets []string) (isNewMonitor bool, err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + config, exist := s.monitorMap[monitorID] + if !exist { + config = &RealTimeMonitorConfig{ + noticeChan: make(chan struct{}), + components: make(map[string]*RealTimeMonitorComponent), + } + config.components[interval] = &RealTimeMonitorComponent{ + targets: newTargets, + } + s.monitorMap[monitorID] = config + return true, nil + } + + comp, compExist := config.components[interval] + if !compExist { + comp = &RealTimeMonitorComponent{ + targets: newTargets, + } + config.components[interval] = comp + } else { + comp.targets = append(comp.targets, newTargets...) + } + return false, nil +} + +// Get define function to get value from SharedMonitorState func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) { s.mutex.RLock() defer s.mutex.RUnlock() @@ -134,6 +225,49 @@ func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) { if !ok { return nil, false } - return component.targets, true } + +// RemoveTargets define function to remove targets in SharedMonitorState +func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + config, exist := s.monitorMap[monitorID] + if !exist { + return fmt.Errorf("monitorID %s not found", monitorID) + } + + for _, compent := range components { + interval := compent.Interval + comp, compExist := config.components[interval] + if !compExist { + logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval) + continue + } + + targetsToRemoveMap := make(map[string]struct{}) + for _, target := range compent.Targets { + targetsToRemoveMap[target] = struct{}{} + } + + var newTargets []string + for _, existingTarget := range comp.targets { + if _, found := targetsToRemoveMap[existingTarget]; !found { + newTargets = append(newTargets, existingTarget) + } + } + comp.targets = newTargets + + if len(comp.targets) == 0 { + delete(config.components, interval) + } + } + + if len(config.components) == 0 { + delete(s.monitorMap, monitorID) + } + + config.noticeChan <- struct{}{} + return nil +} diff --git a/model/attribute_model.go b/model/attribute_model.go index 4dc9400..ff7d55e 100644 --- a/model/attribute_model.go +++ b/model/attribute_model.go @@ -19,7 +19,7 @@ type AttrModelInterface interface { type LongAttrInfo struct { AttrGroupName string AttrKey string - AttrValue interface{} + AttrValue any GridInfo *orm.Grid ZoneInfo *orm.Zone StationInfo *orm.Station @@ -52,7 +52,7 @@ func (l *LongAttrInfo) IsLocal() bool { } // GetAttrValue define return the attribute value -func (l *LongAttrInfo) GetAttrValue() interface{} { +func (l *LongAttrInfo) GetAttrValue() any { return l.AttrValue } @@ -60,7 +60,7 @@ func (l *LongAttrInfo) GetAttrValue() interface{} { type ShortAttrInfo struct { AttrGroupName string AttrKey string - AttrValue interface{} + AttrValue any ComponentInfo *orm.Component } @@ -90,6 +90,6 @@ func (s *ShortAttrInfo) IsLocal() bool { } // GetAttrValue define return the attribute value -func (l *ShortAttrInfo) GetAttrValue() interface{} { +func (l *ShortAttrInfo) GetAttrValue() any { return l.AttrValue } diff --git a/model/identity_token_model.go b/model/identity_token_model.go new file mode 100644 index 0000000..0e57e57 --- /dev/null +++ b/model/identity_token_model.go @@ -0,0 +1,76 @@ +// Package model define model struct of model runtime service +package model + +import "modelRT/orm" + +// IndentityTokenModelInterface define basic identity token model type interface +type IndentityTokenModelInterface interface { + GetGridInfo() *orm.Grid + GetZoneInfo() *orm.Zone + GetStationInfo() *orm.Station + GetComponentInfo() *orm.Component + IsLocal() bool +} + +// LongIdentityTokenModel define struct to long identity token info +type LongIdentityTokenModel struct { + GridInfo *orm.Grid + ZoneInfo *orm.Zone + StationInfo *orm.Station + ComponentInfo *orm.Component +} + +// GetGridInfo define return the grid information in the long identity token +func (l *LongIdentityTokenModel) GetGridInfo() *orm.Grid { + return l.GridInfo +} + +// GetZoneInfo define return the zone information in the long identity token +func (l *LongIdentityTokenModel) GetZoneInfo() *orm.Zone { + return l.ZoneInfo +} + +// GetStationInfo define return the station information in the long identity token +func (l *LongIdentityTokenModel) GetStationInfo() *orm.Station { + return l.StationInfo +} + +// GetComponentInfo define return the component information in the long identity token +func (l *LongIdentityTokenModel) GetComponentInfo() *orm.Component { + return l.ComponentInfo +} + +// IsLocal define return the is_local information in the long identity token +func (l *LongIdentityTokenModel) IsLocal() bool { + return false +} + +// ShortIdentityTokenModel define struct to short identity token info +type ShortIdentityTokenModel struct { + ComponentInfo *orm.Component +} + +// GetGridInfo define return the grid information in the short identity token +func (s *ShortIdentityTokenModel) GetGridInfo() *orm.Grid { + return nil +} + +// GetZoneInfo define return the zone information in the short identity token +func (s *ShortIdentityTokenModel) GetZoneInfo() *orm.Zone { + return nil +} + +// GetStationInfo define return the station information in the short identity token +func (s *ShortIdentityTokenModel) GetStationInfo() *orm.Station { + return nil +} + +// GetComponentInfo define return the component information in the short identity token +func (s *ShortIdentityTokenModel) GetComponentInfo() *orm.Component { + return s.ComponentInfo +} + +// IsLocal define return the is_local information in the short identity token +func (s *ShortIdentityTokenModel) IsLocal() bool { + return true +} diff --git a/network/response.go b/network/response.go index 98fdeb6..a36b519 100644 --- a/network/response.go +++ b/network/response.go @@ -21,3 +21,15 @@ type MeasurementRecommendPayload struct { Offset int `json:"offset" example:"21"` RecommendedList []string `json:"recommended_list" example:"[\"I_A_rms\", \"I_B_rms\",\"I_C_rms\"]"` } + +// Target define struct of target item in real time data query response payload +type Target struct { + ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"` + Code string `json:"code" example:"1001"` + Msg string `json:"msg" example:"subscription success"` +} + +// RealTimeQueryPayload define struct of real time data query request +type RealTimeQueryPayload struct { + Targets []Target `json:"targets"` +}