From 984ee3003daddd972ccbcf8ddd06238855b60490 Mon Sep 17 00:00:00 2001 From: douxu Date: Thu, 13 Nov 2025 11:48:26 +0800 Subject: [PATCH] optimize variable naming and api swagger comment --- config/config.go | 12 ++-- constants/{monitor.go => subscription.go} | 20 +++--- handler/real_time_data_pull.go | 4 +- handler/real_time_data_subscription.go | 80 +++++++++++----------- real-time-data/real_time_data_computing.go | 10 ++- util/time.go | 10 +++ 6 files changed, 72 insertions(+), 64 deletions(-) rename constants/{monitor.go => subscription.go} (64%) diff --git a/config/config.go b/config/config.go index f8968e3..a476a7a 100644 --- a/config/config.go +++ b/config/config.go @@ -22,12 +22,12 @@ type ServiceConfig struct { // KafkaConfig define config struct of kafka config type KafkaConfig struct { - Servers string `mapstructure:"Servers"` - GroupID string `mapstructure:"group_id"` - Topic string `mapstructure:"topic"` - AutoOffsetReset string `mapstructure:"auto_offset_reset"` - EnableAutoCommit string `mapstructure:"enable_auto_commit"` - ReadMessageTimeDuration string `mapstructure:"read_message_time_duration"` + Servers string `mapstructure:"Servers"` + GroupID string `mapstructure:"group_id"` + Topic string `mapstructure:"topic"` + AutoOffsetReset string `mapstructure:"auto_offset_reset"` + EnableAutoCommit string `mapstructure:"enable_auto_commit"` + ReadMessageTimeDuration float32 `mapstructure:"read_message_time_duration"` } // PostgresConfig define config struct of postgres config diff --git a/constants/monitor.go b/constants/subscription.go similarity index 64% rename from constants/monitor.go rename to constants/subscription.go index 6d68fad..a71a069 100644 --- a/constants/monitor.go +++ b/constants/subscription.go @@ -2,12 +2,12 @@ package constants const ( - // MonitorStartAction define the real time monitor start action - MonitorStartAction string = "start" - // MonitorStopAction define the real time monitor stop action - MonitorStopAction string = "stop" - // MonitorAppendAction define the real time monitor append action - MonitorAppendAction string = "append" + // SubStartAction define the real time subscription start action + SubStartAction string = "start" + // SubStopAction define the real time subscription stop action + SubStopAction string = "stop" + // SubAppendAction define the real time subscription append action + SubAppendAction string = "append" ) // 定义状态常量 @@ -30,17 +30,17 @@ const ( // CancelSubSuccessMsg define cancel subscription success message CancelSubSuccessMsg = "cancel subscription success" // CancelSubFailedMsg define cancel subscription failed message - CancelSubFailedMsg = "Cancel subscription failed" + CancelSubFailedMsg = "cancel subscription failed" ) // TargetOperationType define constant to the target operation type type TargetOperationType int const ( - // OpAppend define append new target to the monitoring list + // OpAppend define append new target to the subscription list OpAppend TargetOperationType = iota - // OpRemove define remove exist target from the monitoring list + // OpRemove define remove exist target from the subscription list OpRemove - // OpUpdate define update exist target from the monitoring list + // OpUpdate define update exist target from the subscription list OpUpdate ) diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 21dcc2b..ef47878 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -59,7 +59,7 @@ func PullRealTimeDataHandler(c *gin.Context) { // TODO[BACKPRESSURE-ISSUE] 先期使用固定大容量对扇入模型进行定义 #1 fanInChan := make(chan network.RealTimePullTarget, 10000) - go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan) + go processTargetPolling(ctx, globalSubState, clientID, fanInChan) go readClientMessages(ctx, conn, clientID, cancel) bufferMaxSize := constants.SendMaxBatchSize @@ -156,7 +156,7 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s stopChanMap := make(map[string]chan struct{}) s.globalMutex.RLock() - config, confExist := s.monitorMap[clientID] + config, confExist := s.subMap[clientID] if !confExist { logger.Error(ctx, "can not found config into local stored map by clientID", "clientID", clientID) s.globalMutex.RUnlock() diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index 007c70d..b4d901a 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -19,20 +19,20 @@ import ( "gorm.io/gorm" ) -var globalMonitorState *SharedMonitorState +var globalSubState *SharedMonitorState func init() { - globalMonitorState = NewSharedMonitorState() + globalSubState = NewSharedMonitorState() } // RealTimeSubHandler define real time data subscriptions process API // @Summary 开始或结束订阅实时数据 -// @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于实时数据的监控 +// @Description 根据用户输入的组件token,从 modelRT 服务中开始或结束对于量测节点的实时数据的订阅 // @Tags RealTime Component // @Accept json // @Produce json -// @Param request body network.MeasurementRecommendRequest true "查询输入参数,例如 'trans' 或 'transformfeeder1_220.'" -// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" +// @Param request body network.RealTimeSubRequest true "量测节点实时数据订阅" +// @Success 200 {object} network.SuccessResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表" // // @Example 200 { // "code": 200, @@ -53,7 +53,7 @@ func init() { // } // } // -// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeMonitorPayload} "订阅实时数据结果列表" +// @Failure 400 {object} network.FailureResponse{payload=network.RealTimeSubPayload} "订阅实时数据结果列表" // // @Example 400 { // "code": 400, @@ -77,7 +77,7 @@ func init() { // @Router /monitors/data/subscriptions [post] func RealTimeSubHandler(c *gin.Context) { var request network.RealTimeSubRequest - var monitorAction string + var subAction string var clientID string if err := c.ShouldBindJSON(&request); err != nil { @@ -89,11 +89,11 @@ func RealTimeSubHandler(c *gin.Context) { return } - if request.Action == constants.MonitorStartAction && request.ClientID == "" { - monitorAction = request.Action + if request.Action == constants.SubStartAction && request.ClientID == "" { + subAction = request.Action id, err := uuid.NewV4() if err != nil { - logger.Error(c, "failed to generate monitor id", "error", err) + logger.Error(c, "failed to generate client id", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -101,11 +101,11 @@ func RealTimeSubHandler(c *gin.Context) { return } clientID = id.String() - } else if request.Action == constants.MonitorStartAction && request.ClientID != "" { - monitorAction = constants.MonitorAppendAction + } else if request.Action == constants.SubStartAction && request.ClientID != "" { + subAction = constants.SubAppendAction clientID = request.ClientID - } else if request.Action == constants.MonitorStopAction && request.ClientID != "" { - monitorAction = request.Action + } else if request.Action == constants.SubStopAction && request.ClientID != "" { + subAction = request.Action clientID = request.ClientID } @@ -114,11 +114,11 @@ func RealTimeSubHandler(c *gin.Context) { tx := pgClient.Begin() defer tx.Commit() - switch monitorAction { - case constants.MonitorStartAction: - results, err := globalMonitorState.CreateConfig(c, tx, clientID, request.Components) + switch subAction { + case constants.SubStartAction: + results, err := globalSubState.CreateConfig(c, tx, clientID, request.Components) if err != nil { - logger.Error(c, "create real time data monitor config failed", "error", err) + logger.Error(c, "create real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -139,10 +139,10 @@ func RealTimeSubHandler(c *gin.Context) { }, }) return - case constants.MonitorStopAction: - results, err := globalMonitorState.RemoveTargets(c, clientID, request.Components) + case constants.SubStopAction: + results, err := globalSubState.RemoveTargets(c, clientID, request.Components) if err != nil { - logger.Error(c, "remove target to real time data monitor config failed", "error", err) + logger.Error(c, "remove target to real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -163,10 +163,10 @@ func RealTimeSubHandler(c *gin.Context) { }, }) return - case constants.MonitorAppendAction: - results, err := globalMonitorState.AppendTargets(c, tx, clientID, request.Components) + case constants.SubAppendAction: + results, err := globalSubState.AppendTargets(c, tx, clientID, request.Components) if err != nil { - logger.Error(c, "append target to real time data monitor config failed", "error", err) + logger.Error(c, "append target to real time data subscription config failed", "error", err) c.JSON(http.StatusOK, network.FailureResponse{ Code: http.StatusBadRequest, Msg: err.Error(), @@ -189,7 +189,7 @@ func RealTimeSubHandler(c *gin.Context) { return default: err := fmt.Errorf("%w: request action is %s", constants.ErrUnsupportedAction, request.Action) - logger.Error(c, "unsupported action of real time data monitor request", "error", err) + logger.Error(c, "unsupported action of real time data subscription request", "error", err) requestTargetsCount := processRealTimeRequestCount(request.Components) results := processRealTimeRequestTargets(request.Components, requestTargetsCount, err) c.JSON(http.StatusOK, network.FailureResponse{ @@ -204,29 +204,29 @@ func RealTimeSubHandler(c *gin.Context) { } } -// RealTimeMonitorComponent define struct of real time monitor component +// RealTimeMonitorComponent define struct of real time subscription component type RealTimeMonitorComponent struct { targets []string targetParam map[string]*orm.Measurement } -// RealTimeMonitorConfig define struct of real time monitor config +// RealTimeMonitorConfig define struct of real time subscription config type RealTimeMonitorConfig struct { noticeChan chan *transportTargets mutex sync.RWMutex components map[string]*RealTimeMonitorComponent } -// SharedMonitorState define struct of shared monitor state with mutex +// SharedMonitorState define struct of shared subscription state with mutex type SharedMonitorState struct { - monitorMap map[string]*RealTimeMonitorConfig + subMap map[string]*RealTimeMonitorConfig globalMutex sync.RWMutex } // NewSharedMonitorState define function to create new SharedMonitorState func NewSharedMonitorState() *SharedMonitorState { return &SharedMonitorState{ - monitorMap: make(map[string]*RealTimeMonitorConfig), + subMap: make(map[string]*RealTimeMonitorConfig), } } @@ -292,7 +292,7 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount) s.globalMutex.Lock() - if _, exist := s.monitorMap[clientID]; exist { + if _, exist := s.subMap[clientID]; exist { s.globalMutex.Unlock() err := fmt.Errorf("clientID %s already exists. use AppendTargets to modify existing config", clientID) logger.Error(ctx, "clientID already exists. use AppendTargets to modify existing config", "error", err) @@ -303,7 +303,7 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie noticeChan: make(chan *transportTargets), components: newComponentsMap, // 直接使用预构建的 Map } - s.monitorMap[clientID] = config + s.subMap[clientID] = config s.globalMutex.Unlock() return targetProcessResults, nil } @@ -314,7 +314,7 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, cli targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() - config, exist := s.monitorMap[clientID] + config, exist := s.subMap[clientID] if !exist { s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found. use CreateConfig to start a new config", clientID) @@ -386,7 +386,7 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) s.globalMutex.RLock() - config, exist := s.monitorMap[clientID] + config, exist := s.subMap[clientID] s.globalMutex.RUnlock() var opType constants.TargetOperationType @@ -398,12 +398,12 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli } else { opType = constants.OpAppend s.globalMutex.Lock() - if config, exist = s.monitorMap[clientID]; !exist { + if config, exist = s.subMap[clientID]; !exist { config = &RealTimeMonitorConfig{ noticeChan: make(chan *transportTargets), components: newComponentsMap, } - s.monitorMap[clientID] = config + s.subMap[clientID] = config } else { s.globalMutex.Unlock() config.mutex.Lock() @@ -429,7 +429,7 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() - config, exist := s.monitorMap[clientID] + config, exist := s.subMap[clientID] if !exist { s.globalMutex.RUnlock() err := fmt.Errorf("clientID %s not found", clientID) @@ -511,8 +511,8 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, if shouldRemoveClient { s.globalMutex.Lock() - if currentConfig, exist := s.monitorMap[clientID]; exist && len(currentConfig.components) == 0 { - delete(s.monitorMap, clientID) + if currentConfig, exist := s.subMap[clientID]; exist && len(currentConfig.components) == 0 { + delete(s.subMap, clientID) } s.globalMutex.Unlock() } @@ -524,7 +524,7 @@ func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) s.globalMutex.RLock() defer s.globalMutex.RUnlock() - config, ok := s.monitorMap[clientID] + config, ok := s.subMap[clientID] if !ok { return nil, false } diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index c345253..cf584a7 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -13,6 +13,7 @@ import ( "modelRT/logger" "modelRT/network" "modelRT/pool" + "modelRT/util" "github.com/confluentinc/confluent-kafka-go/kafka" ) @@ -25,8 +26,7 @@ func init() { } // ReceiveChan define func of real time data receive and process -func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration string) { - fmt.Println(topics, duration) +func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) { consumer, err := kafka.NewConsumer(consumerConfig) if err != nil { logger.Error(ctx, "create kafka consumer failed", "error", err) @@ -40,13 +40,11 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics [] return } - logger.Info(ctx, "start consuming from kafka", "topic", topics) - batchSize := 100 - batchTimeout := 2 * time.Second + batchTimeout := util.SecondsToDuration(duration) messages := make([]*kafka.Message, 0, batchSize) lastCommit := time.Now() - + logger.Info(ctx, "start consuming from kafka", "topic", topics) for { select { case <-ctx.Done(): diff --git a/util/time.go b/util/time.go index 0c800c8..f51b210 100644 --- a/util/time.go +++ b/util/time.go @@ -13,3 +13,13 @@ func GenNanoTsStr() string { timestampStr := strconv.FormatInt(nanoseconds, 10) return timestampStr } + +// Numeric define interface to constraints supporting integer and floating-point types +type Numeric interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | float32 | float64 +} + +// SecondsToDuration define func to convert Numeric type param to time duration +func SecondsToDuration[T Numeric](seconds T) time.Duration { + return time.Duration(seconds) * time.Second +}