optimize update monitor config func of real time data query api

This commit is contained in:
douxu 2025-11-06 17:22:14 +08:00
parent b75358e676
commit 8090751914
4 changed files with 293 additions and 53 deletions

View File

@ -16,6 +16,10 @@ const (
SubSuccessCode = "1001" SubSuccessCode = "1001"
// SubSuccessCode define subscription failed code // SubSuccessCode define subscription failed code
SubFailedCode = "1002" SubFailedCode = "1002"
// CancelSubSuccessCode define cancel subscription success code
CancelSubSuccessCode = "1005"
// CancelSubFailedCode define cancel subscription failed code
CancelSubFailedCode = "1006"
) )
const ( const (
@ -23,4 +27,8 @@ const (
SubSuccessMsg = "subscription success" SubSuccessMsg = "subscription success"
// SubFailedMsg define subscription failed message // SubFailedMsg define subscription failed message
SubFailedMsg = "subscription failed" SubFailedMsg = "subscription failed"
// SubSuccessMsg define cancel subscription success message
CancelSubSuccessMsg = "cancel subscription success"
// SubFailedMsg define cancel subscription failed message
CancelSubFailedMsg = "Cancel subscription failed"
) )

View File

@ -29,9 +29,52 @@ func init() {
// @Tags RealTime Component // @Tags RealTime Component
// @Accept json // @Accept json
// @Produce json // @Produce json
// @Router /data/realtime [get] // @Param request body network.MeasurementRecommendRequest true "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'"
// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表"
//
// @Example 200 {
// "code": 200,
// "msg": "success",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
// "code": "1001",
// "msg": "subscription success"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
// }
// }
//
// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表"
//
// @Example 400 {
// "code": 400,
// "msg": "failed to get recommend data from redis",
// "payload": {
// "targets": [
// {
// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms",
// "code": "1002",
// "msg": "subscription failed"
// },
// {
// "id": "grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_B_rms",
// "code": "1002",
// "msg": "subscription failed"
// }
// ]
// }
// }
//
// @Router /data/realtime [post]
func RealTimeMonitorHandler(c *gin.Context) { func RealTimeMonitorHandler(c *gin.Context) {
var request network.RealTimeQueryRequest var request network.RealTimeSubRequest
var monitorAction string var monitorAction string
var monitorID string var monitorID string
@ -77,6 +120,10 @@ func RealTimeMonitorHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.FailureResponse{ c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
Msg: err.Error(), Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
}) })
return return
} }
@ -84,29 +131,72 @@ func RealTimeMonitorHandler(c *gin.Context) {
c.JSON(http.StatusOK, network.SuccessResponse{ c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK, Code: http.StatusOK,
Msg: "success", Msg: "success",
PayLoad: network.RealTimeQueryPayload{ PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results, TargetResults: results,
}, },
}) })
return return
case constants.MonitorStopAction: case constants.MonitorStopAction:
globalMonitorState.RemoveTargets(c, monitorID, request.Components) results, err := globalMonitorState.RemoveTargets(c, monitorID, request.Components)
if err != nil {
logger.Error(c, "remove target to real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
})
return
}
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
})
return
case constants.MonitorAppendAction: case constants.MonitorAppendAction:
err := globalMonitorState.AppendTargets(monitorID, request.Components) results, err := globalMonitorState.AppendTargets(c, tx, monitorID, request.Components)
if err != nil { if err != nil {
logger.Error(c, "append target to real time data monitor config failed", "error", err) logger.Error(c, "append target to real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{ c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
Msg: err.Error(), Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
}) })
return return
} }
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
})
return
default: default:
err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action) err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action)
logger.Error(c, "unsupported action of real time data monitor request", "error", err) logger.Error(c, "unsupported action of real time data monitor request", "error", err)
requestTargetsCount := processRealTimeRequestCount(request.Components)
results := processRealTimeRequestTargets(request.Components, requestTargetsCount, err)
c.JSON(http.StatusOK, network.FailureResponse{ c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
Msg: err.Error(), Msg: err.Error(),
PayLoad: network.RealTimeSubPayload{
MonitorID: monitorID,
TargetResults: results,
},
}) })
return return
} }
@ -114,7 +204,8 @@ func RealTimeMonitorHandler(c *gin.Context) {
// RealTimeMonitorComponent define struct of real time monitor component // RealTimeMonitorComponent define struct of real time monitor component
type RealTimeMonitorComponent struct { type RealTimeMonitorComponent struct {
targets []string targets []string
targetParam map[string]int
} }
// RealTimeMonitorConfig define struct of real time monitor config // RealTimeMonitorConfig define struct of real time monitor config
@ -141,8 +232,13 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
if _, exist := s.monitorMap[monitorID]; exist { if _, exist := s.monitorMap[monitorID]; exist {
return nil, fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID) err := fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID)
logger.Error(ctx, "monitorID already exists. Use AppendTargets to modify existing config", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
} }
config := &RealTimeMonitorConfig{ config := &RealTimeMonitorConfig{
@ -150,15 +246,13 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
components: make(map[string]*RealTimeMonitorComponent), components: make(map[string]*RealTimeMonitorComponent),
} }
targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components))
for _, componentItem := range components { for _, componentItem := range components {
interval := componentItem.Interval interval := componentItem.Interval
for _, target := range componentItem.Targets { for _, target := range componentItem.Targets {
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
var targetResult network.TargetResult var targetResult network.TargetResult
targetResult.ID = target targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil { if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode targetResult.Code = constants.SubFailedCode
@ -169,16 +263,19 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
targetResult.Code = constants.SubSuccessCode targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg targetResult.Msg = constants.SubSuccessMsg
if _, ok := config.components[componentItem.Interval]; !ok { targetProcessResults = append(targetProcessResults, targetResult)
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets)) targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]int)
targetParam[target] = targetModel.GetMeasurementInfo().Size
config.components[interval] = &RealTimeMonitorComponent{ config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target), targets: append(targets, target),
targetParam: targetParam,
} }
} else { } else {
component := config.components[interval] comp.targets = append(comp.targets, target)
component.targets = append(component.targets, target) comp.targetParam[target] = targetModel.GetMeasurementInfo().Size
} }
fmt.Println(targetModel.GetMeasurementInfo().Size)
} }
} }
@ -187,62 +284,137 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni
} }
// AppendTargets define function to append targets in SharedMonitorState // AppendTargets define function to append targets in SharedMonitorState
// TODO 增加targetsResults的返回 func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
func (s *SharedMonitorState) AppendTargets(monitorID string, components []network.RealTimeComponentItem) error {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
config, exist := s.monitorMap[monitorID] config, exist := s.monitorMap[monitorID]
if !exist { if !exist {
return fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID) err := fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID)
logger.Error(ctx, "monitorID not found. Use CreateConfig to start a new config", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
} }
for _, compent := range components { for _, componentItem := range components {
interval := compent.Interval interval := componentItem.Interval
comp, compExist := config.components[interval] for _, target := range componentItem.Targets {
if !compExist { var targetResult network.TargetResult
comp = &RealTimeMonitorComponent{ targetResult.ID = target
targets: compent.Targets,
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]int)
targetParam[target] = targetModel.GetMeasurementInfo().Size
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
}
} else {
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo().Size
} }
config.components[interval] = comp
} else {
comp.targets = append(comp.targets, compent.Targets...)
} }
} }
config.noticeChan <- struct{}{} config.noticeChan <- struct{}{}
return nil return targetProcessResults, nil
} }
// UpsertTargets define function to upsert targets in SharedMonitorState // UpsertTargets define function to upsert targets in SharedMonitorState
// TODO 增加targetsResults的返回 func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
func (s *SharedMonitorState) UpsertTargets(monitorID string, interval string, newTargets []string) (isNewMonitor bool, err error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
config, exist := s.monitorMap[monitorID] config, exist := s.monitorMap[monitorID]
if !exist { if !exist {
// create new config
config = &RealTimeMonitorConfig{ config = &RealTimeMonitorConfig{
noticeChan: make(chan struct{}), noticeChan: make(chan struct{}),
components: make(map[string]*RealTimeMonitorComponent), components: make(map[string]*RealTimeMonitorComponent),
} }
config.components[interval] = &RealTimeMonitorComponent{
targets: newTargets, targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components))
for _, componentItem := range components {
interval := componentItem.Interval
for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]int)
targetParam[target] = targetModel.GetMeasurementInfo().Size
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
}
} else {
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo().Size
}
}
} }
s.monitorMap[monitorID] = config s.monitorMap[monitorID] = config
return true, nil return targetProcessResults, nil
} }
comp, compExist := config.components[interval] targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components))
if !compExist { for _, componentItem := range components {
comp = &RealTimeMonitorComponent{ interval := componentItem.Interval
targets: newTargets, for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target)
if err != nil {
logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target)
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
continue
}
targetResult.Code = constants.SubSuccessCode
targetResult.Msg = constants.SubSuccessMsg
targetProcessResults = append(targetProcessResults, targetResult)
if comp, ok := config.components[interval]; !ok {
targets := make([]string, 0, len(componentItem.Targets))
targetParam := make(map[string]int)
targetParam[target] = targetModel.GetMeasurementInfo().Size
config.components[interval] = &RealTimeMonitorComponent{
targets: append(targets, target),
}
} else {
comp.targets = append(comp.targets, target)
comp.targetParam[target] = targetModel.GetMeasurementInfo().Size
}
} }
config.components[interval] = comp
} else {
comp.targets = append(comp.targets, newTargets...)
} }
return false, nil config.noticeChan <- struct{}{}
return targetProcessResults, nil
} }
// Get define function to get value from SharedMonitorState // Get define function to get value from SharedMonitorState
@ -263,21 +435,35 @@ func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) {
} }
// RemoveTargets define function to remove targets in SharedMonitorState // RemoveTargets define function to remove targets in SharedMonitorState
// TODO 增加targetsResults的返回 func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) {
func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) error {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
requestTargetsCount := processRealTimeRequestCount(components)
targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount)
config, exist := s.monitorMap[monitorID] config, exist := s.monitorMap[monitorID]
if !exist { if !exist {
return fmt.Errorf("monitorID %s not found", monitorID) err := fmt.Errorf("monitorID %s not found", monitorID)
logger.Error(ctx, "monitorID not found in remove targets operation", "error", err)
return processRealTimeRequestTargets(components, requestTargetsCount, err), err
} }
// components is the list of items to be removed passed in the request
for _, compent := range components { for _, compent := range components {
interval := compent.Interval interval := compent.Interval
// comp is the locally running listener configuration
comp, compExist := config.components[interval] comp, compExist := config.components[interval]
if !compExist { if !compExist {
logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval) logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval)
for _, target := range comp.targets {
targetResult := network.TargetResult{
ID: target,
Code: constants.CancelSubFailedCode,
Msg: constants.CancelSubFailedMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
}
continue continue
} }
@ -290,6 +476,15 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
for _, existingTarget := range comp.targets { for _, existingTarget := range comp.targets {
if _, found := targetsToRemoveMap[existingTarget]; !found { if _, found := targetsToRemoveMap[existingTarget]; !found {
newTargets = append(newTargets, existingTarget) newTargets = append(newTargets, existingTarget)
} else {
targetResult := network.TargetResult{
ID: existingTarget,
Code: constants.CancelSubSuccessCode,
Msg: constants.CancelSubSuccessMsg,
}
targetProcessResults = append(targetProcessResults, targetResult)
delete(targetsToRemoveMap, existingTarget)
delete(comp.targetParam, existingTarget)
} }
} }
comp.targets = newTargets comp.targets = newTargets
@ -297,14 +492,26 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string
if len(comp.targets) == 0 { if len(comp.targets) == 0 {
delete(config.components, interval) delete(config.components, interval)
} }
}
if len(config.components) == 0 { if len(config.components) == 0 {
delete(s.monitorMap, monitorID) delete(s.monitorMap, monitorID)
}
if len(targetsToRemoveMap) > 0 {
err := fmt.Errorf("target remove were not found under monitorID %s and interval %s", monitorID, interval)
for target := range targetsToRemoveMap {
targetResult := network.TargetResult{
ID: target,
Code: constants.CancelSubFailedCode,
Msg: fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()),
}
targetProcessResults = append(targetProcessResults, targetResult)
}
}
} }
config.noticeChan <- struct{}{} config.noticeChan <- struct{}{}
return nil return targetProcessResults, nil
} }
func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { func processRealTimeRequestCount(components []network.RealTimeComponentItem) int {
@ -314,3 +521,17 @@ func processRealTimeRequestCount(components []network.RealTimeComponentItem) int
} }
return totalTargetsCount return totalTargetsCount
} }
func processRealTimeRequestTargets(components []network.RealTimeComponentItem, targetCount int, err error) []network.TargetResult {
targetProcessResults := make([]network.TargetResult, 0, targetCount)
for _, componentItem := range components {
for _, target := range componentItem.Targets {
var targetResult network.TargetResult
targetResult.ID = target
targetResult.Code = constants.SubFailedCode
targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error())
targetProcessResults = append(targetProcessResults, targetResult)
}
}
return targetProcessResults
}

