From 8a4116879b460acccb7e7d7ea1ebd23eeee9cc18 Mon Sep 17 00:00:00 2001 From: douxu Date: Tue, 2 Dec 2025 17:26:15 +0800 Subject: [PATCH] add real time data measurement target update func --- constants/subscription.go | 20 ++- handler/real_time_data_pull.go | 3 + handler/real_time_data_subscription.go | 206 ++++++++++++++++++++++--- network/real_time_data_request.go | 6 +- 4 files changed, 209 insertions(+), 26 deletions(-) diff --git a/constants/subscription.go b/constants/subscription.go index fa22f43..e78967f 100644 --- a/constants/subscription.go +++ b/constants/subscription.go @@ -8,17 +8,19 @@ const ( SubStopAction string = "stop" // SubAppendAction define the real time subscription append action SubAppendAction string = "append" + // SubUpdateAction define the real time subscription update action + SubUpdateAction string = "update" ) // 定义状态常量 const ( // SubSuccessCode define subscription success code SubSuccessCode = "1001" - // SubSuccessCode define subscription failed code + // SubFailedCode define subscription failed code SubFailedCode = "1002" - // RTDSuccessCode define real time data resturn success code + // RTDSuccessCode define real time data return success code RTDSuccessCode = "1003" - // SubSuccessCode define real time data resturn failed code + // RTDFailedCode define real time data return failed code RTDFailedCode = "1004" // CancelSubSuccessCode define cancel subscription success code CancelSubSuccessCode = "1005" @@ -26,6 +28,10 @@ const ( CancelSubFailedCode = "1006" // SubRepeatCode define subscription repeat code SubRepeatCode = "1007" + // UpdateSubSuccessCode define update subscription success code + UpdateSubSuccessCode = "1008" + // UpdateSubFailedCode define update subscription failed code + UpdateSubFailedCode = "1009" ) const ( @@ -33,9 +39,9 @@ const ( SubSuccessMsg = "subscription success" // SubFailedMsg define subscription failed message SubFailedMsg = "subscription failed" - // RTDSuccessMsg define real time data resturn success message + // RTDSuccessMsg define real time data return success message RTDSuccessMsg = "real time data return success" - // RTDFailedMsg define real time data resturn failed message + // RTDFailedMsg define real time data return failed message RTDFailedMsg = "real time data return failed" // CancelSubSuccessMsg define cancel subscription success message CancelSubSuccessMsg = "cancel subscription success" @@ -43,6 +49,10 @@ const ( CancelSubFailedMsg = "cancel subscription failed" // SubRepeatMsg define subscription repeat message SubRepeatMsg = "subscription repeat in target interval" + // UpdateSubSuccessMsg define update subscription success message + UpdateSubSuccessMsg = "update subscription success" + // UpdateSubFailedMsg define update subscription failed message + UpdateSubFailedMsg = "update subscription failed" ) // TargetOperationType define constant to the target operation type diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 5d7c79b..47c1a05 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -218,6 +218,9 @@ func processTargetPolling(ctx context.Context, s *SharedSubState, clientID strin appendTargets(ctx, config, stopChanMap, fanInChan, transportTargets.Targets) case constants.OpRemove: removeTargets(ctx, stopChanMap, transportTargets.Targets) + case constants.OpUpdate: + // TODO 处理更新操作 + fmt.Println(11111) } config.mutex.Unlock() case <-ctx.Done(): diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 45bf7be..607e267 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -41,12 +41,12 @@ func init() { // "payload": { // "targets": [ // { -// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", +// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_C_rms", // "code": "1001", // "msg": "subscription success" // }, // { -// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", +// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms", // "code": "1002", // "msg": "subscription failed" // } @@ -62,12 +62,12 @@ func init() { // "payload": { // "targets": [ // { -// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", +// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_A_rms", // "code": "1002", // "msg": "subscription failed" // }, // { -// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", +// "id": "grid1.zone1.station1.ns1.tag1.bay.I11_B_rms", // "code": "1002", // "msg": "subscription failed" // } @@ -108,6 +108,9 @@ func RealTimeSubHandler(c *gin.Context) { } else if request.Action == constants.SubStopAction && request.ClientID != "" { subAction = request.Action clientID = request.ClientID + } else if request.Action == constants.SubUpdateAction && request.ClientID != "" { + subAction = request.Action + clientID = request.ClientID } pgClient := database.GetPostgresDBClient() @@ -179,6 +182,30 @@ func RealTimeSubHandler(c *gin.Context) { return } + c.JSON(http.StatusOK, network.SuccessResponse{ + Code: http.StatusOK, + Msg: "success", + Payload: network.RealTimeSubPayload{ + ClientID: clientID, + TargetResults: results, + }, + }) + return + case constants.SubUpdateAction: + results, err := globalSubState.UpdateTargets(c, tx, clientID, request.Measurements) + if err != nil { + logger.Error(c, "update target to real time data subscription config failed", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + Payload: network.RealTimeSubPayload{ + ClientID: clientID, + TargetResults: results, + }, + }) + return + } + c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", @@ -237,8 +264,8 @@ func NewSharedSubState() *SharedSubState { } } -// processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken) -func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements []network.RealTimeMeasurementItem, allReqTargetNum int) ( +// processAndValidateTargetsForStart define func to perform all database I/O operations in a lock-free state for start action +func processAndValidateTargetsForStart(ctx context.Context, tx *gorm.DB, measurements []network.RealTimeMeasurementItem, allReqTargetNum int) ( []network.TargetResult, []string, map[string][]string, map[string]*TargetPollingContext, @@ -282,8 +309,62 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, measurements [] return targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap } -// mergeMeasurements define func to merge newMeasurementsMap into existingMeasurementsMap -func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string { +// processAndValidateTargetsForUpdate define func to perform all database I/O operations in a lock-free state for update action +func processAndValidateTargetsForUpdate(ctx context.Context, tx *gorm.DB, config *RealTimeSubConfig, measurements []network.RealTimeMeasurementItem, allReqTargetNum int) ( + []network.TargetResult, []string, + map[string][]string, + map[string]*TargetPollingContext, +) { + targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum) + newMeasMap := make(map[string][]string) + successfulTargets := make([]string, 0, allReqTargetNum) + newMeasContextMap := make(map[string]*TargetPollingContext) + + for _, measurementItem := range measurements { + interval := measurementItem.Interval + for _, target := range measurementItem.Targets { + targetResult := network.TargetResult{ID: target} + if _, exist := config.targetContext[target]; !exist { + err := fmt.Errorf("target %s does not exists in subscription list", target) + logger.Error(ctx, "update target does not exist in subscription list", "error", err, "target", target) + targetResult.Code = constants.UpdateSubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + continue + } + + targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) + if err != nil { + logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) + targetResult.Code = constants.UpdateSubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.UpdateSubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + continue + } + + targetResult.Code = constants.UpdateSubSuccessCode + targetResult.Msg = constants.UpdateSubSuccessMsg + targetProcessResults = append(targetProcessResults, targetResult) + successfulTargets = append(successfulTargets, target) + + if _, ok := newMeasMap[interval]; !ok { + newMeasMap[interval] = make([]string, 0, len(measurementItem.Targets)) + } + + meas := newMeasMap[interval] + meas = append(meas, target) + newMeasMap[interval] = meas + newMeasContextMap[target] = &TargetPollingContext{ + interval: interval, + measurement: targetModel.GetMeasurementInfo(), + } + } + } + return targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap +} + +// mergeMeasurementsForStart define func to merge newMeasurementsMap into existingMeasurementsMap for start action +func mergeMeasurementsForStart(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) []string { allDuplicates := make([]string, 0) for interval, newMeas := range newMeasurements { if existingMeas, ok := config.measurements[interval]; ok { @@ -307,10 +388,59 @@ func mergeMeasurements(config *RealTimeSubConfig, newMeasurements map[string][]s return allDuplicates } +// mergeMeasurementsForUpdate define func to merge newMeasurementsMap into existingMeasurementsMap for update action +func mergeMeasurementsForUpdate(config *RealTimeSubConfig, newMeasurements map[string][]string, newMeasurementsContextMap map[string]*TargetPollingContext) ([]string, error) { + allDuplicates := make([]string, 0) + delMeasMap := make(map[string][]string) + for _, newMeas := range newMeasurements { + for _, measurement := range newMeas { + oldInterval := config.targetContext[measurement].interval + if _, ok := delMeasMap[oldInterval]; !ok { + delMeasurements := []string{measurement} + delMeasMap[oldInterval] = delMeasurements + } else { + delMeasurements := delMeasMap[oldInterval] + delMeasurements = append(delMeasurements, measurement) + delMeasMap[oldInterval] = delMeasurements + } + } + } + + for interval, delMeas := range delMeasMap { + existingMeas, exist := config.measurements[interval] + if !exist { + return nil, fmt.Errorf("can not find exist measurements in %s interval", interval) + } + + measurements := util.RemoveTargetsFromSliceSimple(existingMeas, delMeas) + config.measurements[interval] = measurements + } + + for interval, newMeas := range newMeasurements { + if existingMeas, ok := config.measurements[interval]; ok { + deduplicated, duplicates := util.DeduplicateAndReportDuplicates(existingMeas, newMeas) + + if len(duplicates) > 0 { + for _, duplicate := range duplicates { + delete(newMeasurementsContextMap, duplicate) + } + allDuplicates = append(allDuplicates, duplicates...) + } + + if len(deduplicated) > 0 { + existingMeas = append(existingMeas, deduplicated...) + config.measurements[interval] = existingMeas + maps.Copy(config.targetContext, newMeasurementsContextMap) + } + } + } + return allDuplicates, nil +} + // CreateConfig define function to create config in SharedSubState func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(measurements) - targetProcessResults, _, newMeasurementsMap, measurementContexts := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) + targetProcessResults, _, newMeasurementsMap, measurementContexts := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount) s.globalMutex.Lock() if _, exist := s.subMap[clientID]; exist { s.globalMutex.Unlock() @@ -343,10 +473,10 @@ func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientI return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err } - targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount) config.mutex.Lock() - allDuplicates := mergeMeasurements(config, newMeasMap, newMeasContextMap) + allDuplicates := mergeMeasurementsForStart(config, newMeasMap, newMeasContextMap) if len(allDuplicates) > 0 { logger.Warn(ctx, "some targets are duplicate and have been ignored in append operation", "clientID", clientID, "duplicates", allDuplicates) // process repeat target in targetProcessResults and successfulTargets @@ -391,7 +521,7 @@ func filterAndDeduplicateRepeatTargets(resultsSlice []network.TargetResult, idsS // UpsertTargets define function to upsert targets in SharedSubState func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(measurements) - targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargets(ctx, tx, measurements, requestTargetsCount) + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForStart(ctx, tx, measurements, requestTargetsCount) s.globalMutex.RLock() config, exist := s.subMap[clientID] @@ -401,7 +531,7 @@ func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientI if exist { opType = constants.OpUpdate config.mutex.Lock() - mergeMeasurements(config, newMeasMap, newMeasContextMap) + mergeMeasurementsForStart(config, newMeasMap, newMeasContextMap) config.mutex.Unlock() } else { opType = constants.OpAppend @@ -415,7 +545,7 @@ func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientI } else { s.globalMutex.Unlock() config.mutex.Lock() - mergeMeasurements(config, newMeasMap, newMeasContextMap) + mergeMeasurementsForStart(config, newMeasMap, newMeasContextMap) config.mutex.Unlock() } s.globalMutex.Unlock() @@ -527,6 +657,48 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea return targetProcessResults, nil } +// UpdateTargets define function to update targets in SharedSubState +func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { + requestTargetsCount := processRealTimeRequestCount(measurements) + targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) + + s.globalMutex.RLock() + config, exist := s.subMap[clientID] + s.globalMutex.RUnlock() + + if !exist { + s.globalMutex.RUnlock() + err := fmt.Errorf("clientID %s not found", clientID) + logger.Error(ctx, "clientID not found in remove targets operation", "error", err) + return processRealTimeRequestTargets(measurements, requestTargetsCount, err), err + } + + targetProcessResults, successfulTargets, newMeasMap, newMeasContextMap := processAndValidateTargetsForUpdate(ctx, tx, config, measurements, requestTargetsCount) + + config.mutex.Lock() + allDuplicates, err := mergeMeasurementsForUpdate(config, newMeasMap, newMeasContextMap) + if err != nil { + logger.Warn(ctx, "can not find exist measurements in target interval", "clientID", clientID, "duplicates", allDuplicates, "error", err) + } + + if len(allDuplicates) > 0 { + logger.Warn(ctx, "some targets are duplicate and have been ignored in append operation", "clientID", clientID, "duplicates", allDuplicates) + // process repeat target in targetProcessResults and successfulTargets + targetProcessResults, successfulTargets = filterAndDeduplicateRepeatTargets(targetProcessResults, successfulTargets, allDuplicates) + } + config.mutex.Unlock() + + if len(successfulTargets) > 0 { + transportTargets := &transportTargets{ + OperationType: constants.OpUpdate, + Targets: successfulTargets, + } + config.noticeChan <- transportTargets + } + + return targetProcessResults, nil +} + // Get define function to get subscriptions config from SharedSubState func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { s.globalMutex.RLock() @@ -539,12 +711,10 @@ func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { return config, true } -// TODO 增加一个update 函数用来更新 interval - func processRealTimeRequestCount(measurements []network.RealTimeMeasurementItem) int { totalTargetsCount := 0 - for _, compItem := range measurements { - totalTargetsCount += len(compItem.Targets) + for _, measItem := range measurements { + totalTargetsCount += len(measItem.Targets) } return totalTargetsCount } diff --git a/network/real_time_data_request.go b/network/real_time_data_request.go index 7ace3b6..50b8c1a 100644 --- a/network/real_time_data_request.go +++ b/network/real_time_data_request.go @@ -26,18 +26,18 @@ type RealTimeSubRequest struct { // RealTimeMeasurementItem define struct of real time measurement item type RealTimeMeasurementItem struct { Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` - Targets []string `json:"targets" example:"[\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\"]" description:"需要采集数据的测点或标签名称列表"` + Targets []string `json:"targets" example:"[\"grid1.zone1.station1.ns1.tag1.bay.I11_A_rms\",\"grid1.zone1.station1.ns1.tag1.tag1.bay.I11_B_rms\"]" description:"需要采集数据的测点或标签名称列表"` } // RealTimePullPayload define struct of pull real time data payload type RealTimePullPayload struct { // required: true - Targets []RealTimePullTarget `json:"targets" example:"{\"targets\":[{\"id\":\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]},{\"id\":\"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]}]}" description:"实时数据"` + Targets []RealTimePullTarget `json:"targets" example:"{\"targets\":[{\"id\":\"grid1.zone1.station1.ns1.tag1.bay.I11_A_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]},{\"id\":\"grid1.zone1.station1.ns1.tag1.bay.I11_B_rms\",\"datas\":[{\"time\":1736305467506000000,\"value\":1},{\"time\":1736305467506000000,\"value\":1}]}]}" description:"实时数据"` } // RealTimePullTarget define struct of pull real time data target type RealTimePullTarget struct { - ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms" description:"实时数据ID值"` + ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.bay.I11_A_rms" description:"实时数据ID值"` Datas []RealTimePullData `json:"datas" example:"[{\"time\":1736305467506000000,\"value\":220},{\"time\":1736305467506000000,\"value\":220}]" description:"实时数据值数组"` }