From 80907519148db65d5676865cdf06da4003f59d63 Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 6 Nov 2025 17:22:14 +0800 Subject: [PATCH] optimize update monitor config func of real time data query api --- constants/monitor.go | 8 + handler/real_time_data_monitor.go | 321 +++++++++++++++--- ...y_request.go => real_time_data_request.go} | 10 + network/response.go | 7 +- 4 files changed, 293 insertions(+), 53 deletions(-) rename network/{real_time_data_query_request.go => real_time_data_request.go} (66%) diff --git a/constants/monitor.go b/constants/monitor.go index f2ed744..dfc9017 100644 --- a/constants/monitor.go +++ b/constants/monitor.go @@ -16,6 +16,10 @@ const ( SubSuccessCode = "1001" // SubSuccessCode define subscription failed code SubFailedCode = "1002" + // CancelSubSuccessCode define cancel subscription success code + CancelSubSuccessCode = "1005" + // CancelSubFailedCode define cancel subscription failed code + CancelSubFailedCode = "1006" ) const ( @@ -23,4 +27,8 @@ const ( SubSuccessMsg = "subscription success" // SubFailedMsg define subscription failed message SubFailedMsg = "subscription failed" + // SubSuccessMsg define cancel subscription success message + CancelSubSuccessMsg = "cancel subscription success" + // SubFailedMsg define cancel subscription failed message + CancelSubFailedMsg = "Cancel subscription failed" ) diff --git a/handler/real_time_data_monitor.go b/handler/real_time_data_monitor.go index 2f02ca9..be65fbb 100644 --- a/handler/real_time_data_monitor.go +++ b/handler/real_time_data_monitor.go @@ -29,9 +29,52 @@ func init() { // @Tags RealTime Component // @Accept json // @Produce json -// @Router /data/realtime [get] +// @Param request body network.MeasurementRecommendRequest true "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'" +// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" +// +// @Example 200 { +// "code": 200, +// "msg": "success", +// "payload": { +// "targets": [ +// { +// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", +// "code": "1001", +// "msg": "subscription success" +// }, +// { +// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", +// "code": "1002", +// "msg": "subscription failed" +// } +// ] +// } +// } +// +// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" +// +// @Example 400 { +// "code": 400, +// "msg": "failed to get recommend data from redis", +// "payload": { +// "targets": [ +// { +// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms", +// "code": "1002", +// "msg": "subscription failed" +// }, +// { +// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms", +// "code": "1002", +// "msg": "subscription failed" +// } +// ] +// } +// } +// +// @Router /data/realtime [post] func RealTimeMonitorHandler(c *gin.Context) { - var request network.RealTimeQueryRequest + var request network.RealTimeSubRequest var monitorAction string var monitorID string @@ -77,6 +120,10 @@ func RealTimeMonitorHandler(c *gin.Context) { c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, }) return } @@ -84,29 +131,72 @@ func RealTimeMonitorHandler(c *gin.Context) { c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", - PayLoad: network.RealTimeQueryPayload{ + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, TargetResults: results, }, }) return case constants.MonitorStopAction: - globalMonitorState.RemoveTargets(c, monitorID, request.Components) + results, err := globalMonitorState.RemoveTargets(c, monitorID, request.Components) + if err != nil { + logger.Error(c, "remove target to real time data monitor config failed", "error", err) + c.JSON(http.StatusOK, network.FailureResponse{ + Code: http.StatusBadRequest, + Msg: err.Error(), + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, + }) + return + } + + c.JSON(http.StatusOK, network.SuccessResponse{ + Code: http.StatusOK, + Msg: "success", + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, + }) + return case constants.MonitorAppendAction: - err := globalMonitorState.AppendTargets(monitorID, request.Components) + results, err := globalMonitorState.AppendTargets(c, tx, monitorID, request.Components) if err != nil { logger.Error(c, "append target to real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, }) return } + + c.JSON(http.StatusOK, network.SuccessResponse{ + Code: http.StatusOK, + Msg: "success", + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, + }) + return default: - err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action) + err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action) logger.Error(c, "unsupported action of real time data monitor request", "error", err) + requestTargetsCount := processRealTimeRequestCount(request.Components) + results := processRealTimeRequestTargets(request.Components, requestTargetsCount, err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), + PayLoad: network.RealTimeSubPayload{ + MonitorID: monitorID, + TargetResults: results, + }, }) return } @@ -114,7 +204,8 @@ func RealTimeMonitorHandler(c *gin.Context) { // RealTimeMonitorComponent define struct of real time monitor component type RealTimeMonitorComponent struct { - targets []string + targets []string + targetParam map[string]int } // RealTimeMonitorConfig define struct of real time monitor config @@ -141,8 +232,13 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni s.mutex.Lock() defer s.mutex.Unlock() + requestTargetsCount := processRealTimeRequestCount(components) + targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) + if _, exist := s.monitorMap[monitorID]; exist { - return nil, fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID) + err := fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID) + logger.Error(ctx, "monitorID already exists. Use AppendTargets to modify existing config", "error", err) + return processRealTimeRequestTargets(components, requestTargetsCount, err), err } config := &RealTimeMonitorConfig{ @@ -150,15 +246,13 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni components: make(map[string]*RealTimeMonitorComponent), } - targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components)) for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { - targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) - var targetResult network.TargetResult targetResult.ID = target + targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) if err != nil { logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) targetResult.Code = constants.SubFailedCode @@ -169,16 +263,19 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni targetResult.Code = constants.SubSuccessCode targetResult.Msg = constants.SubSuccessMsg - if _, ok := config.components[componentItem.Interval]; !ok { + targetProcessResults = append(targetProcessResults, targetResult) + if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) + targetParam := make(map[string]int) + targetParam[target] = targetModel.GetMeasurementInfo().Size config.components[interval] = &RealTimeMonitorComponent{ - targets: append(targets, target), + targets: append(targets, target), + targetParam: targetParam, } } else { - component := config.components[interval] - component.targets = append(component.targets, target) + comp.targets = append(comp.targets, target) + comp.targetParam[target] = targetModel.GetMeasurementInfo().Size } - fmt.Println(targetModel.GetMeasurementInfo().Size) } } @@ -187,62 +284,137 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni } // AppendTargets define function to append targets in SharedMonitorState -// TODO 增加targetsResults的返回 -func (s *SharedMonitorState) AppendTargets(monitorID string, components []network.RealTimeComponentItem) error { +func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { s.mutex.Lock() defer s.mutex.Unlock() + requestTargetsCount := processRealTimeRequestCount(components) + targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) + config, exist := s.monitorMap[monitorID] if !exist { - return fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID) + err := fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID) + logger.Error(ctx, "monitorID not found. Use CreateConfig to start a new config", "error", err) + return processRealTimeRequestTargets(components, requestTargetsCount, err), err } - for _, compent := range components { - interval := compent.Interval - comp, compExist := config.components[interval] - if !compExist { - comp = &RealTimeMonitorComponent{ - targets: compent.Targets, + for _, componentItem := range components { + interval := componentItem.Interval + for _, target := range componentItem.Targets { + var targetResult network.TargetResult + targetResult.ID = target + + targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) + if err != nil { + logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) + targetResult.Code = constants.SubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + continue + } + + targetResult.Code = constants.SubSuccessCode + targetResult.Msg = constants.SubSuccessMsg + targetProcessResults = append(targetProcessResults, targetResult) + if comp, ok := config.components[interval]; !ok { + targets := make([]string, 0, len(componentItem.Targets)) + targetParam := make(map[string]int) + targetParam[target] = targetModel.GetMeasurementInfo().Size + config.components[interval] = &RealTimeMonitorComponent{ + targets: append(targets, target), + } + } else { + comp.targets = append(comp.targets, target) + comp.targetParam[target] = targetModel.GetMeasurementInfo().Size } - config.components[interval] = comp - } else { - comp.targets = append(comp.targets, compent.Targets...) } } - config.noticeChan <- struct{}{} - return nil + return targetProcessResults, nil } // UpsertTargets define function to upsert targets in SharedMonitorState -// TODO 增加targetsResults的返回 -func (s *SharedMonitorState) UpsertTargets(monitorID string, interval string, newTargets []string) (isNewMonitor bool, err error) { +func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { s.mutex.Lock() defer s.mutex.Unlock() config, exist := s.monitorMap[monitorID] if !exist { + // create new config config = &RealTimeMonitorConfig{ noticeChan: make(chan struct{}), components: make(map[string]*RealTimeMonitorComponent), } - config.components[interval] = &RealTimeMonitorComponent{ - targets: newTargets, + + targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components)) + for _, componentItem := range components { + interval := componentItem.Interval + for _, target := range componentItem.Targets { + var targetResult network.TargetResult + targetResult.ID = target + + targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) + if err != nil { + logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) + targetResult.Code = constants.SubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + continue + } + + targetResult.Code = constants.SubSuccessCode + targetResult.Msg = constants.SubSuccessMsg + if comp, ok := config.components[interval]; !ok { + targets := make([]string, 0, len(componentItem.Targets)) + targetParam := make(map[string]int) + targetParam[target] = targetModel.GetMeasurementInfo().Size + config.components[interval] = &RealTimeMonitorComponent{ + targets: append(targets, target), + } + } else { + comp.targets = append(comp.targets, target) + comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + } + } } s.monitorMap[monitorID] = config - return true, nil + return targetProcessResults, nil } - comp, compExist := config.components[interval] - if !compExist { - comp = &RealTimeMonitorComponent{ - targets: newTargets, + targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components)) + for _, componentItem := range components { + interval := componentItem.Interval + for _, target := range componentItem.Targets { + var targetResult network.TargetResult + targetResult.ID = target + + targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) + if err != nil { + logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) + targetResult.Code = constants.SubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + continue + } + + targetResult.Code = constants.SubSuccessCode + targetResult.Msg = constants.SubSuccessMsg + targetProcessResults = append(targetProcessResults, targetResult) + if comp, ok := config.components[interval]; !ok { + targets := make([]string, 0, len(componentItem.Targets)) + targetParam := make(map[string]int) + targetParam[target] = targetModel.GetMeasurementInfo().Size + config.components[interval] = &RealTimeMonitorComponent{ + targets: append(targets, target), + } + } else { + comp.targets = append(comp.targets, target) + comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + } } - config.components[interval] = comp - } else { - comp.targets = append(comp.targets, newTargets...) } - return false, nil + config.noticeChan <- struct{}{} + return targetProcessResults, nil } // Get define function to get value from SharedMonitorState @@ -263,21 +435,35 @@ func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) { } // RemoveTargets define function to remove targets in SharedMonitorState -// TODO 增加targetsResults的返回 -func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) error { +func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { s.mutex.Lock() defer s.mutex.Unlock() + requestTargetsCount := processRealTimeRequestCount(components) + targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) + config, exist := s.monitorMap[monitorID] if !exist { - return fmt.Errorf("monitorID %s not found", monitorID) + err := fmt.Errorf("monitorID %s not found", monitorID) + logger.Error(ctx, "monitorID not found in remove targets operation", "error", err) + return processRealTimeRequestTargets(components, requestTargetsCount, err), err } + // components is the list of items to be removed passed in the request for _, compent := range components { interval := compent.Interval + // comp is the locally running listener configuration comp, compExist := config.components[interval] if !compExist { logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval) + for _, target := range comp.targets { + targetResult := network.TargetResult{ + ID: target, + Code: constants.CancelSubFailedCode, + Msg: constants.CancelSubFailedMsg, + } + targetProcessResults = append(targetProcessResults, targetResult) + } continue } @@ -290,6 +476,15 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string for _, existingTarget := range comp.targets { if _, found := targetsToRemoveMap[existingTarget]; !found { newTargets = append(newTargets, existingTarget) + } else { + targetResult := network.TargetResult{ + ID: existingTarget, + Code: constants.CancelSubSuccessCode, + Msg: constants.CancelSubSuccessMsg, + } + targetProcessResults = append(targetProcessResults, targetResult) + delete(targetsToRemoveMap, existingTarget) + delete(comp.targetParam, existingTarget) } } comp.targets = newTargets @@ -297,14 +492,26 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string if len(comp.targets) == 0 { delete(config.components, interval) } - } - if len(config.components) == 0 { - delete(s.monitorMap, monitorID) + if len(config.components) == 0 { + delete(s.monitorMap, monitorID) + } + + if len(targetsToRemoveMap) > 0 { + err := fmt.Errorf("target remove were not found under monitorID %s and interval %s", monitorID, interval) + for target := range targetsToRemoveMap { + targetResult := network.TargetResult{ + ID: target, + Code: constants.CancelSubFailedCode, + Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()), + } + targetProcessResults = append(targetProcessResults, targetResult) + } + } } config.noticeChan <- struct{}{} - return nil + return targetProcessResults, nil } func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { @@ -314,3 +521,17 @@ func processRealTimeRequestCount(components []network.RealTimeComponentItem) int } return totalTargetsCount } + +func processRealTimeRequestTargets(components []network.RealTimeComponentItem, targetCount int, err error) []network.TargetResult { + targetProcessResults := make([]network.TargetResult, 0, targetCount) + for _, componentItem := range components { + for _, target := range componentItem.Targets { + var targetResult network.TargetResult + targetResult.ID = target + targetResult.Code = constants.SubFailedCode + targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) + targetProcessResults = append(targetProcessResults, targetResult) + } + } + return targetProcessResults +} diff --git a/network/real_time_data_query_request.go b/network/real_time_data_request.go similarity index 66% rename from network/real_time_data_query_request.go rename to network/real_time_data_request.go index b5fa13e..13046c0 100644 --- a/network/real_time_data_query_request.go +++ b/network/real_time_data_request.go @@ -13,6 +13,16 @@ type RealTimeQueryRequest struct { Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` } +// RealTimeSubRequest define struct of real time data subscription request +type RealTimeSubRequest struct { + // required: true + // enum: [start, stop] + Action string `json:"action" example:"start" description:"请求的操作,例如 start/stop"` + MonitorID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` + // required: true + Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` +} + // RealTimeComponentItem define struct of real time component item type RealTimeComponentItem struct { Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` diff --git a/network/response.go b/network/response.go index 7edd99f..394b62c 100644 --- a/network/response.go +++ b/network/response.go @@ -22,14 +22,15 @@ type MeasurementRecommendPayload struct { RecommendedList []string `json:"recommended_list" example:"[\"I_A_rms\", \"I_B_rms\",\"I_C_rms\"]"` } -// TargetResult define struct of target item in real time data query response payload +// TargetResult define struct of target item in real time data subscription response payload type TargetResult struct { ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"` Code string `json:"code" example:"1001"` Msg string `json:"msg" example:"subscription success"` } -// RealTimeQueryPayload define struct of real time data query request -type RealTimeQueryPayload struct { +// RealTimeSubPayload define struct of real time data subscription request +type RealTimeSubPayload struct { + MonitorID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"` TargetResults []TargetResult `json:"targets"` }