// Package realtimedata define real time data operation functions package realtimedata import ( "context" "encoding/json" "errors" "fmt" "time" "modelRT/config" "modelRT/constants" "modelRT/diagram" "modelRT/logger" "modelRT/model" "modelRT/network" "modelRT/orm" "modelRT/pool" "modelRT/util" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( // RealTimeDataChan define channel of real time data receive RealTimeDataChan chan network.RealTimeDataReceiveRequest globalComputeState *MeasComputeState ) func init() { RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, 100) globalComputeState = NewMeasComputeState() } // StartRealTimeDataComputing define func to start real time data process goroutines by measurement info func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) { for _, measurement := range measurements { enableValue, exist := measurement.EventPlan["enable"] enable, ok := enableValue.(bool) if !exist || !enable { logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID) continue } if !ok { logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue) continue } conf, err := initComputeConfig(measurement) if err != nil { logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err) continue } if conf == nil { logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID) continue } uuidStr := measurement.ComponentUUID.String() enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr) conf.StopGchan = make(chan struct{}) globalComputeState.Store(uuidStr, conf) logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID) go continuousComputation(enrichedCtx, conf) } } func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { enableValue, exist := measurement.EventPlan["enable"] enable, ok := enableValue.(bool) if !exist { return nil, nil } if !ok { return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue) } if !enable { return nil, nil } conf := &ComputeConfig{} causeValue, exist := measurement.EventPlan["cause"] if !exist { return nil, errors.New("missing required field cause") } cause, ok := causeValue.(map[string]any) if !ok { return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue) } conf.Cause = cause actionValue, exist := measurement.EventPlan["action"] if !exist { return nil, errors.New("missing required field action") } action, ok := actionValue.(map[string]any) if !ok { return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue) } conf.Action = action queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) if err != nil { return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err) } conf.QueryKey = queryKey conf.DataSize = int64(measurement.Size) return conf, nil } func continuousComputation(ctx context.Context, conf *ComputeConfig) { client := diagram.NewRedisClient() uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string) // TODO duration 优化为配置项 duration := util.SecondsToDuration(1) ticker := time.NewTicker(duration) defer ticker.Stop() startTimestamp := util.GenNanoTsStr() for { select { case <-conf.StopGchan: logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid) return case <-ctx.Done(): logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") return case <-ticker.C: stopTimestamp := util.GenNanoTsStr() members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp) if err != nil { logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) continue } startTimestamp = stopTimestamp // TODO 对 redis 数据进行分析 fmt.Println(members) } } } func processCauseMap(data map[string]any) { keysToExtract := []string{"up", "down", "upup", "downdown"} for _, key := range keysToExtract { if value, exists := data[key]; exists { // 检查类型是否为 float64 if floatVal, ok := value.(float64); ok { fmt.Printf("键 '%s' 存在且为 float64: %.2f\n", key, floatVal) } else { // 键存在,但类型不对,进行错误处理或转换尝试 fmt.Printf("键 '%s' 存在但类型错误: 期望 float64, 实际 %T\n", key, value) } } } edgeKey := "edge" if value, exists := data[edgeKey]; exists { if stringVal, ok := value.(string); ok { switch stringVal { case "raising": fmt.Println(" -> 识别到 edge: raising") case "falling": fmt.Println(" -> 识别到 edge: falling") default: fmt.Println(" -> 识别到其他 edge 值") } } else { fmt.Printf("键 '%s' 存在但类型错误: 期望 string, 实际 %T\n", edgeKey, value) } } else { fmt.Printf("键 '%s' 不存在\n", edgeKey) } } // ReceiveChan define func to real time data receive and process func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []string, duration float32) { consumer, err := kafka.NewConsumer(consumerConfig) if err != nil { logger.Error(ctx, "create kafka consumer failed", "error", err) return } defer consumer.Close() err = consumer.SubscribeTopics(topics, nil) if err != nil { logger.Error(ctx, "subscribe kafka topics failed", "topic", topics, "error", err) return } batchSize := 100 batchTimeout := util.SecondsToDuration(duration) messages := make([]*kafka.Message, 0, batchSize) lastCommit := time.Now() logger.Info(ctx, "start consuming from kafka", "topic", topics) for { select { case <-ctx.Done(): logger.Info(ctx, "stop real time data computing by context cancel") return case realTimeData := <-RealTimeDataChan: componentUUID := realTimeData.PayLoad.ComponentUUID component, err := diagram.GetComponentMap(componentUUID) if err != nil { logger.Error(ctx, "query component info from diagram map by componet id failed", "component_uuid", componentUUID, "error", err) continue } componentType := component.Type if componentType != constants.DemoType { logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID) continue } var anchorName string var compareValUpperLimit, compareValLowerLimit float64 var anchorRealTimeData []float64 var calculateFunc func(archorValue float64, args ...float64) float64 // calculateFunc, params := config.SelectAnchorCalculateFuncAndParams(componentType, anchorName, componentData) for _, param := range realTimeData.PayLoad.Values { anchorRealTimeData = append(anchorRealTimeData, param.Value) } anchorConfig := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ ComponentUUID: componentUUID, AnchorName: anchorName, CompareValUpperLimit: compareValUpperLimit, CompareValLowerLimit: compareValLowerLimit, AnchorRealTimeData: anchorRealTimeData, }, CalculateFunc: calculateFunc, CalculateParams: []float64{}, } anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) if err != nil { logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err) continue } anchorChan <- anchorConfig default: msg, err := consumer.ReadMessage(batchTimeout) if err != nil { if err.(kafka.Error).Code() == kafka.ErrTimedOut { // process accumulated messages when timeout if len(messages) > 0 { processMessageBatch(ctx, messages) consumer.Commit() messages = messages[:0] } continue } logger.Error(ctx, "read message from kafka failed", "error", err, "msg", msg) continue } messages = append(messages, msg) // process messages when batch size or timeout period is reached if len(messages) >= batchSize || time.Since(lastCommit) >= batchTimeout { processMessageBatch(ctx, messages) consumer.Commit() messages = messages[:0] lastCommit = time.Now() } } } } type RealTimeDataPayload struct { ComponentUUID string Values []float64 } type RealTimeData struct { Payload RealTimeDataPayload } func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) { var realTimeData RealTimeData err := json.Unmarshal(msgValue, &realTimeData) if err != nil { return nil, fmt.Errorf("unmarshal real time data failed: %w", err) } return &realTimeData, nil } func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) { componentUUID := realTimeData.Payload.ComponentUUID component, err := diagram.GetComponentMap(componentUUID) if err != nil { logger.Error(ctx, "query component info from diagram map by component id failed", "component_uuid", componentUUID, "error", err) return } componentType := component.Type if componentType != constants.DemoType { logger.Error(ctx, "can not process real time data of component type not equal DemoType", "component_uuid", componentUUID) return } var anchorName string var compareValUpperLimit, compareValLowerLimit float64 var anchorRealTimeData []float64 var calculateFunc func(archorValue float64, args ...float64) float64 // 收集实时数据 for _, param := range realTimeData.Payload.Values { anchorRealTimeData = append(anchorRealTimeData, param) } anchorConfig := config.AnchorParamConfig{ AnchorParamBaseConfig: config.AnchorParamBaseConfig{ ComponentUUID: componentUUID, AnchorName: anchorName, CompareValUpperLimit: compareValUpperLimit, CompareValLowerLimit: compareValLowerLimit, AnchorRealTimeData: anchorRealTimeData, }, CalculateFunc: calculateFunc, CalculateParams: []float64{}, } anchorChan, err := pool.GetAnchorParamChan(ctx, componentUUID) if err != nil { logger.Error(ctx, "get anchor param chan failed", "component_uuid", componentUUID, "error", err) return } // TODO 使用select避免channel阻塞 select { case anchorChan <- anchorConfig: // 成功发送 case <-ctx.Done(): logger.Info(ctx, "context done while sending to anchor chan") case <-time.After(5 * time.Second): logger.Error(ctx, "timeout sending to anchor chan", "component_uuid", componentUUID) } } // processMessageBatch define func to bathc process kafka message func processMessageBatch(ctx context.Context, messages []*kafka.Message) { for _, msg := range messages { realTimeData, err := parseKafkaMessage(msg.Value) if err != nil { logger.Error(ctx, "parse kafka message failed", "error", err, "msg", msg) continue } go processRealTimeData(ctx, realTimeData) } }