modelRT/handler/real_time_data_monitor.go

274 lines
7.5 KiB
Go
Raw Normal View History

2025-11-03 17:35:03 +08:00
// Package handler provides HTTP handlers for various endpoints.
package handler
import (
2025-11-04 17:12:15 +08:00
"context"
2025-11-03 17:35:03 +08:00
"fmt"
"net/http"
"sync"
"modelRT/constants"
"modelRT/logger"
"modelRT/network"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
)
var globalMonitorState *SharedMonitorState
func init() {
globalMonitorState = NewSharedMonitorState()
}
// RealTimeMonitorHandler define real time data monitor process API
// @Summary 开始或结束实时数据监控
// @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于实时数据的监控
// @Tags RealTime Component
// @Accept json
// @Produce json
// @Router /data/realtime [get]
func RealTimeMonitorHandler(c *gin.Context) {
var request network.RealTimeQueryRequest
var monitorAction string
var monitorID string
if err := c.ShouldBindJSON(&request); err != nil {
logger.Error(c, "failed to unmarshal real time query request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
if request.Action == constants.MonitorStartAction && request.MonitorID == "" {
monitorAction = request.Action
id, err := uuid.NewV4()
if err != nil {
logger.Error(c, "failed to generate monitor id", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
monitorID = id.String()
} else if request.Action == constants.MonitorStartAction && request.MonitorID != "" {
monitorAction = constants.MonitorAppendAction
monitorID = request.MonitorID
} else if request.Action == constants.MonitorStopAction && request.MonitorID != "" {
monitorAction = request.Action
monitorID = request.MonitorID
}
switch monitorAction {
case constants.MonitorStartAction:
2025-11-04 17:12:15 +08:00
err := globalMonitorState.CreateConfig(monitorID, request.Components)
if err != nil {
logger.Error(c, "create real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
2025-11-03 17:35:03 +08:00
}
2025-11-04 17:12:15 +08:00
c.JSON(http.StatusOK, network.SuccessResponse{
Code: http.StatusOK,
Msg: "success",
PayLoad: map[string]interface{}{
"monitor_id": monitorID,
},
})
return
2025-11-03 17:35:03 +08:00
case constants.MonitorStopAction:
2025-11-04 17:12:15 +08:00
globalMonitorState.RemoveTargets(c, monitorID, request.Components)
2025-11-03 17:35:03 +08:00
case constants.MonitorAppendAction:
2025-11-04 17:12:15 +08:00
err := globalMonitorState.AppendTargets(monitorID, request.Components)
if err != nil {
logger.Error(c, "append target to real time data monitor config failed", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
2025-11-03 17:35:03 +08:00
}
default:
err := fmt.Errorf("%w: %s", constants.ErrUnsupportedAction, request.Action)
logger.Error(c, "unsupported action of real time data monitor request", "error", err)
c.JSON(http.StatusOK, network.FailureResponse{
Code: http.StatusBadRequest,
Msg: err.Error(),
})
return
}
}
// RealTimeMonitorComponent holds the targets registered under one interval of
// a RealTimeMonitorConfig.
type RealTimeMonitorComponent struct {
	// targets lists the target strings currently registered for the interval.
	targets []string
}
// RealTimeMonitorConfig holds the monitoring configuration stored for one
// monitor ID in SharedMonitorState.
type RealTimeMonitorConfig struct {
// noticeChan is signalled by AppendTargets and RemoveTargets after they have
// changed this config. CreateConfig and UpsertTargets create it unbuffered.
// NOTE(review): no receiver is visible in this file — confirm who drains it.
noticeChan chan struct{}
// components maps an interval string to the targets registered under it.
components map[string]*RealTimeMonitorComponent
}
// SharedMonitorState is a registry of monitor configs keyed by monitor ID,
// together with the mutex its methods take before touching the registry.
type SharedMonitorState struct {
// monitorMap maps a monitor ID to its config.
monitorMap map[string]*RealTimeMonitorConfig
// mutex guards monitorMap: Get takes the read lock, the mutating methods in
// this file take the write lock.
mutex sync.RWMutex
}
// NewSharedMonitorState returns a SharedMonitorState whose monitor map is
// allocated and empty.
func NewSharedMonitorState() *SharedMonitorState {
	state := new(SharedMonitorState)
	state.monitorMap = make(map[string]*RealTimeMonitorConfig)
	return state
}
2025-11-04 17:12:15 +08:00
// CreateConfig define function to create config in SharedMonitorState
func (s *SharedMonitorState) CreateConfig(monitorID string, components []network.RealTimeComponentItem) error {
2025-11-03 17:35:03 +08:00
s.mutex.Lock()
defer s.mutex.Unlock()
2025-11-04 17:12:15 +08:00
if _, exist := s.monitorMap[monitorID]; exist {
return fmt.Errorf("monitorID %s already exists. Use AppendTargets to modify existing config", monitorID)
}
config := &RealTimeMonitorConfig{
noticeChan: make(chan struct{}),
components: make(map[string]*RealTimeMonitorComponent),
}
for _, compent := range components {
config.components[compent.Interval] = &RealTimeMonitorComponent{
targets: compent.Targets,
}
}
s.monitorMap[monitorID] = config
return nil
2025-11-03 17:35:03 +08:00
}
2025-11-04 17:12:15 +08:00
// AppendTargets adds the targets of each given component to the existing
// config of monitorID, grouped by the component's interval.
//
// An interval that is not yet part of the config gets a new entry holding the
// component's targets. For an interval that already exists, the component's
// targets are appended to the stored ones; no de-duplication is performed.
// After the update a signal is offered on the config's noticeChan.
//
// It returns an error when monitorID has no config.
func (s *SharedMonitorState) AppendTargets(monitorID string, components []network.RealTimeComponentItem) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	config, exist := s.monitorMap[monitorID]
	if !exist {
		return fmt.Errorf("monitorID %s not found. Use CreateConfig to start a new config", monitorID)
	}

	for _, item := range components {
		comp, compExist := config.components[item.Interval]
		if !compExist {
			// Take the targets from the request item; the lookup result is nil
			// on this path and must not be dereferenced.
			config.components[item.Interval] = &RealTimeMonitorComponent{
				targets: item.Targets,
			}
			continue
		}
		// Append the requested targets, not the stored slice to itself.
		comp.targets = append(comp.targets, item.Targets...)
	}

	// Offer the signal without blocking. The channel is created unbuffered, so
	// a plain send made while s.mutex is held would stall every other user of
	// this state until a receiver showed up. If nobody is receiving right now
	// the signal is dropped.
	select {
	case config.noticeChan <- struct{}{}:
	default:
	}
	return nil
}
// UpsertTargets stores newTargets under interval for monitorID, creating
// whatever is missing along the way.
//
// When monitorID has no config yet, a config with an unbuffered noticeChan and
// a single interval entry is created and isNewMonitor is true. Otherwise
// isNewMonitor is false: a missing interval entry is created with newTargets,
// and an existing entry has newTargets appended to its targets. The returned
// error is always nil.
func (s *SharedMonitorState) UpsertTargets(monitorID string, interval string, newTargets []string) (isNewMonitor bool, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	cfg, known := s.monitorMap[monitorID]
	if !known {
		// First sighting of this monitor: build the config in one literal.
		s.monitorMap[monitorID] = &RealTimeMonitorConfig{
			noticeChan: make(chan struct{}),
			components: map[string]*RealTimeMonitorComponent{
				interval: {targets: newTargets},
			},
		}
		return true, nil
	}

	if entry, found := cfg.components[interval]; found {
		entry.targets = append(entry.targets, newTargets...)
		return false, nil
	}

	cfg.components[interval] = &RealTimeMonitorComponent{targets: newTargets}
	return false, nil
}
// Get define function to get value from SharedMonitorState
2025-11-03 17:35:03 +08:00
func (s *SharedMonitorState) Get(monitorID, interval string) ([]string, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()
config, ok := s.monitorMap[monitorID]
if !ok {
return nil, false
}
component, ok := config.components[interval]
if !ok {
return nil, false
}
return component.targets, true
}
2025-11-04 17:12:15 +08:00
// RemoveTargets removes the targets of each given component from the config of
// monitorID.
//
// For every component, the targets it lists are dropped from the entry stored
// under the component's interval. An interval that is not part of the config
// is logged through logger.Error and skipped. An entry left without targets is
// deleted, and when the config has no entries left the monitor itself is
// deleted from the state. After the update a signal is offered on the config's
// noticeChan, including when the monitor was just deleted.
//
// It returns an error when monitorID has no config.
func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string, components []network.RealTimeComponentItem) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	config, exist := s.monitorMap[monitorID]
	if !exist {
		return fmt.Errorf("monitorID %s not found", monitorID)
	}

	for _, item := range components {
		interval := item.Interval
		comp, compExist := config.components[interval]
		if !compExist {
			logger.Error(ctx, fmt.Sprintf("component with interval %s not found under monitorID %s", interval, monitorID), "monitorID", monitorID, "interval", interval)
			continue
		}

		unwanted := make(map[string]struct{}, len(item.Targets))
		for _, target := range item.Targets {
			unwanted[target] = struct{}{}
		}

		// Build a fresh slice rather than filtering in place: Get hands out the
		// stored slice, so its backing array must not be rewritten here.
		var kept []string
		for _, target := range comp.targets {
			if _, drop := unwanted[target]; !drop {
				kept = append(kept, target)
			}
		}
		comp.targets = kept

		if len(kept) == 0 {
			delete(config.components, interval)
		}
	}

	if len(config.components) == 0 {
		delete(s.monitorMap, monitorID)
	}

	// Offer the signal without blocking. The channel is created unbuffered, so
	// a plain send made while s.mutex is held would stall every other user of
	// this state until a receiver showed up. If nobody is receiving right now
	// the signal is dropped.
	select {
	case config.noticeChan <- struct{}{}:
	default:
	}
	return nil
}