diff --git a/constants/context.go b/constants/context.go new file mode 100644 index 0000000..dcac3c3 --- /dev/null +++ b/constants/context.go @@ -0,0 +1,7 @@ +// Package constants define constant variable +package constants + +type contextKey string + +// MeasurementUUIDKey define measurement uuid key into context +const MeasurementUUIDKey contextKey = "measurement_uuid" diff --git a/real-time-data/compute_state_manager.go b/real-time-data/compute_state_manager.go index 3e748b8..557429a 100644 --- a/real-time-data/compute_state_manager.go +++ b/real-time-data/compute_state_manager.go @@ -1,12 +1,18 @@ // Package realtimedata define real time data operation functions package realtimedata -import "sync" +import ( + "sync" +) // ComputeConfig define struct of measurement computation type ComputeConfig struct { - Cause map[string]any - Action map[string]any + Cause map[string]any + Action map[string]any + // TODO 预留自由调整的入口 + Duration int + DataSize int64 + QueryKey string StopGchan chan struct{} } diff --git a/real-time-data/real_time_data_computing.go b/real-time-data/real_time_data_computing.go index 883263f..7d92f41 100644 --- a/real-time-data/real_time_data_computing.go +++ b/real-time-data/real_time_data_computing.go @@ -4,6 +4,7 @@ package realtimedata import ( "context" "encoding/json" + "errors" "fmt" "time" @@ -11,6 +12,7 @@ import ( "modelRT/constants" "modelRT/diagram" "modelRT/logger" + "modelRT/model" "modelRT/network" "modelRT/orm" "modelRT/pool" @@ -45,7 +47,136 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem continue } - // TODO 启动协程准备查询 redis 数据进行计算 + conf, err := initComputeConfig(measurement) + if err != nil { + logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err) + continue + } + + if conf == nil { + logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID) + continue + } + + uuidStr := measurement.ComponentUUID.String() + enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr) + conf.StopGchan = make(chan struct{}) + globalComputeState.Store(uuidStr, conf) + logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID) + go continuousComputation(enrichedCtx, conf) + } +} + +func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) { + enableValue, exist := measurement.EventPlan["enable"] + enable, ok := enableValue.(bool) + if !exist { + return nil, nil + } + + if !ok { + return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue) + } + + if !enable { + return nil, nil + } + + conf := &ComputeConfig{} + + causeValue, exist := measurement.EventPlan["cause"] + if !exist { + return nil, errors.New("missing required field cause") + } + + cause, ok := causeValue.(map[string]any) + if !ok { + return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue) + } + conf.Cause = cause + + actionValue, exist := measurement.EventPlan["action"] + if !exist { + return nil, errors.New("missing required field action") + } + action, ok := actionValue.(map[string]any) + if !ok { + return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue) + } + conf.Action = action + + queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) + if err != nil { + return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err) + } + conf.QueryKey = queryKey + + conf.DataSize = int64(measurement.Size) + + return conf, nil +} + +func continuousComputation(ctx context.Context, conf *ComputeConfig) { + client := diagram.NewRedisClient() + uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string) + // TODO duration 优化为配置项 + duration := util.SecondsToDuration(1) + ticker := time.NewTicker(duration) + defer ticker.Stop() + startTimestamp := util.GenNanoTsStr() + for { + select { + case <-conf.StopGchan: + logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid) + return + case <-ctx.Done(): + logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal") + return + case <-ticker.C: + stopTimestamp := util.GenNanoTsStr() + members, err := client.QueryByZRangeByLex(ctx, conf.QueryKey, conf.DataSize, startTimestamp, stopTimestamp) + if err != nil { + logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err) + continue + } + startTimestamp = stopTimestamp + // TODO 对 redis 数据进行分析 + fmt.Println(members) + } + } +} + +func processCauseMap(data map[string]any) { + keysToExtract := []string{"up", "down", "upup", "downdown"} + + for _, key := range keysToExtract { + if value, exists := data[key]; exists { + // 检查类型是否为 float64 + if floatVal, ok := value.(float64); ok { + fmt.Printf("键 '%s' 存在且为 float64: %.2f\n", key, floatVal) + } else { + // 键存在,但类型不对,进行错误处理或转换尝试 + fmt.Printf("键 '%s' 存在但类型错误: 期望 float64, 实际 %T\n", key, value) + } + } + } + + edgeKey := "edge" + if value, exists := data[edgeKey]; exists { + if stringVal, ok := value.(string); ok { + switch stringVal { + case "raising": + fmt.Println(" -> 识别到 edge: raising") + case "falling": + fmt.Println(" -> 识别到 edge: falling") + default: + fmt.Println(" -> 识别到其他 edge 值") + } + } else { + fmt.Printf("键 '%s' 存在但类型错误: 期望 string, 实际 %T\n", edgeKey, value) + } + } else { + fmt.Printf("键 '%s' 不存在\n", edgeKey) } }