// Package handler provides HTTP handlers for various endpoints. package handler import ( "context" "fmt" "net/http" "sync" "modelRT/constants" "modelRT/database" "modelRT/logger" "modelRT/network" "github.com/gin-gonic/gin" "github.com/gofrs/uuid" "gorm.io/gorm" ) var globalMonitorState *SharedMonitorState func init() { globalMonitorState = NewSharedMonitorState() } // RealTimeMonitorHandler define real time data monitor process API // @Summary 开始或结束实时数据监控 // @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于实时数据的监控 // @Tags RealTime Component // @Accept json // @Produce json // @Router /data/realtime [get] func RealTimeMonitorHandler(c *gin.Context) { var request network.RealTimeQueryRequest var monitorAction string var monitorID string if err := c.ShouldBindJSON(&request); err != nil { logger.Error(c, "failed to unmarshal real time query request", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } if request.Action == constants.MonitorStartAction && request.MonitorID == "" { monitorAction = request.Action id, err := uuid.NewV4() if err != nil { logger.Error(c, "failed to generate monitor id", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } monitorID = id.String() } else if request.Action == constants.MonitorStartAction && request.MonitorID != "" { monitorAction = constants.MonitorAppendAction monitorID = request.MonitorID } else if request.Action == constants.MonitorStopAction && request.MonitorID != "" { monitorAction = request.Action monitorID = request.MonitorID } pgClient := database.GetPostgresDBClient() // open transaction tx := pgClient.Begin() defer tx.Commit() switch monitorAction { case constants.MonitorStartAction: results, err := globalMonitorState.CreateConfig(c, tx, monitorID, request.Components) if err != nil { logger.Error(c, "create real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } c.JSON(http.StatusOK, network.SuccessResponse{ Code: http.StatusOK, Msg: "success", PayLoad: network.RealTimeQueryPayload{ TargetResults: results, }, }) return case constants.MonitorStopAction: globalMonitorState.RemoveTargets(c, monitorID, request.Components) case constants.MonitorAppendAction: err := globalMonitorState.AppendTargets(monitorID, request.Components) if err != nil { logger.Error(c, "append target to real time data monitor config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } default: err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action) logger.Error(c, "unsupported action of real time data monitor request", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), }) return } } // RealTimeMonitorComponent define struct of real time monitor component type RealTimeMonitorComponent struct { targets []string } // RealTimeMonitorConfig define struct of real time monitor config type RealTimeMonitorConfig struct { noticeChan chan struct{} components map[string]*RealTimeMonitorComponent } // SharedMonitorState define struct of shared monitor state with mutex type SharedMonitorState struct { monitorMap map[string]*RealTimeMonitorConfig mutex sync.RWMutex } // NewSharedMonitorState define function to create new SharedMonitorState func NewSharedMonitorState() *SharedMonitorState { return &SharedMonitorState{ monitorMap: make(map[string]*RealTimeMonitorConfig), } } // CreateConfig define function to create config in SharedMonitorState func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, monitorID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { s.mutex.Lock() defer s.mutex.Unlock() if _, exist := s.monitorMap[monitorID]; exist { return nil, fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID) } config := &RealTimeMonitorConfig{ noticeChan: make(chan struct{}), components: make(map[string]*RealTimeMonitorComponent), } targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components)) for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { targetModel, err := database.ParseDataIdentifierToken(ctx, tx, target) var targetResult network.TargetResult targetResult.ID = target if err != nil { logger.Error(ctx, "parse data indentity token failed", "error", err, "identity_token", target) targetResult.Code = constants.SubFailedCode targetResult.Msg = fmt.Sprintf("%s: %s", constants.SubFailedMsg, err.Error()) targetProcessResults = append(targetProcessResults, targetResult) continue } targetResult.Code = constants.SubSuccessCode targetResult.Msg = constants.SubSuccessMsg if _, ok := config.components[componentItem.Interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), } } else { component := config.components[interval] component.targets = append(component.targets, target) } fmt.Println(targetModel.GetMeasurementInfo().Size) } } s.monitorMap[monitorID] = config return targetProcessResults, nil } // AppendTargets define function to append targets in SharedMonitorState // TODO 增加targetsResults的返回 func (s *SharedMonitorState) AppendTargets(monitorID string, components []network.RealTimeComponentItem) error { s.mutex.Lock() defer s.mutex.Unlock() config, exist := s.monitorMap[monitorID] if !exist { return fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID) } for _, compent := range components { interval := compent.Interval comp, compExist := config.components[interval] if !compExist { comp = &RealTimeMonitorComponent{ targets: compent.Targets, } config.components[interval] = comp } else { comp.targets = append(comp.targets, compent.Targets...) } } config.noticeChan <- struct{}{} return nil } // UpsertTargets define function to upsert targets in SharedMonitorState // TODO 增加targetsResults的返回 func (s *SharedMonitorState) UpsertTargets(monitorID string, interval string, newTargets []string) (isNewMonitor bool, err error) { s.mutex.Lock() defer s.mutex.Unlock() config, exist := s.monitorMap[monitorID] if !exist { config = &RealTimeMonitorConfig{ noticeChan: make(chan struct{}), components: make(map[string]*RealTimeMonitorComponent), } config.components[interval] = &RealTimeMonitorComponent{ targets: newTargets, } s.monitorMap[monitorID] = config return true, nil } comp, compExist := config.components[interval] if !compExist { comp = &RealTimeMonitorComponent{ targets: newTargets, } config.components[interval] = comp } else { comp.targets = append(comp.targets, newTargets...) } return false, nil } // Get define function to get value from SharedMonitorState func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) { s.mutex.RLock() defer s.mutex.RUnlock() config, ok := s.monitorMap[monitorID] if !ok { return nil, false } component, ok := config.components[interval] if !ok { return nil, false } return component.targets, true } // RemoveTargets define function to remove targets in SharedMonitorState // TODO 增加targetsResults的返回 func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) error { s.mutex.Lock() defer s.mutex.Unlock() config, exist := s.monitorMap[monitorID] if !exist { return fmt.Errorf("monitorID %s not found", monitorID) } for _, compent := range components { interval := compent.Interval comp, compExist := config.components[interval] if !compExist { logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval) continue } targetsToRemoveMap := make(map[string]struct{}) for _, target := range compent.Targets { targetsToRemoveMap[target] = struct{}{} } var newTargets []string for _, existingTarget := range comp.targets { if _, found := targetsToRemoveMap[existingTarget]; !found { newTargets = append(newTargets, existingTarget) } } comp.targets = newTargets if len(comp.targets) == 0 { delete(config.components, interval) } } if len(config.components) == 0 { delete(s.monitorMap, monitorID) } config.noticeChan <- struct{}{} return nil } func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { totalTargetsCount := 0 for _, compItem := range components { totalTargetsCount += len(compItem.Targets) } return totalTargetsCount }