optimize real time data analyze of continuousComputation func
This commit is contained in:
parent
8cbbfbd695
commit
9593c77c18
|
|
@ -0,0 +1,31 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// TIBreachTriggerType define out of bounds type constant
|
||||
TIBreachTriggerType = "trigger"
|
||||
)
|
||||
|
||||
const (
|
||||
// TelemetryUpLimit define telemetry upper limit
|
||||
TelemetryUpLimit = "up"
|
||||
// TelemetryUpUpLimit define telemetry upper upper limit
|
||||
TelemetryUpUpLimit = "upup"
|
||||
|
||||
// TelemetryDownLimit define telemetry limit
|
||||
TelemetryDownLimit = "down"
|
||||
// TelemetryDownDownLimit define telemetry lower lower limit
|
||||
TelemetryDownDownLimit = "downdown"
|
||||
)
|
||||
|
||||
const (
|
||||
// TelesignalRaising define telesignal raising edge
|
||||
TelesignalRaising = "raising"
|
||||
// TelesignalFalling define telesignal falling edge
|
||||
TelesignalFalling = "falling"
|
||||
)
|
||||
|
||||
const (
|
||||
// MinBreachCount define min breach count of real time data
|
||||
MinBreachCount = 10
|
||||
)
|
||||
|
|
@ -0,0 +1,257 @@
|
|||
// Package realtimedata define real time data operation functions
|
||||
package realtimedata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"modelRT/constants"
|
||||
"modelRT/logger"
|
||||
)
|
||||
|
||||
// RealTimeAnalyzer 接口定义了实时数据分析和事件触发的通用方法
|
||||
type RealTimeAnalyzer interface {
|
||||
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
|
||||
}
|
||||
|
||||
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
|
||||
type teEventThresholds struct {
|
||||
up float64
|
||||
upup float64
|
||||
down float64
|
||||
downdown float64
|
||||
isFloatCause bool
|
||||
}
|
||||
|
||||
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
||||
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
||||
t := teEventThresholds{}
|
||||
floatKeys := map[string]*float64{
|
||||
"upup": &t.upup,
|
||||
"up": &t.up,
|
||||
"down": &t.down,
|
||||
"downdown": &t.downdown,
|
||||
}
|
||||
|
||||
for key, ptr := range floatKeys {
|
||||
if value, exists := cause[key]; exists {
|
||||
if floatVal, ok := value.(float64); ok {
|
||||
*ptr = floatVal
|
||||
t.isFloatCause = true
|
||||
} else {
|
||||
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// quickly check mutual exclusion
|
||||
if _, exists := cause["edge"]; exists && t.isFloatCause {
|
||||
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
|
||||
func getTEBreachType(value float64, t teEventThresholds) string {
|
||||
if t.upup > 0 && value > t.upup {
|
||||
return constants.TelemetryUpUpLimit
|
||||
}
|
||||
if t.up > 0 && value > t.up {
|
||||
return constants.TelemetryUpLimit
|
||||
}
|
||||
if t.downdown > 0 && value < t.downdown {
|
||||
return constants.TelemetryDownDownLimit
|
||||
}
|
||||
if t.down > 0 && value < t.down {
|
||||
return constants.TelemetryDownLimit
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
|
||||
type TEAnalyzer struct {
|
||||
Thresholds teEventThresholds
|
||||
}
|
||||
|
||||
// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口
|
||||
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
||||
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
||||
}
|
||||
|
||||
// 封装原 analyzeTEDataAndTriggerEvent 的核心逻辑
|
||||
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
||||
windowSize := conf.minBreachCount
|
||||
if windowSize <= 0 {
|
||||
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||
return
|
||||
}
|
||||
|
||||
// mark whether any events have been triggered in this batch
|
||||
var eventTriggered bool
|
||||
breachTriggers := map[string]bool{
|
||||
"up": false, "upup": false, "down": false, "downdown": false,
|
||||
}
|
||||
|
||||
// implement slide window to determine breach counts
|
||||
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
||||
window := realTimeValues[i : i+windowSize]
|
||||
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
||||
|
||||
if firstValueBreachType == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
allMatch := true
|
||||
for j := 1; j < windowSize; j++ {
|
||||
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
||||
if currentValueBreachType != firstValueBreachType {
|
||||
allMatch = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if allMatch {
|
||||
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
||||
if !breachTriggers[firstValueBreachType] {
|
||||
// trigger event
|
||||
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
||||
|
||||
breachTriggers[firstValueBreachType] = true
|
||||
eventTriggered = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 调用 EventRT 接口进行时间推送
|
||||
if eventTriggered {
|
||||
fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---")
|
||||
} else {
|
||||
fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---")
|
||||
}
|
||||
}
|
||||
|
||||
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
|
||||
type tiEventThresholds struct {
|
||||
edge string
|
||||
isFloatCause bool
|
||||
}
|
||||
|
||||
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
||||
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
||||
edgeKey := "edge"
|
||||
t := tiEventThresholds{
|
||||
isFloatCause: false,
|
||||
}
|
||||
|
||||
if value, exists := cause[edgeKey]; exists {
|
||||
if strVal, ok := value.(string); ok {
|
||||
switch strVal {
|
||||
case "raising", "falling":
|
||||
t.edge = strVal
|
||||
return t, nil
|
||||
default:
|
||||
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
|
||||
}
|
||||
} else {
|
||||
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
|
||||
}
|
||||
}
|
||||
|
||||
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
|
||||
}
|
||||
|
||||
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
||||
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
||||
if t.edge == constants.TelesignalRaising {
|
||||
if previousValue == 0.0 && currentValue == 1.0 {
|
||||
return constants.TIBreachTriggerType
|
||||
}
|
||||
} else if t.edge == constants.TelesignalFalling {
|
||||
if previousValue == 1.0 && currentValue == 0.0 {
|
||||
return constants.TIBreachTriggerType
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
|
||||
type TIAnalyzer struct {
|
||||
Thresholds tiEventThresholds
|
||||
}
|
||||
|
||||
// AnalyzeAndTriggerEvent 实现了 RealTimeAnalyzer 接口
|
||||
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
||||
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
||||
}
|
||||
|
||||
// 封装原 analyzeTIDataAndTriggerEvent 的核心逻辑 (使用预计算优化版本)
|
||||
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
|
||||
windowSize := conf.minBreachCount
|
||||
if windowSize <= 0 {
|
||||
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
||||
return
|
||||
}
|
||||
|
||||
numDataPoints := len(realTimeValues)
|
||||
if numDataPoints < 2 {
|
||||
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
|
||||
return
|
||||
}
|
||||
|
||||
// pre calculate the change event type for all adjacent point pairs
|
||||
numChanges := numDataPoints - 1
|
||||
changeBreachTypes := make([]string, numChanges)
|
||||
|
||||
for i := range numChanges {
|
||||
previousValue := realTimeValues[i]
|
||||
currentValue := realTimeValues[i+1]
|
||||
|
||||
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
|
||||
}
|
||||
|
||||
if numChanges < windowSize {
|
||||
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
|
||||
return
|
||||
}
|
||||
|
||||
var eventTriggered bool
|
||||
breachTriggers := map[string]bool{
|
||||
constants.TIBreachTriggerType: false,
|
||||
}
|
||||
|
||||
for i := 0; i <= numChanges-windowSize; i++ {
|
||||
windowBreachTypes := changeBreachTypes[i : i+windowSize]
|
||||
firstBreachType := windowBreachTypes[0]
|
||||
|
||||
if firstBreachType == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
allMatch := true
|
||||
for j := 1; j < windowSize; j++ {
|
||||
if windowBreachTypes[j] != firstBreachType {
|
||||
allMatch = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if allMatch {
|
||||
if !breachTriggers[firstBreachType] {
|
||||
finalValueIndex := i + windowSize
|
||||
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
|
||||
|
||||
breachTriggers[firstBreachType] = true
|
||||
eventTriggered = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 调用 EventRT 接口进行时间推送
|
||||
if eventTriggered {
|
||||
fmt.Println("--- 本次数据切片分析结束:已标记触发事件 ---")
|
||||
} else {
|
||||
fmt.Println("--- 本次数据切片分析结束:未检测到持续越限,不触发事件 ---")
|
||||
}
|
||||
}
|
||||
|
|
@ -10,10 +10,13 @@ type ComputeConfig struct {
|
|||
Cause map[string]any
|
||||
Action map[string]any
|
||||
// TODO 预留自由调整的入口
|
||||
// min consecutive breach count
|
||||
minBreachCount int
|
||||
Duration int
|
||||
DataSize int64
|
||||
QueryKey string
|
||||
StopGchan chan struct{}
|
||||
Analyzer RealTimeAnalyzer
|
||||
}
|
||||
|
||||
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
|
||||
|
|
|
|||
|
|
@ -68,6 +68,8 @@ func StartRealTimeDataComputing(ctx context.Context, measurements []orm.Measurem
|
|||
}
|
||||
|
||||
func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
||||
var err error
|
||||
|
||||
enableValue, exist := measurement.EventPlan["enable"]
|
||||
enable, ok := enableValue.(bool)
|
||||
if !exist {
|
||||
|
|
@ -93,7 +95,10 @@ func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
|||
if !ok {
|
||||
return nil, fmt.Errorf("field cause can not be converted to map[string]any, found type: %T", causeValue)
|
||||
}
|
||||
conf.Cause = cause
|
||||
conf.Cause, err = processCauseMap(cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse content of field cause failed:%w", err)
|
||||
}
|
||||
|
||||
actionValue, exist := measurement.EventPlan["action"]
|
||||
if !exist {
|
||||
|
|
@ -110,17 +115,87 @@ func initComputeConfig(measurement orm.Measurement) (*ComputeConfig, error) {
|
|||
return nil, fmt.Errorf("generate redis query key by datasource failed: %w", err)
|
||||
}
|
||||
conf.QueryKey = queryKey
|
||||
|
||||
conf.DataSize = int64(measurement.Size)
|
||||
// TODO use constant values for temporary settings
|
||||
conf.minBreachCount = constants.MinBreachCount
|
||||
|
||||
isFloatCause := false
|
||||
if _, exists := conf.Cause["up"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["down"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["upup"]; exists {
|
||||
isFloatCause = true
|
||||
} else if _, exists := conf.Cause["downdown"]; exists {
|
||||
isFloatCause = true
|
||||
}
|
||||
|
||||
if isFloatCause {
|
||||
// te config
|
||||
teThresholds, err := parseTEThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telemetry thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TEAnalyzer{Thresholds: teThresholds}
|
||||
} else {
|
||||
// ti config
|
||||
tiThresholds, err := parseTIThresholds(conf.Cause)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse telesignal thresholds: %w", err)
|
||||
}
|
||||
conf.Analyzer = &TIAnalyzer{Thresholds: tiThresholds}
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func processCauseMap(data map[string]any) (map[string]any, error) {
|
||||
causeResult := make(map[string]any)
|
||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||
|
||||
var foundFloatKey bool
|
||||
for _, key := range keysToExtract {
|
||||
if value, exists := data[key]; exists {
|
||||
|
||||
foundFloatKey = true
|
||||
|
||||
// check value type
|
||||
if floatVal, ok := value.(float64); ok {
|
||||
causeResult[key] = floatVal
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected float64, actual %T", key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if foundFloatKey == true {
|
||||
return causeResult, nil
|
||||
}
|
||||
|
||||
edgeKey := "edge"
|
||||
if value, exists := data[edgeKey]; exists {
|
||||
if stringVal, ok := value.(string); ok {
|
||||
switch stringVal {
|
||||
case "raising":
|
||||
fallthrough
|
||||
case "falling":
|
||||
causeResult[edgeKey] = stringVal
|
||||
default:
|
||||
return nil, fmt.Errorf("key:%s value is incorrect,actual value %s", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s already exists but type is incorrect.expected string, actual %T", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("key:%s do not exists", edgeKey)
|
||||
}
|
||||
return nil, fmt.Errorf("cause map is invalid: missing required keys (%v) or '%s'", keysToExtract, edgeKey)
|
||||
}
|
||||
|
||||
func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
||||
client := diagram.NewRedisClient()
|
||||
uuid, _ := ctx.Value(constants.MeasurementUUIDKey).(string)
|
||||
// TODO duration 优化为配置项
|
||||
duration := util.SecondsToDuration(1)
|
||||
duration := util.SecondsToDuration(conf.Duration)
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
startTimestamp := util.GenNanoTsStr()
|
||||
|
|
@ -140,44 +215,15 @@ func continuousComputation(ctx context.Context, conf *ComputeConfig) {
|
|||
continue
|
||||
}
|
||||
startTimestamp = stopTimestamp
|
||||
// TODO 对 redis 数据进行分析
|
||||
fmt.Println(members)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processCauseMap(data map[string]any) {
|
||||
keysToExtract := []string{"up", "down", "upup", "downdown"}
|
||||
|
||||
for _, key := range keysToExtract {
|
||||
if value, exists := data[key]; exists {
|
||||
// 检查类型是否为 float64
|
||||
if floatVal, ok := value.(float64); ok {
|
||||
fmt.Printf("键 '%s' 存在且为 float64: %.2f\n", key, floatVal)
|
||||
realTimedatas := util.ConvertZSetMembersToFloat64(ctx, members)
|
||||
if conf.Analyzer != nil {
|
||||
conf.Analyzer.AnalyzeAndTriggerEvent(ctx, conf, realTimedatas)
|
||||
} else {
|
||||
// 键存在,但类型不对,进行错误处理或转换尝试
|
||||
fmt.Printf("键 '%s' 存在但类型错误: 期望 float64, 实际 %T\n", key, value)
|
||||
logger.Error(ctx, "analyzer is not initialized for this measurement", "uuid", uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
edgeKey := "edge"
|
||||
if value, exists := data[edgeKey]; exists {
|
||||
if stringVal, ok := value.(string); ok {
|
||||
switch stringVal {
|
||||
case "raising":
|
||||
fmt.Println(" -> 识别到 edge: raising")
|
||||
case "falling":
|
||||
fmt.Println(" -> 识别到 edge: falling")
|
||||
default:
|
||||
fmt.Println(" -> 识别到其他 edge 值")
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("键 '%s' 存在但类型错误: 期望 string, 实际 %T\n", edgeKey, value)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("键 '%s' 不存在\n", edgeKey)
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiveChan define func to real time data receive and process
|
||||
|
|
@ -275,17 +321,17 @@ func ReceiveChan(ctx context.Context, consumerConfig *kafka.ConfigMap, topics []
|
|||
}
|
||||
}
|
||||
|
||||
type RealTimeDataPayload struct {
|
||||
type realTimeDataPayload struct {
|
||||
ComponentUUID string
|
||||
Values []float64
|
||||
}
|
||||
|
||||
type RealTimeData struct {
|
||||
Payload RealTimeDataPayload
|
||||
type realTimeData struct {
|
||||
Payload realTimeDataPayload
|
||||
}
|
||||
|
||||
func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
||||
var realTimeData RealTimeData
|
||||
func parseKafkaMessage(msgValue []byte) (*realTimeData, error) {
|
||||
var realTimeData realTimeData
|
||||
err := json.Unmarshal(msgValue, &realTimeData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal real time data failed: %w", err)
|
||||
|
|
@ -293,7 +339,7 @@ func parseKafkaMessage(msgValue []byte) (*RealTimeData, error) {
|
|||
return &realTimeData, nil
|
||||
}
|
||||
|
||||
func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
||||
func processRealTimeData(ctx context.Context, realTimeData *realTimeData) {
|
||||
componentUUID := realTimeData.Payload.ComponentUUID
|
||||
component, err := diagram.GetComponentMap(componentUUID)
|
||||
if err != nil {
|
||||
|
|
@ -314,7 +360,6 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
|||
var anchorRealTimeData []float64
|
||||
var calculateFunc func(archorValue float64, args ...float64) float64
|
||||
|
||||
// 收集实时数据
|
||||
for _, param := range realTimeData.Payload.Values {
|
||||
anchorRealTimeData = append(anchorRealTimeData, param)
|
||||
}
|
||||
|
|
@ -338,10 +383,8 @@ func processRealTimeData(ctx context.Context, realTimeData *RealTimeData) {
|
|||
return
|
||||
}
|
||||
|
||||
// TODO 使用select避免channel阻塞
|
||||
select {
|
||||
case anchorChan <- anchorConfig:
|
||||
// 成功发送
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "context done while sending to anchor chan")
|
||||
case <-time.After(5 * time.Second):
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
// Package util provide some utility fun
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"modelRT/logger"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// ConvertZSetMembersToFloat64 define func to conver zset member type to float64
|
||||
func ConvertZSetMembersToFloat64(ctx context.Context, members []redis.Z) []float64 {
|
||||
dataFloats := make([]float64, 0, len(members))
|
||||
|
||||
for _, member := range members {
|
||||
valStr, ok := member.Member.(string)
|
||||
if !ok {
|
||||
logger.Warn(ctx, "redis zset member value is not a string,skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
valFloat, err := strconv.ParseFloat(valStr, 64)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to parse zset member string to float64", "value", valStr, "error", err)
|
||||
continue
|
||||
}
|
||||
dataFloats = append(dataFloats, valFloat)
|
||||
}
|
||||
|
||||
return dataFloats
|
||||
}
|
||||
Loading…
Reference in New Issue