// Source file: modelRT/real-time-data/real_time_data_computing.go

// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
2025-11-14 16:34:34 +08:00
"errors"
"fmt"
"time"
"modelRT/constants"
"modelRT/diagram"
"modelRT/logger"
2025-11-14 16:34:34 +08:00
"modelRT/model"
"modelRT/network"
"modelRT/orm"
"modelRT/util"
)
var (
// RealTimeDataChan is the channel on which real time data receive requests
// are delivered. It is created in init as a buffered channel holding up to
// 100 requests.
RealTimeDataChan chan network.RealTimeDataReceiveRequest
// globalComputeState is the package-wide compute state created by
// NewMeasComputeState in init. StartRealTimeDataComputing stores the compute
// config of each started measurement in it, keyed by the measurement UUID
// string.
globalComputeState *MeasComputeState
)
// init creates the buffered real time data receive channel and the
// package-wide measurement compute state.
func init() {
	// realTimeDataChanSize is the number of receive requests the channel can
	// buffer before a sender blocks.
	const realTimeDataChanSize = 100

	RealTimeDataChan = make(chan network.RealTimeDataReceiveRequest, realTimeDataChanSize)
	globalComputeState = NewMeasComputeState()
}
// StartRealTimeDataComputing define func to start real time data process goroutines by measurement info
func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurement) {
for _, measurement := range measurements {
enableValue, exist := measurement.EventPlan["enable"]
enable, ok := enableValue.(bool)
if !exist || !enable {
logger.Info(ctx, "measurement object do not need real time data computing", "measurement_uuid", measurement.ComponentUUID)
continue
}
if !ok {
logger.Error(ctx, "covert enable variable to boolean type failed", "measurement_uuid", measurement.ComponentUUID, "enable", enableValue)
continue
}
2025-11-14 16:34:34 +08:00
conf, err := initComputeConfig(measurement)
if err != nil {
logger.Error(ctx, "failed to initialize real time compute config", "measurement_uuid", measurement.ComponentUUID, "error", err)
continue
}
if conf == nil {
logger.Info(ctx, "measurement object is disabled or does not require real time computing", "measurement_uuid", measurement.ComponentUUID)
continue
}
uuidStr := measurement.ComponentUUID.String()
enrichedCtx := context.WithValue(ctx, constants.MeasurementUUIDKey, uuidStr)
conf.StopGchan = make(chan struct{})
globalComputeState.Store(uuidStr, conf)
logger.Info(ctx, "starting real time data computing for measurement", "measurement_uuid", measurement.ComponentUUID)
go continuousComputation(enrichedCtx, conf)
}
}
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
var err error
2025-11-14 16:34:34 +08:00
enableValue, exist := measurement.EventPlan["enable"]
enable, ok := enableValue.(bool)
if !exist {
return nil, nil
}
if !ok {
return nil, fmt.Errorf("field enable can not be converted to boolean, found type: %T", enableValue)
}
if !enable {
return nil, nil
}
conf := &ComputeConfig{}
causeValue, exist := measurement.EventPlan["cause"]
if !exist {
return nil, errors.New("missing required field cause")
}
cause, ok := causeValue.(map[string]any)
if !ok {
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
}
conf.Cause, err = processCauseMap(cause)
if err != nil {
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
}
2025-11-14 16:34:34 +08:00
actionValue, exist := measurement.EventPlan["action"]
if !exist {
return nil, errors.New("missing required field action")
}
action, ok := actionValue.(map[string]any)
if !ok {
return nil, fmt.Errorf("field action can not be converted to map[string]any, found type: %T", actionValue)
}
conf.Action = action
queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource)
if err != nil {
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
}
conf.QueryKey = queryKey
conf.DataSize = int64(measurement.Size)
// TODO use constant values for temporary settings
conf.minBreachCount = constants.MinBreachCount
// TODO 后续优化 duration 创建方式
conf.Duration = 10
isFloatCause := false
if _, exists := conf.Cause["up"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["down"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["upup"]; exists {
isFloatCause = true
} else if _, exists := conf.Cause["downdown"]; exists {
isFloatCause = true
}
if isFloatCause {
// te config
teThresholds, err := parseTEThresholds(conf.Cause)
if err != nil {
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
}
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
} else {
// ti config
tiThresholds, err := parseTIThresholds(conf.Cause)
if err != nil {
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
}
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
}
2025-11-14 16:34:34 +08:00
return conf, nil
}
// processCauseMap validates the raw "cause" map of an event plan and returns a
// new map holding only the recognised entries.
//
// Two mutually exclusive shapes are accepted:
//   - limit form: any of the keys "up", "down", "upup", "downdown", each of
//     which must hold a float64. When at least one of them is present only
//     those keys are returned and "edge" is ignored.
//   - edge form: the key "edge" holding the string "raising" or "falling".
//
// An error is returned when a present key has the wrong type, when "edge" has
// an unsupported value, or when neither form is present.
func processCauseMap(data map[string]any) (map[string]any, error) {
	causeResult := make(map[string]any)

	keysToExtract := []string{"up", "down", "upup", "downdown"}
	for _, key := range keysToExtract {
		value, exists := data[key]
		if !exists {
			continue
		}
		floatVal, ok := value.(float64)
		if !ok {
			return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
		}
		causeResult[key] = floatVal
	}
	if len(causeResult) > 0 {
		return causeResult, nil
	}

	const edgeKey = "edge"
	value, exists := data[edgeKey]
	if !exists {
		return nil, fmt.Errorf("key:%s do not exists", edgeKey)
	}
	stringVal, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
	}

	switch stringVal {
	case "raising", "falling":
		causeResult[edgeKey] = stringVal
		// Return the validated edge cause; without this return a valid edge
		// configuration would be reported as invalid.
		return causeResult, nil
	default:
		return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, stringVal)
	}
}
2025-11-14 16:34:34 +08:00
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
client := diagram.NewRedisClient()
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
duration := util.SecondsToDuration(conf.Duration)
2025-11-14 16:34:34 +08:00
ticker := time.NewTicker(duration)
defer ticker.Stop()
2025-11-14 16:34:34 +08:00
for {
select {
case <-conf.StopGchan:
logger.Info(ctx, "continuous computing groutine stopped by local StopGchan", "uuid", uuid)
return
case <-ctx.Done():
logger.Info(ctx, "continuous computing goroutine stopped by parent context done signal")
return
case <-ticker.C:
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
members, err := client.QueryByZRange(queryCtx, conf.QueryKey, conf.DataSize)
cancel()
2025-11-14 16:34:34 +08:00
if err != nil {
logger.Error(ctx, "query real time data from redis failed", "key", conf.QueryKey, "error", err)
continue
}
realTimedatas := util.ConvertZSetMembersToFloat64(members)
if len(realTimedatas) == 0 {
logger.Info(ctx, "no real time data queried from redis, skip this computation cycle", "key", conf.QueryKey)
continue
}
if conf.Analyzer != nil {
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
2025-11-14 16:34:34 +08:00
} else {
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
2025-11-14 16:34:34 +08:00
}
}
}
}