diff --git a/database/query_measurement.go b/database/query_measurement.go index 69a93f7..aa51e89 100644 --- a/database/query_measurement.go +++ b/database/query_measurement.go @@ -46,3 +46,17 @@ func QueryMeasurementByToken(ctx context.Context, tx *gorm.DB, token string) (or } return component, nil } + +// GetAllMeasurements define func to query all measurement info from postgresDB +func GetAllMeasurements(ctx context.Context, tx *gorm.DB) ([]orm.Measurement, error) { + var measurements []orm.Measurement + + // ctx超时判断 + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + result := tx.WithContext(cancelCtx).Clauses(clause.Locking{Strength: "UPDATE"}).Find(&measurements) + if result.Error != nil { + return nil, result.Error + } + return measurements, nil +} diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index ef47878..3b8a955 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -149,8 +149,8 @@ func sendAggregateRealTimeDataStream(conn *websocket.Conn, targetsData []network return conn.WriteJSON(response) } -// processTargetPolling define function to process target in monitor map and data is continuously retrieved from redis based on the target -func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID string, fanInChan chan network.RealTimePullTarget) { +// processTargetPolling define function to process target in subscription map and data is continuously retrieved from redis based on the target +func processTargetPolling(ctx context.Context, s *SharedSubState, clientID string, fanInChan chan network.RealTimePullTarget) { // ensure the fanInChan will not leak defer close(fanInChan) @@ -224,7 +224,7 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s } // appendTargets starts new polling goroutines for targets that were just added -func appendTargets(ctx context.Context, config *RealTimeMonitorConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) { +func appendTargets(ctx context.Context, config *RealTimeSubConfig, stopChanMap map[string]chan struct{}, fanInChan chan network.RealTimePullTarget, appendTargets []string) { targetSet := make(map[string]struct{}, len(appendTargets)) for _, target := range appendTargets { targetSet[target] = struct{}{} diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index b4d901a..d8c32be 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -19,10 +19,10 @@ import ( "gorm.io/gorm" ) -var globalSubState *SharedMonitorState +var globalSubState *SharedSubState func init() { - globalSubState = NewSharedMonitorState() + globalSubState = NewSharedSubState() } // RealTimeSubHandler define real time data subscriptions process API @@ -204,40 +204,40 @@ func RealTimeSubHandler(c *gin.Context) { } } -// RealTimeMonitorComponent define struct of real time subscription component -type RealTimeMonitorComponent struct { +// RealTimeSubComponent define struct of real time subscription component +type RealTimeSubComponent struct { targets []string targetParam map[string]*orm.Measurement } -// RealTimeMonitorConfig define struct of real time subscription config -type RealTimeMonitorConfig struct { +// RealTimeSubConfig define struct of real time subscription config +type RealTimeSubConfig struct { noticeChan chan *transportTargets mutex sync.RWMutex - components map[string]*RealTimeMonitorComponent + components map[string]*RealTimeSubComponent } -// SharedMonitorState define struct of shared subscription state with mutex -type SharedMonitorState struct { - subMap map[string]*RealTimeMonitorConfig +// SharedSubState define struct of shared subscription state with mutex +type SharedSubState struct { + subMap map[string]*RealTimeSubConfig globalMutex sync.RWMutex } -// NewSharedMonitorState define function to create new SharedMonitorState -func NewSharedMonitorState() *SharedMonitorState { - return &SharedMonitorState{ - subMap: make(map[string]*RealTimeMonitorConfig), +// NewSharedSubState define function to create new SharedSubState +func NewSharedSubState() *SharedSubState { + return &SharedSubState{ + subMap: make(map[string]*RealTimeSubConfig), } } // processAndValidateTargets define func to perform all database I/O operations in a lock-free state (eg,ParseDataIdentifierToken) func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []network.RealTimeComponentItem, allReqTargetNum int) ( []network.TargetResult, - map[string]*RealTimeMonitorComponent, + map[string]*RealTimeSubComponent, []string, ) { targetProcessResults := make([]network.TargetResult, 0, allReqTargetNum) - newComponentsMap := make(map[string]*RealTimeMonitorComponent) + newComponentsMap := make(map[string]*RealTimeSubComponent) successfulTargets := make([]string, 0, allReqTargetNum) for _, componentItem := range components { @@ -260,7 +260,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []ne successfulTargets = append(successfulTargets, target) if _, ok := newComponentsMap[interval]; !ok { - newComponentsMap[interval] = &RealTimeMonitorComponent{ + newComponentsMap[interval] = &RealTimeSubComponent{ targets: make([]string, 0, len(componentItem.Targets)), targetParam: make(map[string]*orm.Measurement), } @@ -275,7 +275,7 @@ func processAndValidateTargets(ctx context.Context, tx *gorm.DB, components []ne } // mergeComponents define func to merge newComponentsMap into existingComponentsMap -func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, newComponents map[string]*RealTimeMonitorComponent) { +func mergeComponents(existingComponents map[string]*RealTimeSubComponent, newComponents map[string]*RealTimeSubComponent) { for interval, newComp := range newComponents { if existingComp, ok := existingComponents[interval]; ok { existingComp.targets = append(existingComp.targets, newComp.targets...) @@ -286,8 +286,8 @@ func mergeComponents(existingComponents map[string]*RealTimeMonitorComponent, ne } } -// CreateConfig define function to create config in SharedMonitorState -func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +// CreateConfig define function to create config in SharedSubState +func (s *SharedSubState) CreateConfig(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults, newComponentsMap, _ := processAndValidateTargets(ctx, tx, components, requestTargetsCount) @@ -299,7 +299,7 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie return targetProcessResults, err } - config := &RealTimeMonitorConfig{ + config := &RealTimeSubConfig{ noticeChan: make(chan *transportTargets), components: newComponentsMap, // 直接使用预构建的 Map } @@ -308,8 +308,8 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, clie return targetProcessResults, nil } -// AppendTargets define function to append targets in SharedMonitorState -func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +// AppendTargets define function to append targets in SharedSubState +func (s *SharedSubState) AppendTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) @@ -364,7 +364,7 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, cli targets := make([]string, 0, len(componentItem.Targets)) targetParam := make(map[string]*orm.Measurement) targetParam[target] = targetModel.GetMeasurementInfo() - config.components[interval] = &RealTimeMonitorComponent{ + config.components[interval] = &RealTimeSubComponent{ targets: append(targets, target), } } else { @@ -380,8 +380,8 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, cli return targetProcessResults, nil } -// UpsertTargets define function to upsert targets in SharedMonitorState -func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +// UpsertTargets define function to upsert targets in SharedSubState +func (s *SharedSubState) UpsertTargets(ctx context.Context, tx *gorm.DB, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults, newComponentsMap, successfulTargets := processAndValidateTargets(ctx, tx, components, requestTargetsCount) @@ -399,7 +399,7 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli opType = constants.OpAppend s.globalMutex.Lock() if config, exist = s.subMap[clientID]; !exist { - config = &RealTimeMonitorConfig{ + config = &RealTimeSubConfig{ noticeChan: make(chan *transportTargets), components: newComponentsMap, } @@ -423,8 +423,8 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, cli return targetProcessResults, nil } -// RemoveTargets define function to remove targets in SharedMonitorState -func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { +// RemoveTargets define function to remove targets in SharedSubState +func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, components []network.RealTimeComponentItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(components) targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) @@ -519,8 +519,8 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, clientID string, return targetProcessResults, nil } -// Get define function to get subscriptions config from SharedMonitorState -func (s *SharedMonitorState) Get(clientID string) (*RealTimeMonitorConfig, bool) { +// Get define function to get subscriptions config from SharedSubState +func (s *SharedSubState) Get(clientID string) (*RealTimeSubConfig, bool) { s.globalMutex.RLock() defer s.globalMutex.RUnlock() diff --git a/main.go b/main.go index b9872e0..f9f96f4 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,6 @@ import ( "modelRT/router" "modelRT/util" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/gin-gonic/gin" "github.com/panjf2000/ants/v2" swaggerFiles "github.com/swaggo/files" @@ -146,19 +145,6 @@ func main() { } defer anchorRealTimePool.Release() - // TODO 配置文件中增加 kafka 配置 - // init cancel context - cancelCtx, cancel := context.WithCancel(ctx) - defer cancel() - customerConf := &kafka.ConfigMap{ - "bootstrap.servers": modelRTConfig.KafkaConfig.Servers, - "group.id": modelRTConfig.KafkaConfig.GroupID, - "auto.offset.reset": modelRTConfig.KafkaConfig.AutoOffsetReset, - "enable.auto.commit": modelRTConfig.KafkaConfig.EnableAutoCommit, - } - - go realtimedata.ReceiveChan(cancelCtx, customerConf, []string{modelRTConfig.KafkaConfig.Topic}, modelRTConfig.KafkaConfig.ReadMessageTimeDuration) - postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres // componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool) @@ -167,7 +153,14 @@ func main() { // panic(err) // } - // TODO 暂时屏蔽完成 swagger 启动测试 + allMeasurement, err := database.GetAllMeasurements(ctx, tx) + if err != nil { + logger.Error(ctx, "load topologic info from postgres failed", "error", err) + panic(err) + } + + go realtimedata.StartRealTimeDataComputing(ctx, allMeasurement) + tree, err := database.QueryTopologicFromDB(ctx, tx) if err != nil { logger.Error(ctx, "load topologic info from postgres failed", "error", err) diff --git a/real-time-data/compute_state_manager.go b/real-time-data/compute_state_manager.go new file mode 100644 index 0000000..3e748b8 --- /dev/null +++ b/real-time-data/compute_state_manager.go @@ -0,0 +1,63 @@ +// Package realtimedata define real time data operation functions +package realtimedata + +import "sync" + +// ComputeConfig define struct of measurement computation +type ComputeConfig struct { + Cause map[string]any + Action map[string]any + StopGchan chan struct{} +} + +// MeasComputeState define struct of manages the state of measurement computations using sync.Map +type MeasComputeState struct { + measMap sync.Map +} + +// NewMeasComputeState define func to create and returns a new instance of MeasComputeState +func NewMeasComputeState() *MeasComputeState { + return &MeasComputeState{} +} + +// Store define func to store a compute configuration for the specified key +func (m *MeasComputeState) Store(key string, config *ComputeConfig) { + m.measMap.Store(key, config) +} + +// Load define func to retrieve the compute configuration for the specified key +func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) { + value, ok := m.measMap.Load(key) + if !ok { + return nil, false + } + return value.(*ComputeConfig), true +} + +// Delete define func to remove the compute configuration for the specified key +func (m *MeasComputeState) Delete(key string) { + m.measMap.Delete(key) +} + +// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration +func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) { + value, loaded := m.measMap.LoadOrStore(key, config) + return value.(*ComputeConfig), loaded +} + +// Range define func to iterate over all key-configuration pairs in the map +func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) { + m.measMap.Range(func(key, value any) bool { + return f(key.(string), value.(*ComputeConfig)) + }) +} + +// Len define func to return the number of compute configurations in the map +func (m *MeasComputeState) Len() int { + count := 0 + m.measMap.Range(func(_, _ any) bool { + count++ + return true + }) + return count +} diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index cf584a7..883263f 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -12,20 +12,44 @@ import ( "modelRT/diagram" "modelRT/logger" "modelRT/network" + "modelRT/orm" "modelRT/pool" "modelRT/util" "github.com/confluentinc/confluent-kafka-go/kafka" ) -// RealTimeDataChan define channel of real time data receive -var RealTimeDataChan chan network.RealTimeDataReceiveRequest +var ( + // RealTimeDataChan define channel of real time data receive + RealTimeDataChan chan network.RealTimeDataReceiveRequest + globalComputeState *MeasComputeState +) func init() { RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100) + globalComputeState = NewMeasComputeState() } -// ReceiveChan define func of real time data receive and process +// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info +func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) { + for _, measurement := range measurements { + enableValue, exist := measurement.EventPlan["enable"] + enable, ok := enableValue.(bool) + if !exist || !enable { + logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID) + continue + } + + if !ok { + logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue) + continue + } + + // TODO 启动协程准备查询 redis 数据进行计算 + } +} + +// ReceiveChan define func to real time data receive and process func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) { consumer, err := kafka.NewConsumer(consumerConfig) if err != nil { @@ -48,6 +72,7 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics [] for { select { case <-ctx.Done(): + logger.Info(ctx, "stop real time data computing by context cancel") return case realTimeData := <-RealTimeDataChan: componentUUID := realTimeData.PayLoad.ComponentUUID @@ -95,7 +120,7 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics [] msg, err := consumer.ReadMessage(batchTimeout) if err != nil { if err.(kafka.Error).Code() == kafka.ErrTimedOut { - // 超时时处理累积的消息 + // process accumulated messages when timeout if len(messages) > 0 { processMessageBatch(ctx, messages) consumer.Commit() @@ -103,13 +128,12 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics [] } continue } - logger.Error(ctx, "read message from kafka failed", "error", err) + logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg) continue } messages = append(messages, msg) - - // TODO 达到批处理大小或超时时间时处理消息 + // process messages when batch size or timeout period is reached if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout { processMessageBatch(ctx, messages) consumer.Commit() @@ -133,7 +157,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) { var realTimeData RealTimeData err := json.Unmarshal(msgValue, &realTimeData) if err != nil { - return nil, fmt.Errorf("json unmarshal failed: %w", err) + return nil, fmt.Errorf("unmarshal real time data failed: %w", err) } return &realTimeData, nil } @@ -194,13 +218,14 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { } } +// processMessageBatch define func to bathc process kafka message func processMessageBatch(ctx context.Context, messages []*kafka.Message) { for _, msg := range messages { realTimeData, err := parseKafkaMessage(msg.Value) if err != nil { - logger.Error(ctx, "parse kafka message failed", "error", err) + logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg) continue } - processRealTimeData(ctx, realTimeData) + go processRealTimeData(ctx, realTimeData) } }