336 lines
9.9 KiB
Go
336 lines
9.9 KiB
Go
// Package realtimedata define real time data operation functions
|
|
package realtimedata
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"modelRT/constants"
|
|
"modelRT/logger"
|
|
"modelRT/real-time-data/event"
|
|
)
|
|
|
|
// RealTimeAnalyzer define interface general methods for real-time data analysis and event triggering
|
|
type RealTimeAnalyzer interface {
|
|
AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64)
|
|
}
|
|
|
|
// teEventThresholds define struct of store the telemetry float point threshold parsed from conf field cause
|
|
type teEventThresholds struct {
|
|
up float64
|
|
upup float64
|
|
down float64
|
|
downdown float64
|
|
isFloatCause bool
|
|
}
|
|
|
|
// parseTEThresholds define func to parse telemetry thresholds by casue map
|
|
func parseTEThresholds(cause map[string]any) (teEventThresholds, error) {
|
|
t := teEventThresholds{}
|
|
floatKeys := map[string]*float64{
|
|
"upup": &t.upup,
|
|
"up": &t.up,
|
|
"down": &t.down,
|
|
"downdown": &t.downdown,
|
|
}
|
|
|
|
for key, ptr := range floatKeys {
|
|
if value, exists := cause[key]; exists {
|
|
if floatVal, ok := value.(float64); ok {
|
|
*ptr = floatVal
|
|
t.isFloatCause = true
|
|
} else {
|
|
return teEventThresholds{}, fmt.Errorf("key:%s type is incorrect. expected float64, actual %T", key, value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// quickly check mutual exclusion
|
|
if _, exists := cause["edge"]; exists && t.isFloatCause {
|
|
return teEventThresholds{}, errors.New("cause config error: 'up/down' keys and 'edge' key are mutually exclusive, but both found")
|
|
}
|
|
return t, nil
|
|
}
|
|
|
|
// getTEBreachType define func to determine which type of out-of-limit the telemetry real time data belongs to
|
|
func getTEBreachType(value float64, t teEventThresholds) string {
|
|
if t.upup > 0 && value > t.upup {
|
|
return constants.TelemetryUpUpLimit
|
|
}
|
|
if t.up > 0 && value > t.up {
|
|
return constants.TelemetryUpLimit
|
|
}
|
|
if t.downdown > 0 && value < t.downdown {
|
|
return constants.TelemetryDownDownLimit
|
|
}
|
|
if t.down > 0 && value < t.down {
|
|
return constants.TelemetryDownLimit
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// TEAnalyzer define struct of store the thresholds required for telemetry and implements the analysis logic.
|
|
type TEAnalyzer struct {
|
|
Thresholds teEventThresholds
|
|
}
|
|
|
|
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
|
func (t *TEAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
|
analyzeTEDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
|
}
|
|
|
|
// analyzeTEDataLogic define func to processing telemetry data and event triggering
|
|
func analyzeTEDataLogic(ctx context.Context, conf *ComputeConfig, thresholds teEventThresholds, realTimeValues []float64) {
|
|
windowSize := conf.minBreachCount
|
|
if windowSize <= 0 {
|
|
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
|
return
|
|
}
|
|
|
|
// mark whether any events have been triggered in this batch
|
|
var eventTriggered bool
|
|
breachTriggers := map[string]bool{
|
|
"up": false, "upup": false, "down": false, "downdown": false,
|
|
}
|
|
|
|
// implement slide window to determine breach counts
|
|
for i := 0; i <= len(realTimeValues)-windowSize; i++ {
|
|
window := realTimeValues[i : i+windowSize]
|
|
firstValueBreachType := getTEBreachType(window[0], thresholds)
|
|
|
|
if firstValueBreachType == "" {
|
|
continue
|
|
}
|
|
|
|
allMatch := true
|
|
for j := 1; j < windowSize; j++ {
|
|
currentValueBreachType := getTEBreachType(window[j], thresholds)
|
|
if currentValueBreachType != firstValueBreachType {
|
|
allMatch = false
|
|
break
|
|
}
|
|
}
|
|
|
|
if allMatch {
|
|
// in the case of a continuous sequence of out-of-limit events, check whether this type of event has already been triggered in the current batch of data
|
|
if !breachTriggers[firstValueBreachType] {
|
|
// trigger event
|
|
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstValueBreachType, "value", window[windowSize-1])
|
|
|
|
breachTriggers[firstValueBreachType] = true
|
|
eventTriggered = true
|
|
}
|
|
}
|
|
}
|
|
|
|
if eventTriggered {
|
|
command, content := genTEEventCommandAndContent(conf.Action)
|
|
// TODO 考虑 content 是否可以为空,先期不允许
|
|
if command == "" || content == "" {
|
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
|
return
|
|
}
|
|
event.TriggerEventAction(ctx, command, content)
|
|
return
|
|
}
|
|
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
|
}
|
|
|
|
func genTEEventCommandAndContent(action map[string]any) (command string, content string) {
|
|
cmdValue, exist := action["command"]
|
|
if !exist {
|
|
return "", ""
|
|
}
|
|
|
|
commandStr, ok := cmdValue.(string)
|
|
if !ok {
|
|
return "", ""
|
|
}
|
|
command = commandStr
|
|
|
|
paramsValue, exist := action["parametes"]
|
|
if !exist {
|
|
return command, ""
|
|
}
|
|
|
|
parameterSlice, ok := paramsValue.([]string)
|
|
if !ok {
|
|
return command, ""
|
|
}
|
|
|
|
var builder strings.Builder
|
|
for i, parameter := range parameterSlice {
|
|
if i > 0 {
|
|
builder.WriteString(",")
|
|
}
|
|
builder.WriteString(parameter)
|
|
}
|
|
|
|
return command, builder.String()
|
|
}
|
|
|
|
// tiEventThresholds define struct of store the telesignal float point threshold parsed from conf field cause
|
|
type tiEventThresholds struct {
|
|
edge string
|
|
isFloatCause bool
|
|
}
|
|
|
|
// parseTEThresholds define func to parse telesignal thresholds by casue map
|
|
func parseTIThresholds(cause map[string]any) (tiEventThresholds, error) {
|
|
edgeKey := "edge"
|
|
t := tiEventThresholds{
|
|
isFloatCause: false,
|
|
}
|
|
|
|
if value, exists := cause[edgeKey]; exists {
|
|
if strVal, ok := value.(string); ok {
|
|
switch strVal {
|
|
case "raising", "falling":
|
|
t.edge = strVal
|
|
return t, nil
|
|
default:
|
|
return tiEventThresholds{}, fmt.Errorf("key:%s value is incorrect, actual value %s. expected 'raising' or 'falling'", edgeKey, strVal)
|
|
}
|
|
} else {
|
|
return tiEventThresholds{}, fmt.Errorf("key:%s already exists but type is incorrect. expected string, actual %T", edgeKey, value)
|
|
}
|
|
}
|
|
|
|
return tiEventThresholds{}, fmt.Errorf("cause map is invalid for telesignal: missing required key '%s'", edgeKey)
|
|
}
|
|
|
|
// getTIBreachType define func to determine which type of out-of-limit the telesignal real time data belongs to
|
|
func getTIBreachType(currentValue float64, previousValue float64, t tiEventThresholds) string {
|
|
if t.edge == constants.TelesignalRaising {
|
|
if previousValue == 0.0 && currentValue == 1.0 {
|
|
return constants.TIBreachTriggerType
|
|
}
|
|
} else if t.edge == constants.TelesignalFalling {
|
|
if previousValue == 1.0 && currentValue == 0.0 {
|
|
return constants.TIBreachTriggerType
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// TIAnalyzer define struct of store the thresholds required for remote signaling and implements the analysis logic
|
|
type TIAnalyzer struct {
|
|
Thresholds tiEventThresholds
|
|
}
|
|
|
|
// AnalyzeAndTriggerEvent define func to implemented the RealTimeAnalyzer interface
|
|
func (t *TIAnalyzer) AnalyzeAndTriggerEvent(ctx context.Context, conf *ComputeConfig, realTimeValues []float64) {
|
|
analyzeTIDataLogic(ctx, conf, t.Thresholds, realTimeValues)
|
|
}
|
|
|
|
// analyzeTIDataLogic define func to processing telesignal data and event triggering
|
|
func analyzeTIDataLogic(ctx context.Context, conf *ComputeConfig, thresholds tiEventThresholds, realTimeValues []float64) {
|
|
windowSize := conf.minBreachCount
|
|
if windowSize <= 0 {
|
|
logger.Error(ctx, "variable minBreachCount is invalid or zero, analysis skipped", "minBreachCount", windowSize)
|
|
return
|
|
}
|
|
|
|
numDataPoints := len(realTimeValues)
|
|
if numDataPoints < 2 {
|
|
logger.Info(ctx, "data points less than 2, no change event possible, analysis skipped", "data_points", numDataPoints)
|
|
return
|
|
}
|
|
|
|
// pre calculate the change event type for all adjacent point pairs
|
|
numChanges := numDataPoints - 1
|
|
changeBreachTypes := make([]string, numChanges)
|
|
|
|
for i := range numChanges {
|
|
previousValue := realTimeValues[i]
|
|
currentValue := realTimeValues[i+1]
|
|
|
|
changeBreachTypes[i] = getTIBreachType(currentValue, previousValue, thresholds)
|
|
}
|
|
|
|
if numChanges < windowSize {
|
|
logger.Error(ctx, "number of change events is less than window size, analysis skipped", "num_changes", numChanges, "window_size", windowSize)
|
|
return
|
|
}
|
|
|
|
var eventTriggered bool
|
|
breachTriggers := map[string]bool{
|
|
constants.TIBreachTriggerType: false,
|
|
}
|
|
|
|
for i := 0; i <= numChanges-windowSize; i++ {
|
|
windowBreachTypes := changeBreachTypes[i : i+windowSize]
|
|
firstBreachType := windowBreachTypes[0]
|
|
|
|
if firstBreachType == "" {
|
|
continue
|
|
}
|
|
|
|
allMatch := true
|
|
for j := 1; j < windowSize; j++ {
|
|
if windowBreachTypes[j] != firstBreachType {
|
|
allMatch = false
|
|
break
|
|
}
|
|
}
|
|
|
|
if allMatch {
|
|
if !breachTriggers[firstBreachType] {
|
|
finalValueIndex := i + windowSize
|
|
logger.Warn(ctx, "event triggered by sliding window", "breach_type", firstBreachType, "value", realTimeValues[finalValueIndex])
|
|
|
|
breachTriggers[firstBreachType] = true
|
|
eventTriggered = true
|
|
}
|
|
}
|
|
}
|
|
|
|
if eventTriggered {
|
|
command, content := genTIEventCommandAndContent(conf.Action)
|
|
// TODO 考虑 content 是否可以为空,先期不允许
|
|
if command == "" || content == "" {
|
|
logger.Error(ctx, "generate telemetry evnet command or content failed", "action", conf.Action, "command", command, "content", content)
|
|
return
|
|
}
|
|
event.TriggerEventAction(ctx, command, content)
|
|
return
|
|
}
|
|
logger.Info(ctx, "the real time data analysis has been completed. no continuous boundary violations were found, and no penalties will be imposed.")
|
|
}
|
|
|
|
func genTIEventCommandAndContent(action map[string]any) (command string, content string) {
|
|
cmdValue, exist := action["command"]
|
|
if !exist {
|
|
return "", ""
|
|
}
|
|
|
|
commandStr, ok := cmdValue.(string)
|
|
if !ok {
|
|
return "", ""
|
|
}
|
|
command = commandStr
|
|
|
|
paramsValue, exist := action["parametes"]
|
|
if !exist {
|
|
return command, ""
|
|
}
|
|
|
|
parameterSlice, ok := paramsValue.([]string)
|
|
if !ok {
|
|
return command, ""
|
|
}
|
|
|
|
var builder strings.Builder
|
|
for i, parameter := range parameterSlice {
|
|
if i > 0 {
|
|
builder.WriteString(",")
|
|
}
|
|
builder.WriteString(parameter)
|
|
}
|
|
|
|
return command, builder.String()
|
|
}
|