modelRT/real-time-data/compute_analyzer.go

334 lines
9.6 KiB
Go

// Package realtimedata define real time data operation functions
package realtimedata
import (
"context"
"errors"
"fmt"
"strings"
"modelRT/constants"
"modelRT/logger"
"modelRT/real-time-data/event"
)
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
type RealTimeAnalyzer interface {
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
}
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
type teEventThresholds struct {
up float64
upup float64
down float64
downdown float64
isFloatCause bool
}
// parseTEThresholds define func to parse telemetry thresholds by casue map
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
t := teEventThresholds{}
floatKeys := map[string]*float64{
"upup": &t.upup,
"up": &t.up,
"down": &t.down,
"downdown": &t.downdown,
}
for key, ptr := range floatKeys {
if value, exists := cause[key]; exists {
if floatVal, ok := value.(float64); ok {
*ptr = floatVal
t.isFloatCause = true
} else {
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
}
}
}
// quickly check mutual exclusion
if _, exists := cause["edge"]; exists && t.isFloatCause {
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
}
return t, nil
}
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
func getTEBreachType(value float64, t teEventThresholds) string {
if t.upup > 0 && value > t.upup {
return constants.TelemetryUpUpLimit
}
if t.up > 0 && value > t.up {
return constants.TelemetryUpLimit
}
if t.downdown > 0 && value < t.downdown {
return constants.TelemetryDownDownLimit
}
if t.down > 0 && value < t.down {
return constants.TelemetryDownLimit
}
return ""
}
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
type TEAnalyzer struct {
Thresholds teEventThresholds
}
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
}
// analyzeTEDataLogic define func to processing telemetry data and event triggering
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount
if windowSize <= 0 {
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
return
}
// mark whether any events have been triggered in this batch
var eventTriggered bool
breachTriggers := map[string]bool{
"up": false, "upup": false, "down": false, "downdown": false,
}
// implement slide window to determine breach counts
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
window := realTimeValues[i : i+windowSize]
firstValueBreachType := getTEBreachType(window[0], thresholds)
if firstValueBreachType == "" {
continue
}
allMatch := true
for j := 1; j < windowSize; j++ {
currentValueBreachType := getTEBreachType(window[j], thresholds)
if currentValueBreachType != firstValueBreachType {
allMatch = false
break
}
}
if allMatch {
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
if !breachTriggers[firstValueBreachType] {
// trigger event
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
breachTriggers[firstValueBreachType] = true
eventTriggered = true
}
}
}
if eventTriggered {
command, content := genTEEventCommandAndContent(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
event.TriggerEventAction(ctx, command, content)
return
}
}
func genTEEventCommandAndContent(action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
return "", ""
}
commandStr, ok := cmdValue.(string)
if !ok {
return "", ""
}
command = commandStr
paramsValue, exist := action["parametes"]
if !exist {
return command, ""
}
parameterSlice, ok := paramsValue.([]string)
if !ok {
return command, ""
}
var builder strings.Builder
for i, parameter := range parameterSlice {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString(parameter)
}
return command, builder.String()
}
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
type tiEventThresholds struct {
edge string
isFloatCause bool
}
// parseTEThresholds define func to parse telesignal thresholds by casue map
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
edgeKey := "edge"
t := tiEventThresholds{
isFloatCause: false,
}
if value, exists := cause[edgeKey]; exists {
if strVal, ok := value.(string); ok {
switch strVal {
case "raising", "falling":
t.edge = strVal
return t, nil
default:
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
}
} else {
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
}
}
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
}
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
if t.edge == constants.TelesignalRaising {
if previousValue == 0.0 && currentValue == 1.0 {
return constants.TIBreachTriggerType
}
} else if t.edge == constants.TelesignalFalling {
if previousValue == 1.0 && currentValue == 0.0 {
return constants.TIBreachTriggerType
}
}
return ""
}
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
type TIAnalyzer struct {
Thresholds tiEventThresholds
}
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
}
// analyzeTIDataLogic define func to processing telesignal data and event triggering
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
windowSize := conf.minBreachCount
if windowSize <= 0 {
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
return
}
numDataPoints := len(realTimeValues)
if numDataPoints < 2 {
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
return
}
// pre calculate the change event type for all adjacent point pairs
numChanges := numDataPoints - 1
changeBreachTypes := make([]string, numChanges)
for i := range numChanges {
previousValue := realTimeValues[i]
currentValue := realTimeValues[i+1]
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
}
if numChanges < windowSize {
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
return
}
var eventTriggered bool
breachTriggers := map[string]bool{
constants.TIBreachTriggerType: false,
}
for i := 0; i <= numChanges-windowSize; i++ {
windowBreachTypes := changeBreachTypes[i : i+windowSize]
firstBreachType := windowBreachTypes[0]
if firstBreachType == "" {
continue
}
allMatch := true
for j := 1; j < windowSize; j++ {
if windowBreachTypes[j] != firstBreachType {
allMatch = false
break
}
}
if allMatch {
if !breachTriggers[firstBreachType] {
finalValueIndex := i + windowSize
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
breachTriggers[firstBreachType] = true
eventTriggered = true
}
}
}
if eventTriggered {
command, content := genTIEventCommandAndContent(conf.Action)
// TODO 考虑 content 是否可以为空,先期不允许
if command == "" || content == "" {
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
return
}
event.TriggerEventAction(ctx, command, content)
return
}
}
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
cmdValue, exist := action["command"]
if !exist {
return "", ""
}
commandStr, ok := cmdValue.(string)
if !ok {
return "", ""
}
command = commandStr
paramsValue, exist := action["parametes"]
if !exist {
return command, ""
}
parameterSlice, ok := paramsValue.([]string)
if !ok {
return command, ""
}
var builder strings.Builder
for i, parameter := range parameterSlice {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString(parameter)
}
return command, builder.String()
}