View File

@ -13,6 +13,16 @@ type RealTimeQueryRequest struct {
Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"` Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"`
} }
// RealTimeSubRequest define struct of real time data subscription request
type RealTimeSubRequest struct {
// required: true
// enum: [start, stop]
Action string `json:"action" example:"start" description:"请求的操作,例如 start/stop"`
MonitorID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"`
// required: true
Components []RealTimeComponentItem `json:"components" description:"定义不同的数据采集策略和目标"`
}
// RealTimeComponentItem define struct of real time component item // RealTimeComponentItem define struct of real time component item
type RealTimeComponentItem struct { type RealTimeComponentItem struct {
Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"` Interval string `json:"interval" example:"1" description:"数据采集的时间间隔(秒)"`

View File

@ -22,14 +22,15 @@ type MeasurementRecommendPayload struct {
RecommendedList []string `json:"recommended_list" example:"[\"I_A_rms\", \"I_B_rms\",\"I_C_rms\"]"` RecommendedList []string `json:"recommended_list" example:"[\"I_A_rms\", \"I_B_rms\",\"I_C_rms\"]"`
} }
// TargetResult define struct of target item in real time data query response payload // TargetResult define struct of target item in real time data subscription response payload
type TargetResult struct { type TargetResult struct {
ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"` ID string `json:"id" example:"grid1.zone1.station1.ns1.tag1.transformfeeder1_220.I_A_rms"`
Code string `json:"code" example:"1001"` Code string `json:"code" example:"1001"`
Msg string `json:"msg" example:"subscription success"` Msg string `json:"msg" example:"subscription success"`
} }
// RealTimeQueryPayload define struct of real time data query request // RealTimeSubPayload define struct of real time data subscription request
type RealTimeQueryPayload struct { type RealTimeSubPayload struct {
MonitorID string `json:"monitor_id" example:"5d72f2d9-e33a-4f1b-9c76-88a44b9a953e" description:"用于标识不同client的监控请求ID"`
TargetResults []TargetResult `json:"targets"` TargetResults []TargetResult `json:"targets"`
} }